diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/command/CommandParse.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/command/CommandParse.java
index 988448033ea50ec5f8892801865e5c9c30588e40..80e067f004434748f397a8911702ad793fe45517 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/command/CommandParse.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/command/CommandParse.java
@@ -106,8 +106,9 @@ public class CommandParse {
         argumentParser.addArgument("-sgu", "--seg-time-unit")
             .dest(ArgumentsConstants.TIME_UNIT)
             .setDefault(ArgumentsConstants.HOUR)
-            .choices(ArgumentsConstants.DAY, ArgumentsConstants.HOUR, ArgumentsConstants.MONTH)
+            .choices(ArgumentsConstants.DAY, ArgumentsConstants.HOUR, ArgumentsConstants.MONTH, ArgumentsConstants.YEAR)
             .help("Time unit of segmentation algorithm");
+
         argumentParser.addArgument("-sgm", "--segmentation-method")
             .dest("segMethod")
             .dest(ArgumentsConstants.SEG_METHOD)
@@ -115,6 +116,57 @@ public class CommandParse {
             .choices(ArgumentsConstants.UNIT_METHOD, ArgumentsConstants.SPEED_METHOD)
             .help("Segmentation Algorithm");

+        argumentParser.addArgument("-ft", "--feature-type")
+            .dest(ArgumentsConstants.FEATURE_TYPE)
+            .setDefault(ArgumentsConstants.NORMAL_TYPE)
+            .choices(ArgumentsConstants.NORMAL_TYPE, ArgumentsConstants.HISTOGRAM_TYPE)
+            .help("Feature Type");
+
+        argumentParser.addArgument("-dm", "--detector-method")
+            .dest(ArgumentsConstants.DETECTOR_METHOD)
+            .setDefault(ArgumentsConstants.LOF_METHOD)
+            .choices(ArgumentsConstants.LOF_METHOD, ArgumentsConstants.HISTOGRAM_METHOD)
+            .help("Detector Algorithm");
+
+        argumentParser.addArgument("-wft", "--weight-function-type")
+            .dest(ArgumentsConstants.WEIGHTFUNC_METHOD)
+            .setDefault(ArgumentsConstants.CONSTANT_METHOD)
+            .choices(ArgumentsConstants.CONSTANT_METHOD, ArgumentsConstants.PIECEWISE_METHOD, ArgumentsConstants.LINEAR_METHOD, ArgumentsConstants.POLYMIALCONVEX_METHOD, ArgumentsConstants.POLYMIALCONCAVE_METHOD)
+            .help("Weight Function Method");
+
+        argumentParser.addArgument("-bn", "--buck-number")
+            .type(Integer.class)
+            .dest(ArgumentsConstants.BUCK_NUM)
+            .setDefault(100)
+            .help("the number of buckets");
+
+        argumentParser.addArgument("-tpf", "--top-percent-filter")
+            .type(Double.class)
+            .dest(ArgumentsConstants.TOP_PERCENT)
+            .setDefault(0.0)
+            .help("filter out the smallest a% of points");
+
+        argumentParser.addArgument("-dpf", "--down-percent-filter")
+            .type(Double.class)
+            .dest(ArgumentsConstants.DOWN_PERCENT)
+            .setDefault(0.0)
+            .help("filter out the largest b% of points");
+
+        argumentParser.addArgument("-wmin", "--weight-function-min")
+            .type(Double.class)
+            .dest(ArgumentsConstants.WEIGHTFUNC_THRES)
+            .setDefault(0.0)
+            .help("the minimum value of the weight function");
+
+        argumentParser.addArgument("-nthres", "--normal-thres")
+            .type(Double.class)
+            .dest(ArgumentsConstants.NORMAL_THRES)
+            .setDefault(0.5)
+            .help("the threshold for a normal point");
         Namespace ns = null;
         try {
             ns = argumentParser.parseArgs(args);
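As a quick sanity check of how the new flags behave, here is a minimal standalone sketch against argparse4j (the class name `FlagSketch` is invented; only two of the new options are reproduced):

```java
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;

public class FlagSketch {
    public static void main(String[] args) throws ArgumentParserException {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("detect");
        parser.addArgument("-bn", "--buck-number")
                .type(Integer.class).dest("bucknum").setDefault(100)
                .help("the number of buckets");
        parser.addArgument("-tpf", "--top-percent-filter")
                .type(Double.class).dest("toppercent").setDefault(0.0);
        // flags that are not supplied fall back to their setDefault values
        Namespace ns = parser.parseArgs(new String[]{"-bn", "50"});
        System.out.println(ns.getInt("bucknum") + " " + ns.getDouble("toppercent")); // 50 0.0
    }
}
```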
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/constant/ArgumentsConstants.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/constant/ArgumentsConstants.java
index 3d58b8651acf4cc8518fbdb573f018d1bb8715e6..8a9253a8da3867209ee1f7911d01b8b436240f90 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/constant/ArgumentsConstants.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/constant/ArgumentsConstants.java
@@ -20,18 +20,6 @@ public class ArgumentsConstants {
     public final static String LOF_K = "lofK";
     public final static String LOCAL = "local";

-    public final static String TIME_UNIT = "timeunit";
-    public final static String YEAR = "year";
-    public final static String MONTH = "month";
-    public final static String DAY = "day";
-    public final static String HOUR = "hour";
-    public final static String MINUTE = "minute";
-    public final static String SECOND = "second";
-
-    public final static String SEG_METHOD = "segmethod";
-    public final static String SPEED_METHOD = "speed";
-    public final static String UNIT_METHOD = "unit";
-
     public final static String PATH_SEGMENT_PARAMETER = "segmentParameterPath";
     public final static String OUTPUT_PATH_SEGMENT_PARAMETER = "segmentParameterOutput";
     public final static String PATH_MASTER_SEGMENTS = "masterSegmentPath";
@@ -44,6 +32,41 @@ public class ArgumentsConstants {
     public final static String OUTPUT_PATH_TEST_FEATURES = "testFeatureOutput";
     public final static String OUTPUT_PATH_OUTLIERS = "testOutlierOutput";

+    public final static String SEG_METHOD = "segmethod";
+    public final static String SPEED_METHOD = "speed";
+    public final static String UNIT_METHOD = "unit";
+
+    public final static String TIME_UNIT = "timeunit";
+    public final static String YEAR = "year";
+    public final static String MONTH = "month";
+    public final static String DAY = "day";
+    public final static String HOUR = "hour";
+    public final static String MINUTE = "minute";
+    public final static String SECOND = "second";
+
+    public final static String WEIGHTFUNC_METHOD = "weightfuncmethod";
+    public final static String LINEAR_METHOD = "linear";
+    public final static String POLYMIALCONVEX_METHOD = "polymialconvex";
+    public final static String POLYMIALCONCAVE_METHOD = "polymialconcave";
+    public final static String CONSTANT_METHOD = "constant";
+    public final static String PIECEWISE_METHOD = "piecewise";
+    public final static String WEIGHTFUNC_THRES = "weightfuncthres";
+
+    public final static String FEATURE_TYPE = "feature";
+    public final static String NORMAL_TYPE = "normal";
+    public final static String HISTOGRAM_TYPE = "histogram";
+
+    public final static String DETECTOR_METHOD = "detectmethod";
+    public final static String LOF_METHOD = "lof";
+    public final static String HISTOGRAM_METHOD = "histogram";
+
+    public final static String BUCK_NUM = "bucknum";
+    public final static String TOP_PERCENT = "toppercent";
+    public final static String DOWN_PERCENT = "downpercent";
+    public final static String NORMAL_THRES = "normalthres";
+
     public final static String ALL_COLUMN_PREFIX = "#ALL#";
 }
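For orientation, the string constants above are what argparse4j stores in the `Namespace`, and the `*Type` enums introduced below in this patch turn them into typed values. A minimal sketch of that wiring (the `main` scaffolding is illustrative, not part of the patch):

```java
import com.k2data.governance.matching.common.constant.ArgumentsConstants;
import com.k2data.governance.matching.common.type.DetectMethodType;
import com.k2data.governance.matching.common.type.WeightFunctionType;

public class TypeWiringSketch {
    public static void main(String[] args) {
        // CLI value -> enum; unknown strings fall back to the defaults
        // hard-coded in each fromString (LOF / CONSTANT).
        DetectMethodType detect = DetectMethodType.fromString("histogram");
        WeightFunctionType weight = WeightFunctionType.fromString(ArgumentsConstants.PIECEWISE_METHOD);
        System.out.println(detect + " / " + weight); // HISTOGRAM / PIECEWISE
    }
}
```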
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/constant/ExtractorConstants.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/constant/ExtractorConstants.java
index ccd4edc6ba71bc61fc4821cfe88952a34ea88df5..26000878f8baa44c93a8286c23d71fcf55e71f60 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/constant/ExtractorConstants.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/constant/ExtractorConstants.java
@@ -17,5 +17,7 @@ public class ExtractorConstants {
     public final static String SKEWNESS = "Skewness";
     public final static String KURTOSIS = "Kurtosis";
     public final static String COUNT = "Count";
+    public final static String TIME = "SegStartTime";
+    public final static String BUCKINDEX = "BUCKINDEX";
     public final static int FEATURE_NUM = 7;
 }
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/DetectMethodType.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/DetectMethodType.java
new file mode 100644
index 0000000000000000000000000000000000000000..dad127429e5152ec7842d447b38b5cab3adec797
--- /dev/null
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/DetectMethodType.java
@@ -0,0 +1,40 @@
+package com.k2data.governance.matching.common.type;
+
+import com.k2data.governance.matching.common.constant.ArgumentsConstants;
+
+/**
+ * Created by gaoyu on 2018/2/5 10:12.
+ * E-mail address is nkugaoyu@163.com.
+ * Copyright 2018 gaoyu. All Rights Reserved.
+ *
+ * @author gaoyu
+ */
+public enum DetectMethodType {
+    LOF(0), HISTOGRAM(1);
+
+    private final int value;
+
+    DetectMethodType(int value) {
+        this.value = value;
+    }
+
+    public int getValue() {
+        return value;
+    }
+
+    public static DetectMethodType fromString(String str) {
+        DetectMethodType type;
+        switch (str.toLowerCase()) {
+            case ArgumentsConstants.LOF_METHOD:
+                type = DetectMethodType.LOF;
+                break;
+            case ArgumentsConstants.HISTOGRAM_METHOD:
+                type = DetectMethodType.HISTOGRAM;
+                break;
+            default:
+                type = DetectMethodType.LOF;
+                break;
+        }
+        return type;
+    }
+}
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/FeatureType.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/FeatureType.java
new file mode 100644
index 0000000000000000000000000000000000000000..dd6d5abae7394b7eac1dec8bf9fbd596cbd3330f
--- /dev/null
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/FeatureType.java
@@ -0,0 +1,40 @@
+package com.k2data.governance.matching.common.type;
+
+import com.k2data.governance.matching.common.constant.ArgumentsConstants;
+
+/**
+ * Created by gaoyu on 2018/2/5 10:03.
+ * E-mail address is nkugaoyu@163.com.
+ * Copyright 2018 gaoyu. All Rights Reserved.
+ * + * @author gaoyu + */ +public enum FeatureType { + NORMAL(0),HISTOGRAM(1); + + private final int value; + + FeatureType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static FeatureType fromString(String str) { + FeatureType type; + switch (str.toLowerCase()) { + case ArgumentsConstants.NORMAL_TYPE: + type = FeatureType.NORMAL; + break; + case ArgumentsConstants.HISTOGRAM_TYPE: + type = FeatureType.HISTOGRAM; + break; + default: + type = FeatureType.NORMAL; + break; + } + return type; + } +} diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/common/type/SegMethodType.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/SegMethodType.java similarity index 93% rename from k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/common/type/SegMethodType.java rename to k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/SegMethodType.java index 233549753d934abc8095d3296a096336464ce5f4..1637c3d305cc8348296658dea9f84a53c72a60df 100644 --- a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/common/type/SegMethodType.java +++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/SegMethodType.java @@ -1,4 +1,4 @@ -package com.k2data.governance.matching.segment.common.type; +package com.k2data.governance.matching.common.type; import com.k2data.governance.matching.common.constant.ArgumentsConstants; diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/common/type/TimeUnitType.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/TimeUnitType.java similarity index 90% rename from k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/common/type/TimeUnitType.java rename to k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/TimeUnitType.java index b232c657d36461d2ed2c08afa5e04682732f943e..b9b98fd69ac7d97ea173c0630483f7bfdea96dba 100644 --- a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/common/type/TimeUnitType.java +++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/TimeUnitType.java @@ -1,4 +1,4 @@ -package com.k2data.governance.matching.segment.common.type; +package com.k2data.governance.matching.common.type; import com.k2data.governance.matching.common.constant.ArgumentsConstants; @@ -11,7 +11,7 @@ import com.k2data.governance.matching.common.constant.ArgumentsConstants; * @author gaoyu */ public enum TimeUnitType { - YEAR(0),MONTH(1),DAY(2),HOUR(3),MINUTE(4),SECOND(5); + YEAR(0),MONTH(1),WEEK(2),DAY(3),HOUR(4),MINUTE(5),SECOND(6); private final int value; diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/WeightFunctionType.java 
b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/WeightFunctionType.java new file mode 100644 index 0000000000000000000000000000000000000000..710cd5bd86bcf2bd77c39319d4d837f93e20eeda --- /dev/null +++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/common/type/WeightFunctionType.java @@ -0,0 +1,48 @@ +package com.k2data.governance.matching.common.type; + +import com.k2data.governance.matching.common.constant.ArgumentsConstants; + +/** + * Created by gaoyu on 2018/2/5 0:42. + * E-mail address is nkugaoyu@163.com. + * Copyright 2018 gaoyu. All Rights Reserved. + * + * @author gaoyu + */ +public enum WeightFunctionType { + LINEAR(0), POLYMIALCONVEX(1), POLYMIALCONCAVE(2), CONSTANT(3), PIECEWISE(4); + private final int value; + + WeightFunctionType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static WeightFunctionType fromString(String str) { + WeightFunctionType type; + switch (str.toLowerCase()) { + case ArgumentsConstants.LINEAR_METHOD: + type = WeightFunctionType.LINEAR; + break; + case ArgumentsConstants.POLYMIALCONCAVE_METHOD: + type = WeightFunctionType.POLYMIALCONCAVE; + break; + case ArgumentsConstants.POLYMIALCONVEX_METHOD: + type = WeightFunctionType.POLYMIALCONVEX; + break; + case ArgumentsConstants.CONSTANT_METHOD: + type = WeightFunctionType.CONSTANT; + break; + case ArgumentsConstants.PIECEWISE_METHOD: + type = WeightFunctionType.PIECEWISE; + break; + default: + type = WeightFunctionType.CONSTANT; + break; + } + return type; + } +} diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/record/HistogramFeature.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/record/HistogramFeature.java new file mode 100644 index 0000000000000000000000000000000000000000..5bfbdc7e1c3af4bfb1234800227669266295199e --- /dev/null +++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/record/HistogramFeature.java @@ -0,0 +1,119 @@ +package com.k2data.governance.matching.record; + +import scala.Serializable; + +/** + * Created by gaoyu on 2018/2/3 12:59. + * E-mail address is nkugaoyu@163.com. + * Copyright 2018 gaoyu. All Rights Reserved. 
+ * + * @author gaoyu + */ +public class HistogramFeature implements Serializable { + private int segID; + private double min; + private double max; + private int cnt; + private String transTime; + private int buckNum; + private double[]buckVal; + private int startBuck; + private int endBuck; + + public HistogramFeature(int segID, double min, double max, int cnt, String transTime, int buckNum) { + this.segID = segID; + this.min = min; + this.max = max; + this.cnt = cnt; + this.transTime = transTime; + this.buckNum = buckNum; + this.buckVal = new double[buckNum]; + startBuck = 0; + endBuck = buckNum-1; + + } + + public HistogramFeature(int segID, double min, double max, int cnt, String transTime, int buckNum, double[] buckVal) { + this.segID = segID; + this.min = min; + this.max = max; + this.cnt = cnt; + this.transTime = transTime; + this.buckNum = buckNum; + this.buckVal = buckVal; + this.startBuck = 0; + this.endBuck = buckNum-1; + } + + public String getTransTime() { + return transTime; + } + + public int getStartBuck() { + return startBuck; + } + + public int getEndBuck() { + return endBuck; + } + + public void setTransTime(String transTime) { + this.transTime = transTime; + } + + public void setStartBuck(int startBuck) { + this.startBuck = startBuck; + } + + public void setEndBuck(int endBuck) { + this.endBuck = endBuck; + } + + public int getCnt() { + return cnt; + } + + public void setCnt(int cnt) { + this.cnt = cnt; + } + + public int getSegID() { + return segID; + } + + public void setSegID(int segID) { + this.segID = segID; + } + + public void setMin(double min) { + this.min = min; + } + + public void setMax(double max) { + this.max = max; + } + + public void setBuckNum(int buckNum) { + this.buckNum = buckNum; + } + + public void setBuckVal(double[] buckVal) { + this.buckVal = buckVal; + } + + public double getMin() { + return min; + } + + public double getMax() { + return max; + } + + public int getBuckNum() { + return buckNum; + } + + public double[] getBuckVal() { + return buckVal; + } +} diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/util/DataFrameUtil.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/util/DataFrameUtil.java index b91af02b17ec8577537423b3db6cc4e5334633c5..dc6c30fda89a7b130f80735efab482afe530404b 100644 --- a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/util/DataFrameUtil.java +++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/util/DataFrameUtil.java @@ -241,6 +241,8 @@ public class DataFrameUtil { return getDouble(row, attrIndex); } + + public static int getInt(Row row, int attrIndex) throws NumberFormatException { Object value = row.get(attrIndex); if (value == null) { diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/util/DateUtil.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/util/DateUtil.java index 277b83500b86d1829c9535c8e9401e15d8698a2f..db892866d7017f1bc285ef0032dae78e21d412ef 100644 --- a/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/util/DateUtil.java +++ 
b/k2de-governance-schema-matching/k2de-governance-schema-matching-common/src/main/java/com/k2data/governance/matching/util/DateUtil.java
@@ -1,7 +1,10 @@
 package com.k2data.governance.matching.util;

+import com.k2data.governance.matching.common.type.TimeUnitType;
+
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Calendar;
 import java.util.Date;

 /**
@@ -12,7 +15,7 @@ import java.util.Date;
  * @author wangyihan
  */
 public class DateUtil {
-    private static ThreadLocal local = new ThreadLocal();
+    private static ThreadLocal<SimpleDateFormat> local = new ThreadLocal<>();
     private static String formatString = "yyyy-MM-dd HH:mm:ss.SSS";

     public static void setDateFormat(String formatStr) {
@@ -38,13 +41,56 @@ public class DateUtil {
         return getDateFormat().format(timestamp);
     }

-    public static String getTranFormatTime(long timestamp){
+    public static String getTranFormatTime(long timestamp) {
         String transFormat = "yyyy-MM-dd HH:mm:ss";
         SimpleDateFormat dateFormat = new SimpleDateFormat(transFormat);
         String transTime = dateFormat.format(timestamp);
         return transTime;
     }

+    public static int getSpecificTimeUnit(String timestamp, TimeUnitType timeUnitType) {
+        String transFormat = "yyyy-MM-dd HH:mm:ss";
+        SimpleDateFormat dateFormat = new SimpleDateFormat(transFormat);
+        String transTime = dateFormat.format(Long.valueOf(timestamp));
+        Date date = null;
+        Calendar calendar = Calendar.getInstance();
+        try {
+            date = dateFormat.parse(transTime);
+            calendar.setTime(date);
+            calendar.setFirstDayOfWeek(Calendar.MONDAY);
+        } catch (ParseException e) {
+            e.printStackTrace();
+        }
+
+        int index;
+        switch (timeUnitType) {
+            case YEAR:
+                index = calendar.get(Calendar.YEAR);
+                break;
+            case MONTH:
+                // 1-based month (1..12)
+                index = calendar.get(Calendar.MONTH) + 1;
+                break;
+            case WEEK:
+                index = calendar.get(Calendar.WEEK_OF_YEAR);
+                break;
+            case DAY:
+                index = calendar.get(Calendar.DAY_OF_YEAR);
+                break;
+            case HOUR:
+                // coarse global hour index; 366 days/year keeps it monotonic across years
+                int tmpDay = calendar.get(Calendar.DAY_OF_YEAR);
+                int tmpYear = calendar.get(Calendar.YEAR);
+                int tmpHour = calendar.get(Calendar.HOUR_OF_DAY);
+                index = (tmpYear - 1) * 366 * 24 + (tmpDay - 1) * 24 + tmpHour;
+                break;
+            default:
+                index = -1;
+                break;
+        }
+
+        return index;
+    }
+
     public static Date parse(String dateStr) throws ParseException {
         return getDateFormat().parse(dateStr);
     }
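To pin down the index semantics of `getSpecificTimeUnit`, a small standalone usage sketch (the timestamp value is made up; note the method expects epoch milliseconds as a string):

```java
import com.k2data.governance.matching.common.type.TimeUnitType;
import com.k2data.governance.matching.util.DateUtil;

public class TimeUnitSketch {
    public static void main(String[] args) {
        String ts = "1517529600000"; // 2018-02-02 00:00:00 UTC, illustrative
        // MONTH is 1-based (1..12); DAY is the day of year (1..366);
        // HOUR is the coarse global index built from year/day/hour.
        // Prints 2 and 33 for UTC or UTC+8; exact values depend on the
        // JVM default time zone, since SimpleDateFormat uses it.
        System.out.println(DateUtil.getSpecificTimeUnit(ts, TimeUnitType.MONTH));
        System.out.println(DateUtil.getSpecificTimeUnit(ts, TimeUnitType.DAY));
    }
}
```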
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/README.md b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/README.md
index 22670bf8976d77c75ca339a1af0b331a753ff378..de3dc5257e1d72f987af2347fe26e2a5c83f437f 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/README.md
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/README.md
@@ -1,3 +1,5 @@
+
+
 # Goldwind anomaly detection

 ## Environment
@@ -10,20 +12,45 @@
 ## Usage
 * The entry point of the Spark version of the project is `com.k2data.governance.matching.detecting.command.DetectWorker`
-
-* Run arguments: detect [-mfP MASTERFEATUREPATH\] [-tfP TESTFEATUREPATH\] [-toO TESTOUTLIEROUTPUT\] [-lk LOFK\] [-m METRIC\] [-k KEYS\]
-
+* Run arguments: detect [-mfP MASTERFEATUREPATH\] [-tfP TESTFEATUREPATH\] [-toO TESTOUTLIEROUTPUT\] [-k KEYS\] [-l\] [-dm DETECTOR_METHOD\] [-m METRIC\] [-lk LOFK\] [-wft WEIGHTFUNC_METHOD\] [-bn BUCK_NUM\] [-sgu TIME_UNIT\] [-tpf TOP_PERCENT\] [-dpf DOWN_PERCENT\] [-wmin WEIGHTFUNC_THRES\]
  * MASTERFEATUREPATH: input path of the master-data features
  * TESTFEATUREPATH: input path of the features of the data under test
  * TESTOUTLIEROUTPUT: output path of the anomaly factors of the data under test
- * LOFK: the neighbour count k of the LOF algorithm
- * METRIC: the feature attributes used to compute anomalies
  * KEYS: primary key
-
+ * LOFK: the neighbour count k of the LOF algorithm (needed by the LOF method)
+ * METRIC: the feature attributes used to compute anomalies (needed by the LOF method)
+ * WEIGHTFUNC_METHOD: the weight function to use {constant, piecewise, linear, polymialconvex, polymialconcave} (needed by the overlap-rate method; the functions are described in detail below)
+ * TIME_UNIT: the time unit of the segmentation, which the weight function depends on (needed by the overlap-rate method)
+ * BUCK_NUM: the number of buckets (needed by the overlap-rate method)
+ * TOP_PERCENT: with values sorted ascending, filter out the first a% of points (needed by the overlap-rate method)
+ * DOWN_PERCENT: with values sorted ascending, filter out the last b% of points (needed by the overlap-rate method)
+ * -l: run in Spark local mode; omit the flag otherwise
+
+
+## Weight functions
+
+* For month, day and week:
+  * constant
+    * $y = 1.0$
+  * linear
+    * $y = \frac{min-1}{l}\,x + 1$
+    * i.e. the weight is 1 when the distance $x$ between the two time points is 0, and reaches the user-specified minimum $min$ (below 1.0) when $x$ hits its maximum $l$
+  * polymialconvex (polynomial, convex)
+    * $y = \frac{1-min}{l^2}(x-l)^2 + min$
+    * same endpoints: weight 1 at $x = 0$, and the user-specified minimum $min$ (below 1.0) at the maximum distance $l$
+  * polymialconcave (polynomial, concave)
+    * $y = \frac{min-1}{l^2}x^2 + 1$
+    * same endpoints: weight 1 at $x = 0$, and the user-specified minimum $min$ (below 1.0) at the maximum distance $l$
+  * piecewise
+    * currently 1 if the two points fall in the same time unit (same day, same month, same week), otherwise 0
+* For year
+  * when segmenting by year, an exponential function is used by default:
+  * $y = min + (1.0 - min)\,e^{-x}$
+  * i.e. the maximum is 1 and the weight approaches the user-specified minimum as $x$ grows

 ## Output format

-| Key (wfid,wtid) | Attribute (FiledName) | Segment ID (SegmentID) | Anomaly factor (AnomalyFactor; for LOF, closer to 1 is more normal) |
-| ---------------- | -------------------------- | --------------- | --------------------------------- |
-| 650127,650127033 | WTPS_Temp_Ra_F32_Pcharger2 | 0 | 1.0 |
+| Key (wfid,wtid) | Attribute (FiledName) | Segment ID (SegmentID) | Detection score (closer to 1 is more normal; LOF scores lie in [0, +inf), overlap-rate scores in [0,1]) |
+| ---------------- | -------------------------- | --------------- | ---------------------------------------- |
+| 650127,650127033 | WTPS_Temp_Ra_F32_Pcharger2 | 0 | 1.0 |
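A compact standalone sketch of the weight functions above (the class name is invented; `l` is the maximum distance for the chosen unit, e.g. 6 for months, and `min` is the `-wmin` threshold):

```java
public class WeightFunctions {
    // l: half the period length (6 for month, 26.5 for week, 183 for day)
    // min: user-specified minimum weight, 0 <= min < 1
    static double linear(double x, double l, double min)  { return (min - 1.0) / l * x + 1.0; }
    static double convex(double x, double l, double min)  { return (1.0 - min) / (l * l) * (x - l) * (x - l) + min; }
    static double concave(double x, double l, double min) { return (min - 1.0) / (l * l) * x * x + 1.0; }
    static double exponential(double x, double min)       { return min + (1.0 - min) * Math.exp(-x); }
    static double piecewise(double x)                     { return x < 1e-9 ? 1.0 : 0.0; }

    public static void main(String[] args) {
        // month example: master = March, test = November
        // cyclic distance x = min(|11-3|, 12-|11-3|) = 4, with l = 6, min = 0.2
        System.out.println(linear(4, 6, 0.2));  // 0.4667
        System.out.println(convex(4, 6, 0.2));  // 0.2889
        System.out.println(concave(4, 6, 0.2)); // 0.6444
    }
}
```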
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/command/DetectProcess.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/command/DetectProcess.java
index f344a56eb2a2f39756a9a4d5ef2e767ab0a9a375..1322d68ad8075537aa535e29ec4f64305436dfbf 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/command/DetectProcess.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/command/DetectProcess.java
@@ -2,8 +2,12 @@ package com.k2data.governance.matching.detecting.command;

 import com.k2data.governance.matching.command.IWorkProcess;
 import com.k2data.governance.matching.common.constant.ArgumentsConstants;
+import com.k2data.governance.matching.common.type.DetectMethodType;
+import com.k2data.governance.matching.common.type.TimeUnitType;
+import com.k2data.governance.matching.common.type.WeightFunctionType;
 import com.k2data.governance.matching.detecting.detector.IDetector;
 import com.k2data.governance.matching.detecting.detector.LofDetector;
+import com.k2data.governance.matching.detecting.detector.OverlapDetector;
 import com.k2data.governance.matching.util.DataFrameUtil;
 import com.k2data.governance.matching.util.InitUtil;
 import net.sourceforge.argparse4j.inf.Namespace;
@@ -38,13 +42,41 @@ public class DetectProcess implements IWorkProcess {
         List<String> metrics = ns.get(ArgumentsConstants.USED_METRICS);
         int lofK = ns.getInt(ArgumentsConstants.LOF_K);

+        IDetector detector;
+        DetectMethodType detectMethodType = DetectMethodType.fromString(ns.getString(ArgumentsConstants.DETECTOR_METHOD));
+        switch (detectMethodType) {
+            case LOF:
+                detector = new LofDetector(sparkSession, keys.toArray(new String[keys.size()]),
+                    lofK);
+                break;
+            case HISTOGRAM:
+                int buckNum = ns.getInt(ArgumentsConstants.BUCK_NUM);
+                double topPercent = ns.getDouble(ArgumentsConstants.TOP_PERCENT);
+                double downPercent = ns.getDouble(ArgumentsConstants.DOWN_PERCENT);
+                double weightFuncThres = ns.getDouble(ArgumentsConstants.WEIGHTFUNC_THRES);
+                double normalThres = ns.getDouble(ArgumentsConstants.NORMAL_THRES);
+                WeightFunctionType weightFunctionType = WeightFunctionType.fromString(ns.getString(ArgumentsConstants.WEIGHTFUNC_METHOD));
+                TimeUnitType timeUnitType = TimeUnitType.fromString(ns.getString(ArgumentsConstants.TIME_UNIT));
+                detector = new OverlapDetector(sparkSession, keys.toArray(new String[keys.size()]),
+                    buckNum, topPercent, downPercent, weightFuncThres, normalThres, weightFunctionType, timeUnitType);
+                break;
+            default:
+                detector = new LofDetector(sparkSession, keys.toArray(new String[keys.size()]),
+                    lofK);
+                break;
+        }
+
         Dataset<Row> masterFeatureTable = DataFrameUtil.readCsvData(masterFeaturePath, sparkSession);
         Dataset<Row> testFeatureTable = DataFrameUtil.readCsvData(testFeaturePath, sparkSession);

-        IDetector lofDetector = new LofDetector(sparkSession, keys.toArray(new String[keys.size()]),
-            lofK);
-        Dataset<Row> testLofTable = lofDetector
-            .detect(masterFeatureTable, testFeatureTable, metrics.toArray(new String[metrics.size()]));
+        String[] metricsParam = null;
+        if (metrics != null) {
+            metricsParam = metrics.toArray(new String[metrics.size()]);
+        }
+        Dataset<Row> testLofTable = detector
+            .detect(masterFeatureTable, testFeatureTable, metricsParam);

         String testOutlierOutput = ns.get(ArgumentsConstants.OUTPUT_PATH_OUTLIERS);
         if (testOutlierOutput != null) {
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/IDetector.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/IDetector.java
index b8effe8023b77a87c4008597ee873d145049e275..b9a0b7719ed8ede2a4aeba3fb8cf123bd2d9db7c 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/IDetector.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/IDetector.java
@@ -1,13 +1,8 @@
 package com.k2data.governance.matching.detecting.detector;

-import com.k2data.governance.matching.detecting.record.LofNode;
-import com.k2data.governance.matching.record.MetricVector;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import scala.Tuple2;

-import java.util.List;

 /**
  * Created by stoke on 2018/1/11.
@@ -20,7 +15,7 @@ public interface IDetector {

     Dataset<Row> detect(Dataset<Row> masterTable, Dataset<Row> testTable, String[] metrics);

-    Dataset<Row> detect(JavaPairRDD<String, Tuple2<Iterable<MetricVector>, Iterable<MetricVector>>> featureRdd, boolean sort);
+//    Dataset<Row> detect(JavaPairRDD<String, Tuple2<Iterable<MetricVector>, Iterable<MetricVector>>> featureRdd, boolean sort);

-    List<LofNode> detectAnomaly(List<MetricVector> masterSeries, List<MetricVector> testSeries);
+//    List<LofNode> detectAnomaly(List<MetricVector> masterSeries, List<MetricVector> testSeries);
 }
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/LofDetector.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/LofDetector.java
index b8f0e43e4c206e23bfefe92437586d1f7295db34..bc29f80d4970e780cb15ed0116fdf6b96efa6fa8 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/LofDetector.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/LofDetector.java
@@ -54,7 +54,6 @@ public class LofDetector implements IDetector, Serializable {
         return detect(masterFeatureRdd.join(testFeatureRdd), true);
     }

-    @Override
     public Dataset<Row> detect(JavaPairRDD<String, Tuple2<Iterable<MetricVector>, Iterable<MetricVector>>> featureRdd, boolean sort) {
         JavaRDD<Row> rowRdd = featureRdd.flatMap(new FlatMapFunction<Tuple2<String, Tuple2<Iterable<MetricVector>, Iterable<MetricVector>>>, Row>() {
             @Override
@@ -84,7 +83,6 @@ public class LofDetector implements IDetector, Serializable {
     /**
      * ref: http://blog.csdn.net/freedomboy319/article/details/48828449
      */
-    @Override
     public List<LofNode> detectAnomaly(List<MetricVector> masterSeries, List<MetricVector> testSeries) {
         vector_num = masterSeries.get(0).getMetrics().length;
         // do not need to sort the masterSeries and testSeries by segID
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/OverlapDetector.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/OverlapDetector.java
new file mode 100644
index 0000000000000000000000000000000000000000..af6bb4de05f66b6c87be8f958d25d610cc790863
--- /dev/null
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/detector/OverlapDetector.java
@@ -0,0 +1,399 @@
+package com.k2data.governance.matching.detecting.detector;
+
+import com.k2data.governance.matching.common.constant.DetectorConstants;
+import com.k2data.governance.matching.common.constant.ExtractorConstants;
+import com.k2data.governance.matching.common.constant.LearnerConstants;
+import com.k2data.governance.matching.common.type.TimeUnitType;
+import com.k2data.governance.matching.common.type.WeightFunctionType;
+import com.k2data.governance.matching.detecting.record.DetectorRes;
+import com.k2data.governance.matching.record.HistogramFeature;
+import com.k2data.governance.matching.util.CommonUtil;
+import com.k2data.governance.matching.util.DataFrameUtil;
+import com.k2data.governance.matching.util.DateUtil;
+import com.k2data.governance.matching.util.KeyUtil;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.Serializable;
+import scala.Tuple2;
+
+import java.util.*;
+
+import static com.k2data.governance.matching.common.type.WeightFunctionType.PIECEWISE;
+import static com.k2data.governance.matching.util.DataFrameUtil.getString;
+
+/**
+ * Created by gaoyu on 2018/2/4 10:32.
+ * E-mail address is nkugaoyu@163.com.
+ * Copyright 2018 gaoyu. All Rights Reserved.
+ *
+ * @author gaoyu
+ */
+public class OverlapDetector implements IDetector, Serializable {
+    private SparkSession sparkSession;
+    private String[] keyAttrs;
+    private int buckNum;
+    private double topPercent;
+    private double downPercent;
+    private double weightFuncThres;
+    private double normalThres;
+    private WeightFunctionType weightFunctionType;
+    private TimeUnitType timeUnitType;
+
+    public OverlapDetector(SparkSession sparkSession, String[] keyAttrs, int buckNum, double topPercent, double downPercent, double weightFuncThres, double normalThres, WeightFunctionType weightFunctionType, TimeUnitType timeUnitType) {
+        this.sparkSession = sparkSession;
+        this.keyAttrs = keyAttrs;
+        this.buckNum = buckNum;
+        this.topPercent = topPercent;
+        this.downPercent = downPercent;
+        this.weightFuncThres = weightFuncThres;
+        this.normalThres = normalThres;
+        this.weightFunctionType = weightFunctionType;
+        this.timeUnitType = timeUnitType;
+    }
+
+    @Override
+    public Dataset<Row> detect(Dataset<Row> masterTable, Dataset<Row> testTable,
+                               final String[] metrics) {
+        JavaPairRDD<String, Iterable<HistogramFeature>> masterFeatureRdd =
+            splitFeature(masterTable, keyAttrs);
+        JavaPairRDD<String, Iterable<HistogramFeature>> testFeatureRdd =
+            splitFeature(testTable, keyAttrs);
+
+        return detect(masterFeatureRdd.join(testFeatureRdd), true);
+    }
+
+    /**
+     * Split the data into per-attribute feature groups.
+     *
+     * @param df   input data
+     * @param keys primary key
+     * @return a JavaPairRDD keyed by the primary key, with the feature collection as value
+     */
+    private JavaPairRDD<String, Iterable<HistogramFeature>> splitFeature(
+        Dataset<Row> df, final String[] keys) {
+        JavaPairRDD<String, HistogramFeature> featureRdd = df.javaRDD().mapToPair(new PairFunction<Row, String, HistogramFeature>() {
+            @Override
+            public Tuple2<String, HistogramFeature> call(Row row) throws Exception {
+                Map<String, String> keyMap = new HashMap<>();
+                for (String key : keys) {
+                    keyMap.put(key, getString(row, key));
+                }
+                String keyString = KeyUtil.createConcatString(keyMap);
+                keyString = KeyUtil.appendKey(keyString, LearnerConstants.FIELD_NAME, getString(row, LearnerConstants.FIELD_NAME));
+                int segID = DataFrameUtil.getInt(row, ExtractorConstants.SEGMENT_ID);
+                // the two +1s skip the attribute column and the segID column
+                int buckStart = keys.length + 1 + 1;
+                double min = DataFrameUtil.getDouble(row, ExtractorConstants.MIN);
+                double max = DataFrameUtil.getDouble(row, ExtractorConstants.MAX);
+                int cnt = DataFrameUtil.getInt(row, ExtractorConstants.COUNT);
+                String unTransTime = DataFrameUtil.getString(row, ExtractorConstants.TIME);
+
+                double[] usedMetrics = new double[buckNum];
+                for (int i = 0; i < buckNum; ++i) {
+                    usedMetrics[i] = DataFrameUtil.getDouble(row, buckStart + i);
+                }
+                HistogramFeature hisFea = new HistogramFeature(segID, min, max, cnt, unTransTime, buckNum, usedMetrics);
+                outlierFilter(hisFea);
+                return new Tuple2<>(keyString, hisFea);
+            }
+        });
+        return featureRdd.groupByKey();
+    }
+
+    public Dataset<Row> detect(JavaPairRDD<String, Tuple2<Iterable<HistogramFeature>, Iterable<HistogramFeature>>> featureRdd, boolean sort) {
+        JavaRDD<Row> rowRdd = featureRdd.flatMap(new FlatMapFunction<Tuple2<String, Tuple2<Iterable<HistogramFeature>, Iterable<HistogramFeature>>>, Row>() {
+            @Override
+            public Iterator<Row> call(Tuple2<String, Tuple2<Iterable<HistogramFeature>, Iterable<HistogramFeature>>> keyMasterTestFeatures) throws Exception {
+                String keyString = keyMasterTestFeatures._1();
+                List<String> keyValues = new ArrayList<>();
+                for (String keyAttr : keyAttrs) {
+                    keyValues.add(KeyUtil.getFieldFromConcatString(keyString, keyAttr));
+                }
+                List<Row> rowList = new ArrayList<>();
+                List<HistogramFeature> masterFeatures = CommonUtil.convertIterable(keyMasterTestFeatures._2()._1());
+                List<HistogramFeature> testFeatures = CommonUtil.convertIterable(keyMasterTestFeatures._2()._2());
+                List<DetectorRes> testNodeList = detectAnomaly(masterFeatures, testFeatures);
+                for (DetectorRes testNode : testNodeList) {
+                    List<Object> rowValues = new ArrayList<>(keyValues);
+                    rowValues.add(KeyUtil.getFieldFromConcatString(keyString, LearnerConstants.FIELD_NAME));
+                    rowValues.add(testNode.getNodeID());
+                    rowValues.add(testNode.getNormalScore());
+                    rowList.add(RowFactory.create(rowValues.toArray()));
+                }
+                return rowList.iterator();
+            }
+        });
+        return sparkSession.createDataFrame(rowRdd, buildResultSchema());
+    }
+
+    /**
+     * @param masterSeries master data
+     * @param testSeries   data under test
+     * @return the per-segment scores of the test data against the master data
+     */
+    private List<DetectorRes> detectAnomaly(List<HistogramFeature> masterSeries, List<HistogramFeature> testSeries) {
+        List<DetectorRes> detectorRes = new ArrayList<>();
+        for (HistogramFeature testHisFea : testSeries) {
+            double sumScores = 0.0;
+            for (HistogramFeature masterHisFea : masterSeries) {
+                // arguments ordered to match the parameter names: master first
+                double tmpScore = getOverLapRate(masterHisFea, testHisFea);
+                sumScores += tmpScore;
+            }
+            double num = (double) masterSeries.size();
+            double normalScore = sumScores / num;
+            DetectorRes tmpRes = new DetectorRes(testHisFea.getSegID(), normalScore);
+            detectorRes.add(tmpRes);
+        }
+
+        return detectorRes;
+    }
+
+    /**
+     * @param masterTime time label of the master segment
+     * @param testTime   time label of the test segment
+     * @return the weight between the two time points
+     */
+    private double getWeight(String masterTime, String testTime) {
+        // parse the times, then compute the gap between them;
+        // only year/month/week/day/hour can be resolved, all other units
+        // fall back to the constant function
+        int masterUnit = DateUtil.getSpecificTimeUnit(masterTime, timeUnitType);
+        int testUnit = DateUtil.getSpecificTimeUnit(testTime, timeUnitType);
+        int min = Math.min(masterUnit, testUnit);
+        int max = Math.max(masterUnit, testUnit);
+        double median;
+        double x;
+
+        // regardless of the time unit, the piecewise function has the highest priority
+        if (weightFunctionType == PIECEWISE) {
+            x = max - min;
+            return piecewise(x);
+        }
+        switch (timeUnitType) {
+            // TODO: returns directly for now; more functions for the year case later
+            case YEAR:
+                x = max - min;
+                return exponential(x);
+            case MONTH:
+                median = 12.0 / 2.0;
+                x = (double) Math.min((max - min), (12 + min - max));
+                break;
+            case WEEK:
+                median = 53.0 / 2.0;
+                x = (double) Math.min((max - min), (53 + min - max));
+                break;
+            case DAY:
+                median = 366.0 / 2.0;
+                x = (double) Math.min((max - min), (366 + min - max));
+                break;
+            // no weighting for the remaining units
+            default:
+                return 1.0;
+        }
+
+        double res;
+        switch (weightFunctionType) {
+            case LINEAR:
+                res = linear(x, median);
+                break;
+            case POLYMIALCONVEX:
+                res = polynomialConvex(x, median);
+                break;
+            case POLYMIALCONCAVE:
+                res = polynomialConcave(x, median);
+                break;
+            case CONSTANT:
+                res = 1.0;
+                break;
+            default:
+                res = 1.0;
+                break;
+        }
+
+        return res;
+    }
+
+    // the examples below use the month unit (median = 6)
+    // linear: y = (min-1)/6*x+1
+    private double linear(double x, double median) {
+        return (weightFuncThres - 1.0) / median * x + 1.0;
+    }
+
+    // polynomial 1: y = (1-min)/(6*6)*(x-6)^2+min (convex)
+    private double polynomialConvex(double x, double median) {
+        return (1.0 - weightFuncThres) / (median * median) * (x - median) * (x - median) + weightFuncThres;
+    }
+
+    // polynomial 2: y = (min-1)/(6*6)*x^2+1 (concave)
+    private double polynomialConcave(double x, double median) {
+        return (weightFuncThres - 1.0) / (median * median) * x * x + 1.0;
+    }
+
+    // decays gradually towards the minimum; used for the year unit
+    private double exponential(double x) {
+        return weightFuncThres + (1.0 - weightFuncThres) * Math.exp(-x);
+    }
+
+    // piecewise: 1.0 for the same time unit (same day, month or week), 0.0 otherwise
+    private double piecewise(double x) {
+        if (x < 1e-9) {
+            return 1.0;
+        }
+        return 0.0;
+    }
+
+    /**
+     * @param hisFea filter out the first topPercent and the last downPercent of the
+     *               segment's points, and reset its min/max accordingly
+     */
+    private void outlierFilter(HistogramFeature hisFea) {
+        double min = hisFea.getMin();
+        double max = hisFea.getMax();
+        if (min == max) {
+            return;
+        }
+        int cnt = hisFea.getCnt();
+        // number of points in the first a%
+        int topNumber = (int) ((double) cnt * topPercent);
+        // number of points in the last b%
+        int downNumber = (int) ((double) cnt * downPercent);
+
+        int startBuck = hisFea.getStartBuck();
+        int endBuck = hisFea.getEndBuck();
+        double[] buckVal = hisFea.getBuckVal();
+        double inter = (max - min) / (double) hisFea.getBuckNum();
+
+        int tmp = 0;
+        for (int i = startBuck; i <= endBuck; i++) {
+            tmp += buckVal[i];
+            // once this bucket passes the number of points to drop, start from it
+            // and adjust the minimum and this bucket's count
+            if (tmp > topNumber) {
+                min += inter * (double) (i - startBuck);
+                buckVal[i] -= topNumber - (tmp - buckVal[i]);
+                startBuck = i;
+                break;
+            }
+        }
+
+        tmp = 0;
+        for (int i = endBuck; i >= startBuck; i--) {
+            tmp += buckVal[i];
+            if (tmp > downNumber) {
+                max -= inter * (double) (endBuck - i);
+                buckVal[i] -= downNumber - (tmp - buckVal[i]);
+                endBuck = i;
+                break;
+            }
+        }
+        // reset min, max, the point count and the bucket range
+        hisFea.setMin(min);
+        hisFea.setMax(max);
+        hisFea.setCnt(cnt - topNumber - downNumber);
+        hisFea.setStartBuck(startBuck);
+        hisFea.setEndBuck(endBuck);
+    }
+
+    /**
+     * @param masterFea a segment from the master data
+     * @param testFea   a segment from the data under test
+     * @return the similarity between the two segments
+     */
+    private double getOverLapRate(HistogramFeature masterFea, HistogramFeature testFea) {
+        double masterMin = masterFea.getMin();
+        double masterMax = masterFea.getMax();
+        double testMin = testFea.getMin();
+        double testMax = testFea.getMax();
+        if (testMin > masterMax || testMax < masterMin) {
+            return 0.0;
+        }
+
+        String masterTime = masterFea.getTransTime();
+        String testTime = testFea.getTransTime();
+        double alpha = getWeight(masterTime, testTime);
+        if (testMin >= masterMin && testMax <= masterMax) {
+            return alpha * 1.0;
+        }
+
+        int testBuckStart = testFea.getStartBuck();
+        int testBuckEnd = testFea.getEndBuck();
+        double inter = (testMax - testMin) / (double) buckNum;
+
+        // test range contains the master range on both sides; checked before the
+        // one-sided cases below so that it is not swallowed by them
+        if (testMin <= masterMin && testMax >= masterMax) {
+            double tmpScore = testFea.getCnt();
+            for (int i = testBuckStart; i <= testBuckEnd; i++) {
+                double preVal = i * inter + testMin;
+                double tmpVal = (i + 1) * inter + testMin;
+                if (tmpVal < masterMin) {
+                    // bucket entirely below the master range
+                    tmpScore -= testFea.getBuckVal()[i];
+                } else if ((tmpVal > masterMin && preVal < masterMin)
+                    || (tmpVal > masterMax && preVal < masterMax)) {
+                    // boundary bucket: take half of it as overlapping
+                    tmpScore -= testFea.getBuckVal()[i] / 2.0;
+                } else if (preVal > masterMax) {
+                    // bucket entirely above the master range
+                    tmpScore -= testFea.getBuckVal()[i];
+                }
+            }
+            return alpha * tmpScore / (double) testFea.getCnt();
+        }
+
+        if (testMax >= masterMax) {
+            double tmpScore = 0.0;
+            for (int i = testBuckStart; i <= testBuckEnd; i++) {
+                double tmpVal = (i + 1) * inter + testMin;
+                tmpScore += testFea.getBuckVal()[i];
+                if (tmpVal > masterMax) {
+                    // approximate: half of the boundary bucket overlaps
+                    tmpScore = tmpScore - testFea.getBuckVal()[i] / 2.0;
+                    break;
+                }
+            }
+            tmpScore = tmpScore / (double) testFea.getCnt();
+            return alpha * tmpScore;
+        }
+
+        if (testMax < masterMax) {
+            double tmpScore = 0.0;
+            for (int i = testBuckEnd; i >= testBuckStart; i--) {
+                double tmpVal = (i + 1) * inter + testMin;
+                tmpScore += testFea.getBuckVal()[i];
+                // mirror of the branch above, around masterMin
+                if (tmpVal < masterMin) {
+                    tmpScore = tmpScore - testFea.getBuckVal()[i] / 2.0;
+                    break;
+                }
+            }
+            tmpScore = tmpScore / (double) testFea.getCnt();
+            return alpha * tmpScore;
+        }
+
+        // the cases above are exhaustive; this point is never reached
+        return -1.0;
+    }
+
+    private StructType buildResultSchema() {
+        List<StructField> structFieldList = new ArrayList<>();
+        for (String attr : keyAttrs) {
+            structFieldList.add(DataTypes.createStructField(attr, DataTypes.StringType, true));
+        }
+        structFieldList.add(
+            DataTypes.createStructField(DetectorConstants.FIELD_NAME, DataTypes.StringType, true));
+        structFieldList.add(
+            DataTypes.createStructField(DetectorConstants.SEGMENT_ID, DataTypes.IntegerType, false));
+        structFieldList
+            .add(DataTypes
+                .createStructField(DetectorConstants.ANOMALY_FACTOR, DataTypes.DoubleType, false));
+        return DataTypes.createStructType(structFieldList);
+    }
+}
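To illustrate the overlap score with a toy computation (standalone sketch, invented numbers): a test segment with range [0,10], 100 points and 10 buckets, against a master segment whose range tops out at 8.

```java
public class OverlapSketch {
    public static void main(String[] args) {
        // test histogram: 100 points spread evenly over [0,10] in 10 buckets
        double[] buck = new double[10];
        java.util.Arrays.fill(buck, 10.0);
        double testMin = 0.0, testMax = 10.0, masterMax = 8.0;
        double inter = (testMax - testMin) / buck.length;
        // test sticks out above the master range: count buckets from the
        // bottom up, taking half of the bucket that crosses masterMax
        double score = 0.0;
        for (int i = 0; i < buck.length; i++) {
            double upper = (i + 1) * inter + testMin;
            score += buck[i];
            if (upper > masterMax) { score -= buck[i] / 2.0; break; }
        }
        // 0.85: buckets below 8 count fully, the crossing bucket counts half;
        // this is the score before the time weight alpha is applied
        System.out.println(score / 100.0);
    }
}
```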
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/record/DetectorRes.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/record/DetectorRes.java
new file mode 100644
index 0000000000000000000000000000000000000000..23bde25baa8f995021ddc9a4cc003eed2eb3d77a
--- /dev/null
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-detection/src/main/java/com/k2data/governance/matching/detecting/record/DetectorRes.java
@@ -0,0 +1,35 @@
+package com.k2data.governance.matching.detecting.record;
+
+/**
+ * Created by gaoyu on 2018/2/4 11:07.
+ * E-mail address is nkugaoyu@163.com.
+ * Copyright 2018 gaoyu. All Rights Reserved.
+ *
+ * @author gaoyu
+ */
+public class DetectorRes {
+    private int nodeID;
+    private double normalScore;
+
+    public DetectorRes(int nodeID, double normalScore) {
+        this.nodeID = nodeID;
+        this.normalScore = normalScore;
+    }
+
+    public double getNormalScore() {
+        return normalScore;
+    }
+
+    public int getNodeID() {
+        return nodeID;
+    }
+
+    public void setNormalScore(double normalScore) {
+        this.normalScore = normalScore;
+    }
+
+    public void setNodeID(int nodeID) {
+        this.nodeID = nodeID;
+    }
+}
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/README.md b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/README.md
index 8e6c989066078afc6e35a31006f7fc39fbed6288..79d444f29b46816816b3b7574418ba483ac03a89 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/README.md
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/README.md
@@ -11,7 +11,7 @@
 ## Usage
 * The entry point of the Spark version of the project is `com.k2data.governance.matching.feature.command.ExtractWorker`
-* Run arguments: feature [-msP MASTERSEGMENTPATH\] [-tsP TESTSEGMENTPATH\] [-msO MASTERSEGMENTOUTPUT\] [-tfP TESTFEATUREPATH\] [-tc TIMECOLUMN\] [-k KEYS\] [-a ATTRS\]
+* Run arguments: feature [-msP MASTERSEGMENTPATH\] [-tsP TESTSEGMENTPATH\] [-msO MASTERSEGMENTOUTPUT\] [-tfP TESTFEATUREPATH\] [-tc TIMECOLUMN\] [-k KEYS\] [-a ATTRS\] [-ft FEATURE_TYPE\] [-bn BUCK_NUM\] [-l\]
  * MASTERSEGMENTPATH: input path of the master-data segments
  * TESTSEGMENTPATH: input path of the segments of the data under test
  * MASTERFEATUREOUTPUT: output path of the master-data features
@@ -19,6 +19,9 @@
  * TIMECOLUMN: the time column
  * KEYS: primary key
  * ATTRS: the attributes to compute statistics over
+ * FEATURE_TYPE: the kind of features to produce {normal, histogram}; normal yields min/max/mean and similar statistics, histogram yields a histogram whose buckets count the points in each value range (see the bucket-index sketch below)
+ * BUCK_NUM: required by the histogram method; the number of buckets to split into
+ * -l: run in Spark local mode; omit the flag otherwise

 ## Output format
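The bucket layout mirrors `getBuckIndex` further down in this patch: a value `v` in a segment with range [min, max] lands in bucket `floor((v-min)/((max-min)/buckNum))`, with `v == max` clamped into the last bucket. A standalone sketch (class name invented, values made up):

```java
public class BucketSketch {
    static int bucketIndex(double value, double min, double max, int buckNum) {
        if (min == max) return 0;             // degenerate segment: single bucket
        if (value == max) return buckNum - 1; // clamp the right edge
        double inter = (max - min) / buckNum;
        return (int) ((value - min) / inter);
    }

    public static void main(String[] args) {
        // range [0,10], 5 buckets of width 2
        System.out.println(bucketIndex(3.9, 0, 10, 5));  // 1
        System.out.println(bucketIndex(10.0, 0, 10, 5)); // 4 (clamped)
    }
}
```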
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/command/ExtractProcess.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/command/ExtractProcess.java
index ce9694a5e9e24d77e7f06b3a1e0155546f15dd14..919ce6af400a473c263329b10bb5839b355c3c60 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/command/ExtractProcess.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/command/ExtractProcess.java
@@ -2,7 +2,9 @@ package com.k2data.governance.matching.feature.command;

 import com.k2data.governance.matching.command.IWorkProcess;
 import com.k2data.governance.matching.common.constant.ArgumentsConstants;
-import com.k2data.governance.matching.feature.extractor.Extractor;
+import com.k2data.governance.matching.common.type.FeatureType;
+import com.k2data.governance.matching.feature.extractor.HistogramExtracor;
+import com.k2data.governance.matching.feature.extractor.NormalExtractor;
 import com.k2data.governance.matching.feature.extractor.IExtractor;
 import com.k2data.governance.matching.util.DataFrameUtil;
 import com.k2data.governance.matching.util.InitUtil;
@@ -53,9 +55,25 @@ public class ExtractProcess implements IWorkProcess {
         Dataset<Row> masterSegmentTable = DataFrameUtil.readCsvData(masterSegmentPath, sparkSession);
         Dataset<Row> testSegmentTable = DataFrameUtil.readCsvData(testSegmentPath, sparkSession);

-        // TODO: how to make sure the segment read in are in right order?
-        IExtractor extractor = new Extractor(sparkSession, keys.toArray(new String[keys.size()]),
-            timeColumn, featureMinSize);
+        FeatureType featureType = FeatureType.fromString(ns.getString(ArgumentsConstants.FEATURE_TYPE));
+        IExtractor extractor;
+
+        switch (featureType) {
+            case NORMAL:
+                extractor = new NormalExtractor(sparkSession, keys.toArray(new String[keys.size()]),
+                    timeColumn, featureMinSize);
+                break;
+            case HISTOGRAM:
+                int buckNum = ns.getInt(ArgumentsConstants.BUCK_NUM);
+                extractor = new HistogramExtracor(sparkSession, keys.toArray(new String[keys.size()]),
+                    timeColumn, featureMinSize, buckNum);
+                break;
+            default:
+                extractor = new NormalExtractor(sparkSession, keys.toArray(new String[keys.size()]),
+                    timeColumn, featureMinSize);
+                break;
+        }
+
         Dataset<Row> masterFeatureTable = extractor
             .extract(masterTable, masterSegmentTable, attrs.toArray(new String[attrs.size()]), true);
         Dataset<Row> testFeatureTable = extractor
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/AbstractExtractMethod.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/AbstractExtractMethod.java
new file mode 100644
index 0000000000000000000000000000000000000000..ceff1310704738d312a5b33e4cf0218895f8bad3
--- /dev/null
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/AbstractExtractMethod.java
@@ -0,0 +1,120 @@
+package com.k2data.governance.matching.feature.extractor;
+
+import com.k2data.governance.matching.common.constant.ExtractorConstants;
+import com.k2data.governance.matching.common.constant.LearnerConstants;
+import com.k2data.governance.matching.feature.record.Feature;
+import com.k2data.governance.matching.record.Record;
+import com.k2data.governance.matching.record.Segment;
+import com.k2data.governance.matching.util.CommonUtil;
+import com.k2data.governance.matching.util.DataFrameUtil;
+import com.k2data.governance.matching.util.KeyUtil;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import 
org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Created by gaoyu on 2018/2/3 19:33. + * E-mail address is nkugaoyu@163.com. + * Copyright 2018 gaoyu. All Rights Reserved. + * + * @author gaoyu + */ +public class AbstractExtractMethod implements IExtractor,Serializable { + public SparkSession sparkSession; + public String[] keyAttrs; + public String timeColumn; + public int featureMinSize; + + public AbstractExtractMethod(SparkSession sparkSession, String[] keyAttrs, String timeColumn, int featureMinSize) { + this.sparkSession = sparkSession; + this.keyAttrs = keyAttrs; + this.timeColumn = timeColumn; + this.featureMinSize = featureMinSize; + } + + @Override + public Dataset extract(Dataset table, Dataset segmentTable, String[] attrs) { + return extract(table, segmentTable, attrs, false); + } + + @Override + public Dataset extract(Dataset table, Dataset segmentTable, String[] attrs, boolean sort) { + JavaPairRDD> seriesRdd = DataFrameUtil.splitSeries(table, keyAttrs, attrs, timeColumn); + JavaPairRDD> segmentRdd = DataFrameUtil.splitSegment(segmentTable, keyAttrs); + return extract(seriesRdd.join(segmentRdd), sort); + } + + @Override + public Dataset extract(JavaPairRDD, Iterable>> seriesSegmentsRdd, final boolean sort) { + JavaRDD rowRdd = seriesSegmentsRdd.flatMap(new FlatMapFunction, Iterable>>, Row>() { + @Override + public Iterator call(Tuple2, Iterable>> keySeriesSegments) throws Exception { + String keyString = keySeriesSegments._1(); + List keyValues = new ArrayList<>(); + List rowList = new ArrayList<>(); + for (String keyAttr : keyAttrs) { + keyValues.add(KeyUtil.getFieldFromConcatString(keyString, keyAttr)); + } + keyValues.add(KeyUtil.getFieldFromConcatString(keyString, LearnerConstants.FIELD_NAME)); + List series = DataFrameUtil.convertSeriesIterable(keySeriesSegments._2()._1(), sort); + List segments = CommonUtil.convertIterable(keySeriesSegments._2()._2()); + + List featureList = extractFeature(series, segments); + for (Feature feature : featureList) { + List newRow = new ArrayList<>(keyValues); + newRow.add(feature.getSegID()); + for (double val : feature.getFeatures()) { + newRow.add(val); + } + rowList.add(RowFactory.create(newRow.toArray())); + } + return rowList.iterator(); + } + }); + return sparkSession.createDataFrame(rowRdd, buildResultSchema()); + } + + @Override + public List extractFeature(List series, List segmentList) { + return null; + } + + public StructType buildResultSchema() { + List structFieldList = new ArrayList<>(); + for (String attr : keyAttrs) { + structFieldList.add(DataTypes.createStructField(attr, DataTypes.StringType, true)); + } + structFieldList.add( + DataTypes.createStructField(ExtractorConstants.FIELD_NAME, DataTypes.StringType, true)); + structFieldList.add( + DataTypes.createStructField(ExtractorConstants.SEGMENT_ID, DataTypes.IntegerType, false)); + structFieldList + .add(DataTypes.createStructField(ExtractorConstants.MIN, DataTypes.DoubleType, false)); + structFieldList + .add(DataTypes.createStructField(ExtractorConstants.MAX, DataTypes.DoubleType, false)); + structFieldList + .add(DataTypes.createStructField(ExtractorConstants.AVG, DataTypes.DoubleType, false)); + structFieldList + .add(DataTypes.createStructField(ExtractorConstants.SIGMA, DataTypes.DoubleType, 
false)); + structFieldList + .add(DataTypes.createStructField(ExtractorConstants.SKEWNESS, DataTypes.DoubleType, false)); + structFieldList + .add(DataTypes.createStructField(ExtractorConstants.KURTOSIS, DataTypes.DoubleType, false)); + structFieldList + .add(DataTypes.createStructField(ExtractorConstants.COUNT, DataTypes.DoubleType, false)); + return DataTypes.createStructType(structFieldList); + } +} diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/HistogramExtracor.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/HistogramExtracor.java new file mode 100644 index 0000000000000000000000000000000000000000..525bdf72f7ad003ed07b67c466697912a33e6a3c --- /dev/null +++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/HistogramExtracor.java @@ -0,0 +1,199 @@ +package com.k2data.governance.matching.feature.extractor; + +import com.k2data.governance.matching.common.constant.ExtractorConstants; +import com.k2data.governance.matching.common.constant.LearnerConstants; +import com.k2data.governance.matching.feature.record.Feature; +import com.k2data.governance.matching.record.HistogramFeature; +import com.k2data.governance.matching.record.Record; +import com.k2data.governance.matching.record.Segment; +import com.k2data.governance.matching.util.CommonUtil; +import com.k2data.governance.matching.util.DataFrameUtil; +import com.k2data.governance.matching.util.KeyUtil; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.Tuple2; + +import java.util.*; + +/** + * Created by gaoyu on 2018/2/3 13:44. + * E-mail address is nkugaoyu@163.com. + * Copyright 2018 gaoyu. All Rights Reserved. 
+ *
+ * @author gaoyu
+ */
+public class HistogramExtracor extends AbstractExtractMethod {
+    private int buckNum;
+
+    public HistogramExtracor(SparkSession sparkSession, String[] keyAttrs, String timeColumn, int featureMinSize, int buckNum) {
+        super(sparkSession, keyAttrs, timeColumn, featureMinSize);
+        this.buckNum = buckNum;
+    }
+
+    public HistogramExtracor(SparkSession sparkSession, String[] keyAttrs, String timeColumn, int featureMinSize) {
+        super(sparkSession, keyAttrs, timeColumn, featureMinSize);
+        this.buckNum = 100;
+    }
+
+    @Override
+    public Dataset<Row> extract(JavaPairRDD<String, Tuple2<Iterable<Record>, Iterable<Segment>>> seriesSegmentsRdd, final boolean sort) {
+        JavaRDD<Row> rowRdd = seriesSegmentsRdd.flatMap(new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Record>, Iterable<Segment>>>, Row>() {
+            @Override
+            public Iterator<Row> call(Tuple2<String, Tuple2<Iterable<Record>, Iterable<Segment>>> keySeriesSegments) throws Exception {
+                String keyString = keySeriesSegments._1();
+                List<Object> keyValues = new ArrayList<>();
+                List<Row> rowList = new ArrayList<>();
+                for (String keyAttr : keyAttrs) {
+                    keyValues.add(KeyUtil.getFieldFromConcatString(keyString, keyAttr));
+                }
+                keyValues.add(KeyUtil.getFieldFromConcatString(keyString, LearnerConstants.FIELD_NAME));
+                List<Record> series = DataFrameUtil.convertSeriesIterable(keySeriesSegments._2()._1(), sort);
+                List<Segment> segments = CommonUtil.convertIterable(keySeriesSegments._2()._2());
+
+                List<HistogramFeature> featureList = extractHisFeature(series, segments);
+                for (HistogramFeature feature : featureList) {
+                    List<Object> newRow = new ArrayList<>(keyValues);
+                    newRow.add(feature.getSegID());
+                    for (double val : feature.getBuckVal()) {
+                        newRow.add(val);
+                    }
+                    newRow.add(feature.getMin());
+                    newRow.add(feature.getMax());
+                    newRow.add(feature.getCnt());
+                    newRow.add(feature.getTransTime());
+                    rowList.add(RowFactory.create(newRow.toArray()));
+                }
+                return rowList.iterator();
+            }
+        });
+        return sparkSession.createDataFrame(rowRdd, buildResultSchema());
+    }
+
+    @Override
+    public List<Feature> extractFeature(List<Record> series, List<Segment> segmentList) {
+        // histogram extraction is handled by extractHisFeature instead
+        return null;
+    }
+
+    private List<HistogramFeature> extractHisFeature(List<Record> series, List<Segment> segmentList) {
+        List<HistogramFeature> featureList = new ArrayList<>();
+        // sort segmentList by segID
+        Collections.sort(segmentList, new Comparator<Segment>() {
+            @Override
+            public int compare(Segment s1, Segment s2) {
+                if (s1.getSegID() > s2.getSegID()) {
+                    return 1;
+                } else if (s1.getSegID() < s2.getSegID()) {
+                    return -1;
+                } else {
+                    return 0;
+                }
+            }
+        });
+
+        int segIndex = 0;
+        long endTime = segmentList.get(segIndex).getEndTimestamp();
+        int segID = segmentList.get(segIndex).getSegID();
+        int cnt = 0;
+        double curMin = Double.POSITIVE_INFINITY;
+        double curMax = Double.NEGATIVE_INFINITY;
+
+        long beginTimestamp = 0;
+        // first pass: compute min, max and count per segment
+        for (Record record : series) {
+            if (record.getTimestamp() > endTime) {
+                beginTimestamp = segmentList.get(segIndex).getBeginTimestamp();
+                String transTime = String.valueOf(beginTimestamp);
+                featureList.add(new HistogramFeature(segID, curMin, curMax, cnt, transTime, buckNum));
+
+                curMin = Double.POSITIVE_INFINITY;
+                curMax = Double.NEGATIVE_INFINITY;
+                cnt = 0;
+                segIndex++;
+                endTime = segmentList.get(segIndex).getEndTimestamp();
+                segID = segmentList.get(segIndex).getSegID();
+            }
+            double value = record.getValue();
+            cnt++;
+
+            if (value > curMax) {
+                curMax = value;
+            }
+            if (value < curMin) {
+                curMin = value;
+            }
+        }
+        // flush the feature of the last segment
+        beginTimestamp = segmentList.get(segIndex).getBeginTimestamp();
+        featureList.add(new HistogramFeature(segID, curMin, curMax, cnt, String.valueOf(beginTimestamp), buckNum));
+
+        // second pass: fill the buckets
+        segIndex = 0;
+        endTime = segmentList.get(segIndex).getEndTimestamp();
+        HistogramFeature tmpFeature = featureList.get(segIndex);
+        for (Record record : series) {
+            if (record.getTimestamp() > endTime) {
+                segIndex++;
+                tmpFeature = featureList.get(segIndex);
+                endTime = segmentList.get(segIndex).getEndTimestamp();
+            }
+
+            double value = record.getValue();
+            int index = getBuckIndex(tmpFeature, value);
+            featureList.get(segIndex).getBuckVal()[index]++;
+        }
+
+        return featureList;
+    }
+
+    private int
+    private int getBuckIndex(HistogramFeature tmpFeature, double value) {
+        double min = tmpFeature.getMin();
+        double max = tmpFeature.getMax();
+        // A constant segment (min == max) keeps everything in the first bucket
+        if (min == max) {
+            return 0;
+        }
+        int buckNum = tmpFeature.getBuckNum();
+        // Map the maximum into the last bucket instead of the out-of-range index buckNum
+        if (value == max) {
+            return buckNum - 1;
+        }
+        double inter = (max - min) / (double) buckNum;
+        return (int) ((value - min) / inter);
+    }
+
+    public StructType buildResultSchema() {
+        List<StructField> structFieldList = new ArrayList<>();
+        for (String attr : keyAttrs) {
+            structFieldList.add(DataTypes.createStructField(attr, DataTypes.StringType, true));
+        }
+        structFieldList.add(
+            DataTypes.createStructField(ExtractorConstants.FIELD_NAME, DataTypes.StringType, true));
+        structFieldList.add(
+            DataTypes.createStructField(ExtractorConstants.SEGMENT_ID, DataTypes.IntegerType, false));
+        // One column per bucket, numbered 1 through buckNum
+        for (int i = 1; i <= buckNum; i++) {
+            structFieldList
+                .add(DataTypes.createStructField(ExtractorConstants.BUCKINDEX + String.valueOf(i), DataTypes.DoubleType, false));
+        }
+
+        structFieldList
+            .add(DataTypes.createStructField(ExtractorConstants.MIN, DataTypes.DoubleType, false));
+        structFieldList
+            .add(DataTypes.createStructField(ExtractorConstants.MAX, DataTypes.DoubleType, false));
+        structFieldList
+            .add(DataTypes.createStructField(ExtractorConstants.COUNT, DataTypes.IntegerType, false));
+        structFieldList
+            .add(DataTypes.createStructField(ExtractorConstants.TIME, DataTypes.StringType, false));
+
+        return DataTypes.createStructType(structFieldList);
+    }
+
+}
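
To make the bucketing arithmetic above concrete, here is a self-contained sketch of the same index computation; `HistogramFeature` is replaced by plain `min`/`max`/`buckNum` parameters for illustration, and the class name `BuckIndexDemo` is an assumption:

```java
public class BuckIndexDemo {
    // Same mapping as HistogramExtracor#getBuckIndex, with the feature object
    // replaced by plain parameters for illustration
    static int getBuckIndex(double min, double max, int buckNum, double value) {
        if (min == max) {
            return 0;               // constant segment: everything in bucket 0
        }
        if (value == max) {
            return buckNum - 1;     // keep the maximum inside the last bucket
        }
        double inter = (max - min) / (double) buckNum;
        return (int) ((value - min) / inter);
    }

    public static void main(String[] args) {
        // min = 0, max = 10, 5 buckets of width 2
        System.out.println(getBuckIndex(0, 10, 5, 0.0));   // 0
        System.out.println(getBuckIndex(0, 10, 5, 3.9));   // 1
        System.out.println(getBuckIndex(0, 10, 5, 10.0));  // 4, max clamped to last bucket
    }
}
```
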
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/Extractor.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/NormalExtractor.java
similarity index 79%
rename from k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/Extractor.java
rename to k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/NormalExtractor.java
index 6359082e28bbfc7c2c9275c1da77790364038f13..4810cad1e52f6d2a3962ac84f3242df6301f9b13 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/Extractor.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-feature/src/main/java/com/k2data/governance/matching/feature/extractor/NormalExtractor.java
@@ -30,19 +30,10 @@ import java.util.*;
  *
  * @author stoke
  */
-public class Extractor implements IExtractor, Serializable {
-
-    private SparkSession sparkSession;
-    private String[] keyAttrs;
-    private String timeColumn;
-    private int featureMinSize;
-
-    public Extractor(SparkSession sparkSession, String[] keyAttrs,
-        String timeColumn, int extractMinSize) {
-        this.sparkSession = sparkSession;
-        this.keyAttrs = keyAttrs;
-        this.timeColumn = timeColumn;
-        this.featureMinSize = extractMinSize;
+public class NormalExtractor extends AbstractExtractMethod {
+
+    public NormalExtractor(SparkSession sparkSession, String[] keyAttrs, String timeColumn, int featureMinSize) {
+        super(sparkSession, keyAttrs, timeColumn, featureMinSize);
     }
 
     @Override
@@ -217,29 +208,4 @@ public class Extractor implements IExtractor, Serializable {
         return metrics;
     }
 
-    private StructType buildResultSchema() {
-        List<StructField> structFieldList = new ArrayList<>();
-        for (String attr : keyAttrs) {
-            structFieldList.add(DataTypes.createStructField(attr, DataTypes.StringType, true));
-        }
-        structFieldList.add(
-            DataTypes.createStructField(ExtractorConstants.FIELD_NAME, DataTypes.StringType, true));
-        structFieldList.add(
-            DataTypes.createStructField(ExtractorConstants.SEGMENT_ID, DataTypes.IntegerType, false));
-        structFieldList
-            .add(DataTypes.createStructField(ExtractorConstants.MIN, DataTypes.DoubleType, false));
-        structFieldList
-            .add(DataTypes.createStructField(ExtractorConstants.MAX, DataTypes.DoubleType, false));
-        structFieldList
-            .add(DataTypes.createStructField(ExtractorConstants.AVG, DataTypes.DoubleType, false));
-        structFieldList
-            .add(DataTypes.createStructField(ExtractorConstants.SIGMA, DataTypes.DoubleType, false));
-        structFieldList
-            .add(DataTypes.createStructField(ExtractorConstants.SKEWNESS, DataTypes.DoubleType, false));
-        structFieldList
-            .add(DataTypes.createStructField(ExtractorConstants.KURTOSIS, DataTypes.DoubleType, false));
-        structFieldList
-            .add(DataTypes.createStructField(ExtractorConstants.COUNT, DataTypes.DoubleType, false));
-        return DataTypes.createStructType(structFieldList);
-    }
 }
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-param/README.md b/k2de-governance-schema-matching/k2de-governance-schema-matching-param/README.md
index aee1eb3e4fa24fc97544f94fe50f09fa07b9bbef..fbf7e0fe9b40936b7e6812af26e9e5b3044013b4 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-param/README.md
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-param/README.md
@@ -12,12 +12,12 @@
 ## Usage
 * The entry function of the Spark-based project is `com.k2data.governance.matching.param.command.LearnWorker`
 * Run arguments:
-
-* learn [-mp MASTERPATH\] [-spO SEGMENTPARAMETEROUTPUT\] [-k KEYS\] [-a ATTRS\]
+* learn [-mp MASTERPATH\] [-spO SEGMENTPARAMETEROUTPUT\] [-k KEYS\] [-a ATTRS\] [-l\]
   * MASTERPATH: master data input path
   * SEGMENTPARAMETEROUTPUT: segmentation parameter output path
   * KEYS: primary key columns
   * ATTRS: attributes to compute statistics for
+  * -l: run in Spark local mode; omit this flag to run on a cluster
 
 ## Output format
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/README.md b/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/README.md
index 9d887eef4b7a328211056191fd2627e819d907c0..b686fac64c8ca19f03d7638b272ef5dd56a56970 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/README.md
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/README.md
@@ -12,7 +12,7 @@
 ## Usage
 * The entry function of the Spark-based project is `com.k2data.governance.matching.segment.command.SegmentWorker`
-* Run arguments: segment [-mp MASTERPATH\] [-tp TESTPATH\] [-msO MASTERSEGEMENTOUTPU\] [-tsO TESTSEGEMENTOUTPUT\] [-sgm SEG_METHOD\] [-sgu TIME_UNIT] [-spP SEGMENTPARAMETERPATH\]
+* Run arguments: segment [-mp MASTERPATH\] [-tp TESTPATH\] [-msO MASTERSEGMENTOUTPUT\] [-tsO TESTSEGMENTOUTPUT\] [-sgm SEG_METHOD\] [-sgu TIME_UNIT\] [-spP SEGMENTPARAMETERPATH\] [-l\]
   * MASTERPATH: master data input path
   * TESTPATH: test data input path
@@ -21,6 +21,7 @@
   * SEG_METHOD: segmentation method, speed-based or time-unit-based {speed,unit}
   * TIME_UNIT: time unit for the time-unit-based segmentation {year,month,day,hour,minute,second}
   * SEGMENTPARAMETERPATH: input path of the speed-based segmentation parameters
+  * -l: run in Spark local mode; omit this flag to run on a cluster
 
 ## Output format
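
Both READMEs now document the bare `-l` switch. A minimal sketch of how such a boolean flag is typically declared with argparse4j, the library this project's command parsing appears to use; the parser name, destination key, and class name here are illustrative assumptions, not the project's actual CommandParse wiring:

```java
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;

public class LocalFlagDemo {
    public static void main(String[] args) {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("segment");
        // A bare flag: present means true, absent means false
        parser.addArgument("-l", "--local")
                .dest("local")
                .action(Arguments.storeTrue())
                .help("run in Spark local mode");
        try {
            Namespace ns = parser.parseArgs(args);
            boolean local = ns.getBoolean("local");
            // the flag would then pick the Spark master, e.g. "local[*]" vs. cluster
            System.out.println("local mode: " + local);
        } catch (ArgumentParserException e) {
            parser.handleError(e);
        }
    }
}
```
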
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/command/SegmentProcess.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/command/SegmentProcess.java
index de404d89b96da786feb5d87c0de22110f5c9a087..8b3a9db50e850f90a8b7f88bf12b480cc3fa2bbd 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/command/SegmentProcess.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/command/SegmentProcess.java
@@ -2,8 +2,8 @@ package com.k2data.governance.matching.segment.command;
 
 import com.k2data.governance.matching.command.IWorkProcess;
 import com.k2data.governance.matching.common.constant.ArgumentsConstants;
-import com.k2data.governance.matching.segment.common.type.SegMethodType;
-import com.k2data.governance.matching.segment.common.type.TimeUnitType;
+import com.k2data.governance.matching.common.type.SegMethodType;
+import com.k2data.governance.matching.common.type.TimeUnitType;
 import com.k2data.governance.matching.segment.segmenter.ISegmenter;
 import com.k2data.governance.matching.segment.segmenter.SpeedThresholdSegmenter;
 import com.k2data.governance.matching.segment.segmenter.TimeUnitSegmenter;
diff --git a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/segmenter/TimeUnitSegmenter.java b/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/segmenter/TimeUnitSegmenter.java
index 1ae839a6ef082c6484752f242f4675ee2d177d68..b5f5a330d975f25cbb8d3d58c453354d76eeddbb 100644
--- a/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/segmenter/TimeUnitSegmenter.java
+++ b/k2de-governance-schema-matching/k2de-governance-schema-matching-segment/src/main/java/com/k2data/governance/matching/segment/segmenter/TimeUnitSegmenter.java
@@ -1,13 +1,17 @@
 package com.k2data.governance.matching.segment.segmenter;
 
-import com.k2data.governance.matching.segment.common.type.TimeUnitType;
+import com.k2data.governance.matching.common.type.TimeUnitType;
 import com.k2data.governance.matching.record.Record;
 import com.k2data.governance.matching.record.Segment;
 import com.k2data.governance.matching.util.DateUtil;
 import org.apache.spark.sql.SparkSession;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
 import java.util.List;
 
 /**
@@ -68,6 +72,12 @@ public class TimeUnitSegmenter extends AbstractSegMethod {
                 indexEnd = 7;
                 break;
             }
+            // WEEK is special: the key cannot be derived by simply truncating the date string
+            case WEEK: {
+                String curWeek = getByWeek(dateStr);
+                indexEnd = 4;
+                return dateStr.substring(0, indexEnd) + curWeek;
+            }
             case DAY: {
                 indexEnd = 11;
                 break;
@@ -89,4 +99,21 @@ public class TimeUnitSegmenter extends AbstractSegMethod {
         }
         return dateStr.substring(0, indexEnd);
     }
+
+    private String getByWeek(String dateStr) {
+        String transFormat = "yyyy-MM-dd HH:mm:ss";
+        SimpleDateFormat dateFormat = new SimpleDateFormat(transFormat);
+        Date date = null;
+        try {
+            // Parse the raw date string directly; calling format() on a String would throw
+            date = dateFormat.parse(dateStr);
+        } catch (ParseException e) {
+            e.printStackTrace();
+        }
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(date);
+        // Weeks start on Monday (Calendar.MONDAY == 2)
+        calendar.setFirstDayOfWeek(Calendar.MONDAY);
+        int curWeek = calendar.get(Calendar.WEEK_OF_YEAR);
+        return String.valueOf(curWeek);
+    }
 }
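
The week key produced above concatenates the four-digit year with the week number. A quick, runnable check of that behavior; the class name `WeekKeyDemo` and the sample date are illustrative, and note that the exact `WEEK_OF_YEAR` value depends on the default locale's calendar settings (in particular its minimal-days-in-first-week rule):

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class WeekKeyDemo {
    public static void main(String[] args) throws Exception {
        String dateStr = "2018-02-03 13:44:00";
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = fmt.parse(dateStr);

        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        calendar.setFirstDayOfWeek(Calendar.MONDAY); // weeks start on Monday
        int week = calendar.get(Calendar.WEEK_OF_YEAR);

        // Mirrors TimeUnitSegmenter: "yyyy" + week number, e.g. "20185"
        String key = dateStr.substring(0, 4) + week;
        System.out.println(key);
    }
}
```
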