diff --git a/.gitignore b/.gitignore
index bf33abfdbce7ddc6fa97e7209845fee559cb3de0..13042f9cefeacc6fafc3bba7d61d3542c80aaa74 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,5 @@
*.csv
*META-INF/
+
+**/LocalTest.java
diff --git a/k2de-governance-data-extractor/pom.xml b/k2de-governance-data-extractor/pom.xml
index d8055df60d7144c8ebda2dd244a4eaeb45e6118a..c262e4c01c48b6da947cc2a5c6b9a8fee4775ede 100644
--- a/k2de-governance-data-extractor/pom.xml
+++ b/k2de-governance-data-extractor/pom.xml
@@ -9,14 +9,14 @@
k2de-governance-data-extractor
- 0.0.1-SNAPSHOT
+ 1.0-SNAPSHOT
jar
- DataExtractor
+ k2de-governance-data-extractor
UTF-8
- 1.8
+ 1.7
2.6.0
@@ -44,8 +44,6 @@
com.sun
tools
- 1.4.2
- system
${java.home}/../lib/tools.jar
diff --git a/k2de-governance-data-extractor/src/main/java/com/k2data/governance/extractor/metadata/GWSqliteMetaDatabase.java b/k2de-governance-data-extractor/src/main/java/com/k2data/governance/extractor/metadata/GWSqliteMetaDatabase.java
index 2b880d38ef50415fa9519abbd96ad459ccaea97f..bb7f275811b5fc58101be25686b92be4acbfefe6 100644
--- a/k2de-governance-data-extractor/src/main/java/com/k2data/governance/extractor/metadata/GWSqliteMetaDatabase.java
+++ b/k2de-governance-data-extractor/src/main/java/com/k2data/governance/extractor/metadata/GWSqliteMetaDatabase.java
@@ -145,7 +145,7 @@ public class GWSqliteMetaDatabase extends AbstractGWMetaDatabase implements IMet
String fieldGroup = rs.getString(1).replace(" ","").toLowerCase();
String fieldName = rs.getString(2).replace(" ", "").toLowerCase();
if (!fieldGroupFieldsMap.containsKey(fieldGroup)) {
- fieldGroupFieldsMap.put(fieldGroup, new ArrayList<>());
+ fieldGroupFieldsMap.put(fieldGroup, new ArrayList());
}
fieldGroupFieldsMap.get(fieldGroup).add(fieldName);
}
diff --git a/k2de-governance-exception-manager-base/README.md b/k2de-governance-exception-manager-base/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..c64fee5353930b0b0cf27370a6e5e3ec704b4ed3
--- /dev/null
+++ b/k2de-governance-exception-manager-base/README.md
@@ -0,0 +1,34 @@
+# 金风 KMX 异常统计工具
+
+## 环境说明
+需再 JDK1.7 及以上环境下运行
+
+## 使用说明
+* spark 版本基项目入口函数为 `com.k2data.governance.manager.ExceptionManagerSpark.run`
+* 运行参数:
+ usage: ExceptionManager [-h] [-m METADATAFILE] [-t {text,sequence}] [-l]
+ InputPath OutputPath
+ * InputPath:文件输入路径
+ * OutputPath:结果输出路径
+ * -m METADATAFILE:协议点表 SQLite 文件路径
+ * -t:输入文件格式,文本文件格式,或 SequenceFile 格式,默认为 seqence
+ * -l:以 Spark Local 模式运行
+ * -h:运行参数说明
+
+## 功能概述
+* 输入:异常区异常文件(SequenceFile)
+* 输出
+ 1. 异常分类储存(暂未实现,输出格式待明确)
+ 2. 异常类型统计(异常类型,异常计数),输出路径为 `outputPath/TypeCountResult/`
+ 3. 异常类型细节统计(异常类型,协议号,风机号,异常计数),输出路径为 `outputPath/TypeDetailResult/`
+ 4. 异常协议统计(协议号,风机计数,异常计数),输出路径为 `outputPath/ProtocolResult/`
+
+## TODO
+* 测试大数据量的效率表现
+* 数据倾斜优化
+* 单元测试
+* 似乎重构之后效率慢了一些,需后续检查测试
+* 执行任务可配置
+* 文档及报告
+
+
diff --git a/k2de-governance-exception-manager-base/pom.xml b/k2de-governance-exception-manager-base/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..92ed68eec5ddcdbc7a4604c263ea993c0a346511
--- /dev/null
+++ b/k2de-governance-exception-manager-base/pom.xml
@@ -0,0 +1,113 @@
+
+ 4.0.0
+
+
+ com.k2data
+ k2de-governance-parent
+ 1.0-SNAPSHOT
+
+
+ k2de-governance-exception-manager-base
+ 1.0-SNAPSHOT
+ jar
+
+ k2de-governance-exception-manager-base
+
+
+
+ org.apache.hadoop
+ hadoop-client
+
+
+ javax.servlet
+ *
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ javax.servlet
+ *
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+
+
+ javax.servlet
+ *
+
+
+
+
+
+ org.xerial
+ sqlite-jdbc
+
+
+
+ org.apache.spark
+ spark-core_${spark.comp.version}
+
+
+
+ net.sourceforge.argparse4j
+ argparse4j
+
+
+
+ org.apache.spark
+ spark-sql_${spark.comp.version}
+
+
+
+ com.sun
+ tools
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.1
+
+
+ ${compile.version}
+
+
+
+ maven-assembly-plugin
+ 2.4
+
+
+
+ com.k2data.governance.manager.ExceptionManagerSpark
+
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/ExceptionManagerSpark.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/ExceptionManagerSpark.java
new file mode 100644
index 0000000000000000000000000000000000000000..34a303de0e6b4416c14e976a15e551aba09eb1c4
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/ExceptionManagerSpark.java
@@ -0,0 +1,243 @@
+package com.k2data.governance.manager;
+
+import com.k2data.governance.manager.result.IAnalyzeResult;
+import com.k2data.governance.manager.common.constants.ArgumentsConstants;
+import com.k2data.governance.manager.error.IErrorType;
+import com.k2data.governance.manager.filter.IFilter;
+import com.k2data.governance.manager.metadata.IMetaDatabase;
+import com.k2data.governance.manager.util.InitUtil;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.Tuple2;
+
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Created by wangyihan on 2017/12/6 下午3:29.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class ExceptionManagerSpark {
+ private Logger logger = Logger.getLogger(ExceptionManagerSpark.class);
+ private final String originDataOutputPath = "OriginalData";
+ private final String typeDetailOutputPath = "TypeDetailResult";
+ private final String typeCountOutputPath = "TypeCountResult";
+ private final String protocolCountOutputPath = "ProtocolResult";
+ private static IFilter errorTypeFilter;
+ private static IMetaDatabase metadataBase;
+
+ public Logger getLogger() {
+ return logger;
+ }
+
+ public static void setFilter(IFilter filter) {
+ errorTypeFilter = filter;
+ }
+
+ public static void setMetadataBase(IMetaDatabase database) {
+ metadataBase = database;
+ }
+
+ public static Namespace parseArguments(String[] args) {
+ ArgumentParser argumentParser = ArgumentParsers.newArgumentParser("ExceptionManager")
+ .defaultHelp(true)
+ .description("k2data exception data statistics tool");
+ argumentParser.addArgument(ArgumentsConstants.INPUT_PATH)
+ .type(String.class)
+ .metavar("InputPath")
+ .help("Input data path, HDFS or local path");
+ argumentParser.addArgument(ArgumentsConstants.OUTPUT_PATH)
+ .type(String.class)
+ .metavar("OutputPath")
+ .help("Output data path, HDFS of local path");
+ argumentParser.addArgument("-m", "--metadata-file")
+ .dest(ArgumentsConstants.METADATA_FILE)
+ .help("Metadata SQLite file");
+ argumentParser.addArgument("-t", "--file-type")
+ .dest(ArgumentsConstants.FILE_TYPE)
+ .choices(ArgumentsConstants.FILE_TYPE_TEXT, ArgumentsConstants.FILE_TYPE_SEQUENCE)
+ .setDefault(ArgumentsConstants.FILE_TYPE_SEQUENCE)
+ .help("Input file format (text, sequence)");
+ argumentParser.addArgument("-l", "--local")
+ .dest(ArgumentsConstants.LOCAL)
+ .action(Arguments.storeConst())
+ .setConst(true)
+ .setDefault(false)
+ .help("Run spark with local mode");
+ Namespace ns = null;
+ try {
+ ns = argumentParser.parseArgs(args);
+ } catch (ArgumentParserException e) {
+ argumentParser.handleError(e);
+ System.exit(1);
+ }
+ return ns;
+ }
+
+ public static void run(String[] args) {
+ Namespace ns = parseArguments(args);
+ run(ns);
+ }
+
+ public static void run(Namespace ns) {
+ ExceptionManagerSpark app = new ExceptionManagerSpark();
+ Logger logger = app.getLogger();
+
+ String inputFilePath = ns.getString(ArgumentsConstants.INPUT_PATH);
+ String outputFilePath = ns.getString(ArgumentsConstants.OUTPUT_PATH);
+ String sourceFileType = ns.getString(ArgumentsConstants.FILE_TYPE);
+ boolean local = ns.getBoolean(ArgumentsConstants.LOCAL);
+
+ // Initial SparkSession & SparkContext
+ SparkSession session = InitUtil.initSparkContext(local);
+ JavaSparkContext sc = new JavaSparkContext(session.sparkContext());
+ JavaRDD dataRdd = null;
+ // Read data
+ switch (sourceFileType) {
+ case ArgumentsConstants.FILE_TYPE_SEQUENCE: // convert SequenceFile to JavaRDD
+ dataRdd = sc.sequenceFile(inputFilePath, Text.class, Text.class)
+ .map(new Function, String>() {
+ @Override
+ public String call(Tuple2 textTextTuple2) throws Exception {
+ return textTextTuple2._2().toString();
+ }
+ });
+ break;
+ case ArgumentsConstants.FILE_TYPE_TEXT:
+ dataRdd = sc.textFile(inputFilePath);
+ break;
+ default:
+ logger.error("Invalid file type argument!");
+ System.exit(-1);
+ }
+
+ final Broadcast broadcastFilter = sc.broadcast(errorTypeFilter);
+ final Broadcast broadcastMetadataBase = sc.broadcast(metadataBase);
+ // analyze exceptions
+ assert dataRdd != null;
+ JavaPairRDD exceptionRdd = dataRdd.mapToPair(new PairFunction() {
+ IFilter filter = broadcastFilter.value();
+ IMetaDatabase metaDatabase = broadcastMetadataBase.value();
+
+ @Override
+ public Tuple2 call(String message) throws Exception {
+ Class errorTypeClass = filter.getErrorClass(message);
+ // TODO: handle null pointer or invalid errorTypeClass
+ IErrorType errorType = errorTypeClass.getEnumConstants()[0];
+ IAnalyzeResult result = errorType.analyzeRecord(message, metaDatabase);
+ return new Tuple2<>(result, message);
+ }
+ }).cache();
+
+ // Save by type
+ String originOutputPath = Paths.get(outputFilePath, app.originDataOutputPath).toString();
+ JavaRDD rowRdd = exceptionRdd.map(new Function, Row>() {
+ @Override
+ public Row call(Tuple2 pair) throws Exception {
+ return RowFactory.create(pair._1().getErrorType().toString(), pair._2());
+ }
+ });
+ StructType schema = DataTypes.createStructType(new StructField[]{
+ DataTypes.createStructField("ErrorType", DataTypes.StringType, false),
+ DataTypes.createStructField("Data", DataTypes.StringType, false)
+ });
+ Dataset df = session.createDataFrame(rowRdd, schema);
+ df.write().partitionBy("ErrorType").text(originOutputPath);
+
+ JavaRDD exceptionRddWithoutData = exceptionRdd.map(new Function, IAnalyzeResult>() {
+ @Override
+ public IAnalyzeResult call(Tuple2 tuple) throws Exception {
+ return tuple._1();
+ }
+ }).cache();
+
+ exceptionRdd.unpersist();
+
+ // Count exception type
+ JavaPairRDD typeDetailRdd = exceptionRddWithoutData
+ .mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(IAnalyzeResult result) throws Exception {
+ return new Tuple2<>(result, 1);
+ }
+ })
+ .reduceByKey(new Function2() {
+ @Override
+ public Integer call(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+ });
+ String typeDetailOutputPath = Paths.get(outputFilePath, app.typeDetailOutputPath).toString();
+ typeDetailRdd.saveAsTextFile(typeDetailOutputPath);
+
+ // Aggregate count by type
+ JavaPairRDD typeTotalRdd = exceptionRddWithoutData
+ .mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(IAnalyzeResult result) throws Exception {
+ return new Tuple2<>(result.getErrorType(), 1);
+ }
+ })
+ .reduceByKey(new Function2() {
+ @Override
+ public Integer call(Integer count1, Integer count2) throws Exception {
+ return count1 + count2;
+ }
+ });
+ String typeOutputPath = Paths.get(outputFilePath, app.typeCountOutputPath).toString();
+ typeTotalRdd.saveAsTextFile(typeOutputPath);
+
+ typeDetailRdd.unpersist();
+
+ // Count by protocol
+ JavaPairRDD protocolRdd = exceptionRddWithoutData
+ .groupBy(new Function() {
+ @Override
+ public String call(IAnalyzeResult gwAnalyzeResult) throws Exception {
+ return gwAnalyzeResult.getFieldGroup();
+ }
+ })
+ .mapToPair(new PairFunction>, String, String>() {
+ @Override
+ public Tuple2 call(Tuple2> pair) throws Exception {
+ String protocolId = pair._1();
+ Iterable results = pair._2();
+ int recordCount = 0;
+ Set wtSet = new HashSet<>();
+ for (IAnalyzeResult result : results) {
+ recordCount += 1;
+ wtSet.add(result.getAssetId());
+ }
+ return new Tuple2<>(protocolId, String.format("%s,%s", wtSet.size(), recordCount));
+ }
+ });
+ String protocolOutputPath = Paths.get(outputFilePath, app.protocolCountOutputPath).toString();
+ protocolRdd.saveAsTextFile(protocolOutputPath);
+
+ exceptionRdd.unpersist();
+ session.stop();
+ }
+}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Pair.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/Pair.java
similarity index 100%
rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Pair.java
rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/Pair.java
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/ArgumentsConstants.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/ArgumentsConstants.java
new file mode 100644
index 0000000000000000000000000000000000000000..91f84ee4a3aa85ae4dd152cad8ba79720210e509
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/ArgumentsConstants.java
@@ -0,0 +1,18 @@
+package com.k2data.governance.manager.common.constants;
+
+/**
+ * Created by wangyihan on 2017/12/28 下午10:58.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class ArgumentsConstants {
+ public final static String INPUT_PATH = "inputPath";
+ public final static String OUTPUT_PATH = "outputPath";
+ public final static String METADATA_FILE = "metadataFile";
+ public final static String FILE_TYPE = "fileType";
+ public final static String LOCAL = "local";
+ public final static String FILE_TYPE_TEXT = "text";
+ public final static String FILE_TYPE_SEQUENCE = "sequence";
+}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/IMessage.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/Constants.java
similarity index 36%
rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/IMessage.java
rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/Constants.java
index e2fad558e84fd6bf9bc168b7b1ddd9d31afb9af5..8cbd88bfe377a3777293ecdfb4ced26fbe01d7fd 100644
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/IMessage.java
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/Constants.java
@@ -1,15 +1,12 @@
-package com.k2data.governance.manager.message;
+package com.k2data.governance.manager.common.constants;
/**
- * Created by stoke on 2017/11/17.
+ * Created by stoke on 2017/11/16.
* E-mail address is zaqthss2009@gmail.com
* Copyright © stoke. All Rights Reserved.
*
* @author stoke
*/
-public interface IMessage {
- void setKey(String key);
- String getKey();
- void setMsg(String msg);
- String getMsg();
+public class Constants {
+ public static String GW_META_SQLITE_FILE_DEFAULT = "/tmp/metadata.db";
}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/SparkConstants.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/SparkConstants.java
new file mode 100644
index 0000000000000000000000000000000000000000..d1642b9494490b570e98aca1f0dd2bdfebe05c0d
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/SparkConstants.java
@@ -0,0 +1,14 @@
+package com.k2data.governance.manager.common.constants;
+
+/**
+ * Created by wangyihan on 2017/12/6 下午4:04.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class SparkConstants {
+ public final static String SPARK_LOCAL = "spark.local";
+ public final static String SPARK_MASTER = "spark.master";
+ public final static String SPARK_APP_NAME = "spark.app.name";
+}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/error/IErrorType.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/error/IErrorType.java
new file mode 100644
index 0000000000000000000000000000000000000000..c6a8464b179d486a3dccc3d5b39102bf2df1aff7
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/error/IErrorType.java
@@ -0,0 +1,21 @@
+package com.k2data.governance.manager.error;
+
+import com.k2data.governance.manager.result.IAnalyzeResult;
+import com.k2data.governance.manager.metadata.IMetaDatabase;
+
+/**
+ * Created by wangyihan on 2017/11/30 下午2:08.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public interface IErrorType {
+ String toErrorString();
+
+ int getValue();
+
+ IAnalyzeResult analyzeRecord(String msg);
+
+ IAnalyzeResult analyzeRecord(String msg, IMetaDatabase metaDatabase);
+}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/IFilter.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/filter/IFilter.java
similarity index 47%
rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/IFilter.java
rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/filter/IFilter.java
index 0cd3d2da6ca5da4522f93a7d9f5f3cb6be61f661..e10fd08b8bf5bd61b34bf59b894a4c357bafe659 100644
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/IFilter.java
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/filter/IFilter.java
@@ -1,8 +1,6 @@
package com.k2data.governance.manager.filter;
-import com.k2data.governance.manager.common.type.GWSourceType;
-import com.k2data.governance.manager.message.IMessage;
-import java.util.List;
+import com.k2data.governance.manager.error.IErrorType;
/**
* Created by stoke on 2017/11/16.
@@ -12,7 +10,5 @@ import java.util.List;
* @author stoke
*/
public interface IFilter {
- void filter();
- List getMessageList();
- List getCertainSourceTypeList(GWSourceType type);
+ Class getErrorClass(String msg);
}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/AbstractGWMetaDatabase.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/AbstractMetaDatabase.java
similarity index 46%
rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/AbstractGWMetaDatabase.java
rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/AbstractMetaDatabase.java
index 8daa730bc1d009f8df035b68c77d74e71fc4e2b5..e6c3a8a311ce4d8a0190befe295022fa66b34ff0 100644
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/AbstractGWMetaDatabase.java
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/AbstractMetaDatabase.java
@@ -2,7 +2,9 @@ package com.k2data.governance.manager.metadata;
import java.util.List;
import java.util.Map;
+
import org.apache.log4j.Logger;
+import scala.Serializable;
/**
* Created by wangyihan on 2017/11/17 上午10:20.
@@ -11,15 +13,13 @@ import org.apache.log4j.Logger;
*
* @author wangyihan
*/
-abstract class AbstractGWMetaDatabase implements IMetaDatabase {
- Logger logger = getLogger();
+abstract class AbstractMetaDatabase implements IMetaDatabase, Serializable {
- Map wtidFieldGroupMap;
+ Map assetFieldGroupMap;
Map> fieldGroupFieldsMap;
- Map wtidWfidMap;
protected Logger getLogger() {
- return Logger.getLogger(AbstractGWMetaDatabase.class);
+ return Logger.getLogger(AbstractMetaDatabase.class);
}
@Override
@@ -36,28 +36,16 @@ abstract class AbstractGWMetaDatabase implements IMetaDatabase {
}
@Override
- public String getFieldGroup(String wtid) {
- if (wtid == null || wtid.isEmpty()) {
- logger.error("Empty wtid: " + wtid);
+ public String getFieldGroup(String assetId) {
+ if (assetId == null || assetId.isEmpty()) {
+ getLogger().error("Empty AssetID: " + assetId);
return null;
}
- String fieldGroup = wtidFieldGroupMap.get(wtid);
-// if (fieldGroup == null) {
-// logger.error("Could not find wtid info from database: " + wtid);
-// }
- return fieldGroup;
+ return assetFieldGroupMap.get(assetId);
}
@Override
public List getFieldsByFieldGroup(String fieldGroup) {
-// if (fieldGroup == null || fieldGroup.isEmpty()) {
-// logger.error("Empty fieldGroup: " + fieldGroup);
-// return null;
-// }
- List fields = fieldGroupFieldsMap.get(fieldGroup);
-// if (fields == null) {
-// logger.error("Could not find fields info from database: " + fieldGroup);
-// }
- return fields;
+ return fieldGroupFieldsMap.get(fieldGroup);
}
}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java
similarity index 99%
rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java
rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java
index 67f2983a93dabdca75cf044bd23e1296a55dd5bc..ec2ac0c9beb45d74f44700ee5ce2e9ec62f16298 100644
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java
@@ -11,7 +11,10 @@ import java.util.List;
*/
public interface IMetaDatabase {
void init();
+
List getFields(String assetId);
+
String getFieldGroup(String assetId);
+
List getFieldsByFieldGroup(String fieldGroup);
}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/GWSqliteMetaDatabase.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/SqliteMetaDatabase.java
similarity index 57%
rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/GWSqliteMetaDatabase.java
rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/SqliteMetaDatabase.java
index 41fde8b950f911a4783ddd8495bac55a1c1a981b..0db53b96d81ab599d0e2627d5b167347e9919b36 100644
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/GWSqliteMetaDatabase.java
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/SqliteMetaDatabase.java
@@ -1,7 +1,8 @@
package com.k2data.governance.manager.metadata;
-import com.k2data.governance.manager.common.Constants;
+import com.k2data.governance.manager.common.constants.Constants;
import com.k2data.governance.manager.common.Pair;
+
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -12,6 +13,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -25,35 +27,40 @@ import org.apache.log4j.Logger;
*
* @author wangyihan
*/
-public class GWSqliteMetaDatabase extends AbstractGWMetaDatabase implements IMetaDatabase {
+public class SqliteMetaDatabase extends AbstractMetaDatabase {
private static final String SQLITE_JDBC_DRIVER = "org.sqlite.JDBC";
private static final String SQLITE_BASE_URL = "jdbc:sqlite:";
-
private String dbFilePath;
- private Connection conn;
+ private String assetFieldGroupSql;
+ private String fieldGroupFieldsSql;
- public GWSqliteMetaDatabase(String dbFilePath) {
+ public SqliteMetaDatabase(String dbFilePath, String assetFieldGroupSql, String fieldGroupFieldsSql) {
this.dbFilePath = dbFilePath;
+ this.assetFieldGroupSql = assetFieldGroupSql;
+ this.fieldGroupFieldsSql = fieldGroupFieldsSql;
init();
}
@Override
protected Logger getLogger() {
- return Logger.getLogger(GWSqliteMetaDatabase.class);
+ return Logger.getLogger(SqliteMetaDatabase.class);
}
- public String getWfid(String wtid) {
- return wtidWfidMap.get(wtid);
- }
-
- private String getSqliteUrl() {
+ /**
+ * Concat jdbc:sqlite with sqlite file path
+ * If file path is a hdfs address, download the file to local
+ *
+ * @return full url of sqlite database
+ * @throws IOException when initial file system failed
+ */
+ private String getSqliteUrl() throws IOException {
if (!dbFilePath.toLowerCase().startsWith("hdfs")) {
return SQLITE_BASE_URL + dbFilePath;
}
Pattern p = Pattern.compile("^([A-Za-z]+:/+[^/]+).*");
Matcher m = p.matcher(dbFilePath);
if (!m.find()) {
- logger.error("Cannot parse the hdfs path:" + dbFilePath);
+ getLogger().error("Cannot parse the hdfs path:" + dbFilePath);
return SQLITE_BASE_URL + dbFilePath;
}
String hdfs = m.group(1);
@@ -65,127 +72,104 @@ public class GWSqliteMetaDatabase extends AbstractGWMetaDatabase implements IMet
FileSystem fs = FileSystem.get(conf);
fs.copyToLocalFile(false, hdfsPath, localPath);
} catch (IOException e) {
- logger.error("Initial file system failed");
- e.printStackTrace();
+ getLogger().error("Initial file system failed");
+ throw e;
}
return SQLITE_BASE_URL + Constants.GW_META_SQLITE_FILE_DEFAULT;
}
- private void connect() {
- String url = getSqliteUrl();
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException e) {
- logger.error("Close connection failed: " + e.getMessage());
- }
- }
+ private Connection connect() {
+ String url = null;
try {
+ url = getSqliteUrl();
Class.forName(SQLITE_JDBC_DRIVER);
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.exit(-1);
} catch (ClassNotFoundException e) {
- logger.error("Load SQLite driver error: "+e.getMessage());
+ getLogger().error("Load SQLite driver error: " + e.getMessage());
System.exit(-1);
}
try {
- conn = DriverManager.getConnection(url);
+ Connection conn = DriverManager.getConnection(url);
Statement statement = conn.createStatement();
statement.execute("PRAGMA synchronous=OFF");
statement.close();
+ return conn;
} catch (SQLException e) {
- logger.error("Build connection failed: " + e.getMessage());
+ getLogger().error("Build connection failed: " + e.getMessage());
System.exit(-1);
}
+ return null;
}
- private Pair executeSql(String sql) {
+ private Pair executeSql(Connection conn, String sql) {
if (conn == null) {
- connect();
+ conn = connect();
}
Statement stat = null;
ResultSet rs = null;
try {
+ assert conn != null;
stat = conn.createStatement();
rs = stat.executeQuery(sql);
} catch (SQLException e) {
String err = String.format("Execute SQL (%s) failed: %s", sql, e.getMessage());
- logger.error(err);
+ getLogger().error(err);
}
return new Pair<>(rs, stat);
}
- private void buildWtidFieldGroupMap() {
- if (wtidFieldGroupMap == null) {
- wtidFieldGroupMap = new HashMap<>();
+ private void buildAssetFieldGroupMap(Connection conn) {
+ if (assetFieldGroupMap == null) {
+ assetFieldGroupMap = new HashMap<>();
}
- String sql = "SELECT wtid, protocolid FROM wtinfo";
- Pair resultPair = executeSql(sql);
+ Pair resultPair = executeSql(conn, assetFieldGroupSql);
ResultSet rs = resultPair.fst;
try {
while (rs.next()) {
- String wtid = rs.getString(1).replace(" ","").toLowerCase();
- String fieldGroup = rs.getString(2).replace(" ","").toLowerCase();
- wtidFieldGroupMap.put(wtid, fieldGroup);
+ String wtid = rs.getString(1).replace(" ", "").toLowerCase();
+ String fieldGroup = rs.getString(2).replace(" ", "").toLowerCase();
+ assetFieldGroupMap.put(wtid, fieldGroup);
}
rs.close();
resultPair.snd.close();
} catch (SQLException e) {
- logger.error("Read field group result set error: " + e.getMessage());
+ getLogger().error("Read field group result set error: " + e.getMessage());
e.printStackTrace();
}
}
- private void buildFieldGroupFieldsMap() {
+ private void buildFieldGroupFieldsMap(Connection conn) {
if (fieldGroupFieldsMap == null) {
fieldGroupFieldsMap = new HashMap<>();
}
String sql = "SELECT protocolid, iecpath FROM propaths " +
"WHERE iecpath <> 'WTUR.Tm.Rw.Dt' AND transtype='1' AND bsave=1 " +
"ORDER BY protocolid, pathid";
- Pair resultPair = executeSql(sql);
+ Pair resultPair = executeSql(conn, fieldGroupFieldsSql);
ResultSet rs = resultPair.fst;
try {
while (rs.next()) {
- String fieldGroup = rs.getString(1).replace(" ","").toLowerCase();
+ String fieldGroup = rs.getString(1).replace(" ", "").toLowerCase();
String fieldName = rs.getString(2).replace(" ", "").toLowerCase();
if (!fieldGroupFieldsMap.containsKey(fieldGroup)) {
- fieldGroupFieldsMap.put(fieldGroup, new ArrayList<>());
+ fieldGroupFieldsMap.put(fieldGroup, new ArrayList());
}
fieldGroupFieldsMap.get(fieldGroup).add(fieldName);
}
rs.close();
resultPair.snd.close();
} catch (SQLException e) {
- logger.error("Read fields result set error: " + e.getMessage());
- e.printStackTrace();
- }
- }
-
- private void buildWfidMap() {
- if (wtidWfidMap == null) {
- wtidWfidMap = new HashMap<>();
- }
- String sql = "SELECT wtid,wfid FROM wtinfo";
- Pair resultPair = executeSql(sql);
- ResultSet rs = resultPair.fst;
- try {
- while (rs.next()) {
- String wtid = rs.getString(1).replace(" ", "").toLowerCase();
- String wfid = rs.getString(2).replace(" ", "").toLowerCase();
- wtidWfidMap.put(wtid, wfid);
- }
- rs.close();
- resultPair.snd.close();
- } catch (SQLException e) {
- logger.error("Read WFID result set error: " + e.getMessage());
+ getLogger().error("Read fields result set error: " + e.getMessage());
e.printStackTrace();
}
}
@Override
public void init() {
- connect();
- buildWtidFieldGroupMap();
- buildFieldGroupFieldsMap();
- buildWfidMap();
+ Connection conn = connect();
+ buildAssetFieldGroupMap(conn);
+ buildFieldGroupFieldsMap(conn);
}
}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AnalyzeResultImpl.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AnalyzeResultImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..e6aabf2b2704df5fdfa3d690ef60b21adc061561
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AnalyzeResultImpl.java
@@ -0,0 +1,113 @@
+package com.k2data.governance.manager.result;
+
+import com.k2data.governance.manager.error.IErrorType;
+import scala.Serializable;
+
+/**
+ * Created by wangyihan on 2017/12/19 下午6:32.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class AnalyzeResultImpl implements IAnalyzeResult, Serializable {
+
+ private IErrorType errorType;
+ private IAsset asset;
+
+ public AnalyzeResultImpl() {
+ }
+
+ public AnalyzeResultImpl(IErrorType errorType) {
+ this(errorType, null);
+ }
+
+ public AnalyzeResultImpl(IErrorType errorType, IAsset asset) {
+ this.errorType = errorType;
+ this.asset = asset;
+ }
+
+ @Override
+ public void setErrorType(IErrorType errorType) {
+ this.errorType = errorType;
+ }
+
+ @Override
+ public void setAsset(IAsset asset) {
+ this.asset = asset;
+ }
+
+ @Override
+ public void setFieldGroup(String fieldGroup) {
+ if (this.asset == null) {
+ this.asset = new AssetImpl();
+ }
+ this.asset.setFieldGroup(fieldGroup);
+ }
+
+ @Override
+ public void setAssetId(String assetId) {
+ if (this.asset == null) {
+ this.asset = new AssetImpl();
+ }
+ this.asset.setAssetId(assetId);
+ }
+
+ @Override
+ public IErrorType getErrorType() {
+ return errorType;
+ }
+
+ @Override
+ public IAsset getAsset() {
+ return asset;
+ }
+
+ @Override
+ public String getFieldGroup() {
+ if (this.asset == null) {
+ return null;
+ }
+ return this.asset.getFieldGroup();
+ }
+
+ @Override
+ public String getAssetId() {
+ if (this.asset == null) {
+ return null;
+ }
+ return this.asset.getAssetId();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s,%s", errorType, asset);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.toString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof AnalyzeResultImpl)) {
+ return false;
+ }
+ AnalyzeResultImpl resultObj = (AnalyzeResultImpl) obj;
+ if (this.asset == null && resultObj.getAsset() != null) {
+ return false;
+ } else if (this.asset != null && !this.asset.equals(resultObj.getAsset())) {
+ return false;
+ } else if (this.errorType != resultObj.getErrorType()) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AssetImpl.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AssetImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..60976ed89feac434673ca6532ab66c1cbe2a417d
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AssetImpl.java
@@ -0,0 +1,78 @@
+package com.k2data.governance.manager.result;
+
+import scala.Serializable;
+
+/**
+ * Created by wangyihan on 2017/12/19 下午4:53.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class AssetImpl implements IAsset, Serializable {
+ private String fieldGroup;
+ private String assetId;
+
+ public AssetImpl() {
+ }
+
+ public AssetImpl(String fieldGroup, String assetId) {
+ this.fieldGroup = fieldGroup;
+ this.assetId = assetId;
+ }
+
+ @Override
+ public String getFieldGroup() {
+ return fieldGroup;
+ }
+
+ @Override
+ public String getAssetId() {
+ return assetId;
+ }
+
+ @Override
+ public void setFieldGroup(String fieldGroup) {
+ this.fieldGroup = fieldGroup;
+ }
+
+ @Override
+ public void setAssetId(String assetId) {
+ this.assetId = assetId;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s,%s", fieldGroup, assetId);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.toString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof AssetImpl)) {
+ return false;
+ }
+ AssetImpl assetObj = (AssetImpl) obj;
+ if (this.assetId != null && !this.assetId.equals(assetObj.getAssetId())) {
+ return false;
+ } else if (this.assetId == null && assetObj.getAssetId() != null) {
+ return false;
+ }
+ if (this.fieldGroup != null && !this.fieldGroup.equals(assetObj.getFieldGroup())) {
+ return false;
+ } else if (this.fieldGroup == null && assetObj.getFieldGroup() != null) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAnalyzeResult.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAnalyzeResult.java
new file mode 100644
index 0000000000000000000000000000000000000000..d520e69b6a405bf004d4366f79898451e558a8a1
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAnalyzeResult.java
@@ -0,0 +1,28 @@
+package com.k2data.governance.manager.result;
+
+import com.k2data.governance.manager.error.IErrorType;
+
+/**
+ * Created by wangyihan on 2017/12/19 下午4:39.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public interface IAnalyzeResult {
+ void setErrorType(IErrorType errorType);
+
+ void setAsset(IAsset asset);
+
+ void setFieldGroup(String fieldGroup);
+
+ void setAssetId(String assetId);
+
+ IErrorType getErrorType();
+
+ IAsset getAsset();
+
+ String getFieldGroup();
+
+ String getAssetId();
+}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAsset.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAsset.java
new file mode 100644
index 0000000000000000000000000000000000000000..3473a8fd36ad43e088c5939e3044760cbbf5c9f0
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAsset.java
@@ -0,0 +1,18 @@
+package com.k2data.governance.manager.result;
+
+/**
+ * Created by wangyihan on 2017/12/19 下午4:44.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public interface IAsset {
+ void setAssetId(String assetId);
+
+ void setFieldGroup(String fieldGroup);
+
+ String getAssetId();
+
+ String getFieldGroup();
+}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/ConfigureUtil.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/ConfigureUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..929a03c1b23764999b8def3c595f33dbea876ad1
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/ConfigureUtil.java
@@ -0,0 +1,220 @@
+package com.k2data.governance.manager.util;
+
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Created by wangyihan on 2017/12/6 下午3:58.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class ConfigureUtil {
+ private static Properties properties = new Properties();
+ private static final Logger logger = Logger.getLogger(ConfigureUtil.class);
+
+ static {
+ InputStream in = ConfigureUtil.class.getClassLoader().getResourceAsStream("config.properties");
+ try {
+ properties.load(in);
+ } catch (IOException e) {
+ logger.warn("No User Custom Properties File");
+ }
+ }
+
+ /**
+ * get property value with property key
+ *
+ * @param key property key
+ * @return
+ */
+ private static String getProperty(String key) {
+ try {
+ return properties.getProperty(key);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+ /**
+ * get property value with property key
+ *
+ * @param key property key
+ * @param defaultValue
+ * @return
+ */
+ private static String getProperty(String key, String defaultValue) {
+ try {
+ return properties.getProperty(key, defaultValue);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+ /**
+ * get string value with property key
+ *
+ * @param key
+ * @return
+ */
+ public static String getString(String key) {
+ return getProperty(key);
+ }
+
+
+ /**
+ * get string value with property key
+ *
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public static String getString(String key, String defaultValue) {
+ return getProperty(key, defaultValue);
+ }
+
+ /**
+ * get integer value with property key
+ *
+ * @param key
+ * @return
+ */
+ public static Integer getInteger(String key) {
+ String value = getProperty(key);
+ try {
+ return Integer.valueOf(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return 0;
+ }
+
+ /**
+ * get integer value with property key
+ *
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public static Integer getInteger(String key, Integer defaultValue) {
+ String value = getProperty(key);
+ try {
+ return (value == null) ? defaultValue : Integer.valueOf(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return 0;
+ }
+
+
+ /**
+ * get boolean value with property key
+ *
+ * @param key
+ * @return
+ */
+ public static Boolean getBoolean(String key) {
+ String value = getProperty(key);
+ try {
+ return Boolean.valueOf(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+
+ /**
+ * get boolean value with property key
+ *
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public static Boolean getBoolean(String key, Boolean defaultValue) {
+ String value = getProperty(key);
+ try {
+ return (value == null) ? defaultValue : Boolean.valueOf(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ /**
+ * get long value with property key
+ *
+ * @param key
+ * @return
+ */
+ public static Long getLong(String key) {
+ String value = getProperty(key);
+ try {
+ return Long.valueOf(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return 0L;
+ }
+
+ /**
+ * get long value with property key
+ *
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public static Long getLong(String key, Long defaultValue) {
+ String value = getProperty(key);
+ try {
+ return (value == null) ? defaultValue : Long.valueOf(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return 0L;
+ }
+
+ /**
+ * get double value with property key
+ *
+ * @param key
+ * @return
+ */
+ public static Double getDouble(String key) {
+ String value = getProperty(key);
+ try {
+ return Double.valueOf(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return 0.0D;
+ }
+
+ /**
+ * get double value with property key
+ *
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public static Double getDouble(String key, Double defaultValue) {
+ String value = getProperty(key);
+ try {
+ return (value == null) ? defaultValue : Double.valueOf(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return 0.0D;
+ }
+
+}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/DateUtil.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/DateUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..6ab24a2cbd1e3517c5bef3c72ffd81da5cf10c6c
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/DateUtil.java
@@ -0,0 +1,38 @@
+package com.k2data.governance.manager.util;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Created by wangyihan on 2017/12/29 下午6:46.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class DateUtil {
+ private static ThreadLocal local = new ThreadLocal();
+ private static String formatString = "yyyy-MM-dd HH:mm:ss";
+
+ public static void setDateFormat(String formatStr) {
+ formatString = formatStr;
+ }
+
+ private static SimpleDateFormat getDateFormat() {
+ SimpleDateFormat dateFormat = local.get();
+ if (dateFormat == null) {
+ dateFormat = new SimpleDateFormat(formatString);
+ local.set(dateFormat);
+ }
+ return dateFormat;
+ }
+
+ public static String format(Date date) {
+ return getDateFormat().format(date);
+ }
+
+ public static Date parse(String dateStr) throws ParseException {
+ return getDateFormat().parse(dateStr);
+ }
+}
diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/InitUtil.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/InitUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..5a96f8253aa28b92c37733bdbd85139b79d8c77d
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/InitUtil.java
@@ -0,0 +1,34 @@
+package com.k2data.governance.manager.util;
+
+import com.k2data.governance.manager.common.constants.SparkConstants;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Created by wangyihan on 2017/12/6 下午3:42.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class InitUtil {
+ public static SparkSession initSparkContext() {
+ SparkSession.Builder builder = SparkSession.builder();
+ boolean local = ConfigureUtil.getBoolean(SparkConstants.SPARK_LOCAL);
+ builder.appName(ConfigureUtil.getString(SparkConstants.SPARK_APP_NAME));
+ if (local) {
+ builder.master(ConfigureUtil.getString(SparkConstants.SPARK_MASTER));
+ }
+ return builder.getOrCreate();
+ }
+
+ public static SparkSession initSparkContext(boolean local) {
+ SparkSession.Builder builder = SparkSession.builder();
+ builder.appName(ConfigureUtil.getString(SparkConstants.SPARK_APP_NAME));
+ if (local) {
+ builder.master(ConfigureUtil.getString(SparkConstants.SPARK_MASTER));
+ }
+ return builder.getOrCreate();
+ }
+}
diff --git a/k2de-governance-exception-manager-base/src/main/resources/config.properties b/k2de-governance-exception-manager-base/src/main/resources/config.properties
new file mode 100644
index 0000000000000000000000000000000000000000..966d41988e5ecae4168346e92a874479cda835eb
--- /dev/null
+++ b/k2de-governance-exception-manager-base/src/main/resources/config.properties
@@ -0,0 +1,3 @@
+spark.local=false
+spark.master=local[4]
+spark.app.name=GWExceptionManager
\ No newline at end of file
diff --git a/k2de-governance-exception-manager/src/main/resources/log4j.properties b/k2de-governance-exception-manager-base/src/main/resources/log4j.properties
similarity index 87%
rename from k2de-governance-exception-manager/src/main/resources/log4j.properties
rename to k2de-governance-exception-manager-base/src/main/resources/log4j.properties
index a1426d2bc2e61a387ea703be3f5c73a9aa809f4e..68429adc65d3b1c50b8dc202f9fb0dd704a8ab1d 100644
--- a/k2de-governance-exception-manager/src/main/resources/log4j.properties
+++ b/k2de-governance-exception-manager-base/src/main/resources/log4j.properties
@@ -1,6 +1,6 @@
# You should copy this file to resources, and rename it as log4j.properties
-log4j.rootLogger=INFO,stdout
+log4j.rootLogger=ERROR,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
\ No newline at end of file
diff --git a/k2de-governance-exception-manager-huineng/pom.xml b/k2de-governance-exception-manager-huineng/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..7fcddcd49dd559cd1b02c47e3002c41d6c2daa9b
--- /dev/null
+++ b/k2de-governance-exception-manager-huineng/pom.xml
@@ -0,0 +1,71 @@
+
+
+
+ k2de-governance-parent
+ com.k2data
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ k2de-governance-exception-manager-huineng
+
+
+
+ com.k2data
+ k2de-governance-exception-manager-base
+ 1.0-SNAPSHOT
+
+
+ com.k2data
+ k2de-governance-exception-manager-kmx
+ 1.0-SNAPSHOT
+
+
+ org.apache.spark
+ spark-core_${spark.comp.version}
+
+
+ org.apache.spark
+ spark-sql_${spark.comp.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.1
+
+
+ ${compile.version}
+
+
+
+ maven-assembly-plugin
+ 2.4
+
+
+
+ com.k2data.governance.manager.ExceptionManagerSpark
+
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnErrorType.java b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnErrorType.java
new file mode 100644
index 0000000000000000000000000000000000000000..99255c34fa29b668fa5b9624ffab960a9107a298
--- /dev/null
+++ b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnErrorType.java
@@ -0,0 +1,118 @@
+package com.k2data.governance.manager.huineng;
+
+import com.k2data.governance.manager.error.IErrorType;
+import com.k2data.governance.manager.metadata.IMetaDatabase;
+import com.k2data.governance.manager.result.AnalyzeResultImpl;
+import com.k2data.governance.manager.result.IAnalyzeResult;
+import com.k2data.governance.manager.util.DateUtil;
+import scala.Serializable;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+
+/**
+ * Created by wangyihan on 2017/12/29 下午12:01.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public enum HnErrorType implements IErrorType, Serializable {
+ FORMAT(0), // no realtime| in the msg
+ WTID_NULL(1), // wtid is not in DB
+ TIME(2), // time format wrong
+ PROTO_NULL(3), // protocolId is not in DB
+ FIELDS_NUM_MORE(4), // number of data more than in the DB
+ FIELDS_NUM_LESS(5), // number of data less than in the DB
+ WTID_EMPTY(6),
+ OTHER(7);
+
+ public static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+ private final int value;
+ private static String HN_DATA_START_TAG = "realtimedata|";
+ private static int fieldDiffTolerance = 30;
+ private static SimpleDateFormat format = new SimpleDateFormat(TIME_FORMAT);
+
+ HnErrorType(int value) {
+ this.value = value;
+ }
+
+ public String toErrorString() {
+ return String.format("HN_ERROR_%s", this);
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public static HnErrorType fromInteger(int id) {
+ return HnErrorType.values()[id];
+ }
+
+
+ public void setFiledDiffTolerance(int tolerance) {
+ fieldDiffTolerance = tolerance;
+ }
+
+ @Override
+ public IAnalyzeResult analyzeRecord(String msg) {
+ return this.analyzeRecord(msg, null);
+ }
+
+ @Override
+ public IAnalyzeResult analyzeRecord(String msg, IMetaDatabase metaDatabase) {
+ IAnalyzeResult result = new AnalyzeResultImpl();
+ int formatIndex = msg.indexOf(HN_DATA_START_TAG);
+ if (formatIndex < 0) {
+ result.setErrorType(HnErrorType.FORMAT);
+ return result;
+ }
+ int startIndex = formatIndex + HN_DATA_START_TAG.length();
+ String dataSubstr = msg.substring(startIndex);
+ String[] splitData = dataSubstr.split("\\|");
+ String wtid = splitData[0].trim();
+ String[] data = splitData[1].split(",");
+ String ts = data[0];
+ try {
+ DateUtil.parse(ts);
+ } catch (ParseException e) {
+ result.setErrorType(HnErrorType.TIME);
+ }
+ if (wtid.trim().isEmpty()) {
+ result.setErrorType(HnErrorType.WTID_EMPTY);
+ return result;
+ }
+ result.setAssetId(wtid);
+ if (metaDatabase == null) {
+ result.setErrorType(HnErrorType.OTHER);
+ return result;
+ }
+ String fieldGroup = metaDatabase.getFieldGroup(wtid);
+ if (fieldGroup == null) {
+ result.setErrorType(HnErrorType.WTID_NULL);
+ return result;
+ }
+ result.setFieldGroup(fieldGroup);
+ List fields = metaDatabase.getFieldsByFieldGroup(fieldGroup);
+ if (fields == null) {
+ result.setErrorType(HnErrorType.PROTO_NULL);
+ result.setAssetId(null); // this type of exception dose not need wtid
+ return result;
+ }
+ if (fields.size() - (data.length - 1) > fieldDiffTolerance) {
+ result.setErrorType(HnErrorType.FIELDS_NUM_LESS);
+ return result;
+ }
+ if ((data.length - 1) - fields.size() > fieldDiffTolerance) {
+ result.setErrorType(HnErrorType.FIELDS_NUM_MORE);
+ return result;
+ }
+
+ // if not in all errors above, then put it in OTHER
+ if (result.getErrorType() != HnErrorType.TIME) {
+ result.setErrorType(HnErrorType.OTHER);
+ }
+ return result;
+ }
+}
diff --git a/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnExceptionManagerMain.java b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnExceptionManagerMain.java
new file mode 100644
index 0000000000000000000000000000000000000000..fca9034e17eacacb20f50b1f092de418d26c67d0
--- /dev/null
+++ b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnExceptionManagerMain.java
@@ -0,0 +1,34 @@
+package com.k2data.governance.manager.huineng;
+
+import com.k2data.governance.manager.ExceptionManagerSpark;
+import com.k2data.governance.manager.common.constants.ArgumentsConstants;
+import com.k2data.governance.manager.filter.IFilter;
+import com.k2data.governance.manager.metadata.IMetaDatabase;
+import com.k2data.governance.manager.metadata.SqliteMetaDatabase;
+import com.k2data.governance.manager.util.DateUtil;
+import net.sourceforge.argparse4j.inf.Namespace;
+
+/**
+ * Created by wangyihan on 2017/12/29 下午5:10.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class HnExceptionManagerMain {
+
+ public static void main(String[] args) {
+ DateUtil.setDateFormat(HnErrorType.TIME_FORMAT);
+ Namespace ns = ExceptionManagerSpark.parseArguments(args);
+ String metadataFilePath = ns.getString(ArgumentsConstants.METADATA_FILE);
+ String wtidProtocolSql = "SELECT wtid, protocolid FROM wtinfo";
+ String protocolFieldsSql = "SELECT protocolid, iecpath FROM propaths " +
+ "WHERE iecpath <> 'WTUR.Tm.Rw.Dt' AND transtype='1' AND bsave=1 " +
+ "ORDER BY protocolid, pathid";
+ IMetaDatabase metadataBase = new SqliteMetaDatabase(metadataFilePath, wtidProtocolSql, protocolFieldsSql);
+ ExceptionManagerSpark.setMetadataBase(metadataBase);
+ IFilter filter = new HnFilter();
+ ExceptionManagerSpark.setFilter(filter);
+ ExceptionManagerSpark.run(ns);
+ }
+}
diff --git a/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnFilter.java b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnFilter.java
new file mode 100644
index 0000000000000000000000000000000000000000..1fed7e0801f21a9c564d67d1ff25e61b30d7836c
--- /dev/null
+++ b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnFilter.java
@@ -0,0 +1,31 @@
+package com.k2data.governance.manager.huineng;
+
+import com.k2data.governance.manager.filter.IFilter;
+import com.k2data.governance.manager.kmx.KmxErrorType;
+import org.apache.log4j.Logger;
+import scala.Serializable;
+
+/**
+ * Created by wangyihan on 2017/12/29 下午2:43.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class HnFilter implements IFilter, Serializable {
+ private String KMX_START_TAG = "[EXCEPTION]";
+ private String HN_START_TAG = "sediment";
+
+ public HnFilter() {
+ }
+
+ @Override
+ public Class getErrorClass(String message) {
+ if (message.startsWith(KMX_START_TAG)) {
+ return KmxErrorType.class;
+ } else if (message.startsWith(HN_START_TAG)) {
+ return HnErrorType.class;
+ }
+ return null;
+ }
+}
diff --git a/k2de-governance-exception-manager-huineng/src/main/resources/log4j.properties b/k2de-governance-exception-manager-huineng/src/main/resources/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..68429adc65d3b1c50b8dc202f9fb0dd704a8ab1d
--- /dev/null
+++ b/k2de-governance-exception-manager-huineng/src/main/resources/log4j.properties
@@ -0,0 +1,6 @@
+# You should copy this file to resources, and rename it as log4j.properties
+
+log4j.rootLogger=ERROR,stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
\ No newline at end of file
diff --git a/k2de-governance-exception-manager-kmx/pom.xml b/k2de-governance-exception-manager-kmx/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..3c545fb5b34262cd85e0f626cb6eeba3db61a77d
--- /dev/null
+++ b/k2de-governance-exception-manager-kmx/pom.xml
@@ -0,0 +1,67 @@
+
+
+
+ com.k2data
+ k2de-governance-parent
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ k2de-governance-exception-manager-kmx
+
+
+
+ com.k2data
+ k2de-governance-exception-manager-base
+ 1.0-SNAPSHOT
+
+
+ org.apache.spark
+ spark-core_${spark.comp.version}
+
+
+ org.apache.spark
+ spark-sql_${spark.comp.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.1
+
+
+ ${compile.version}
+
+
+
+ maven-assembly-plugin
+ 2.4
+
+
+
+ com.k2data.governance.manager.ExceptionManagerSpark
+
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxErrorType.java b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxErrorType.java
new file mode 100644
index 0000000000000000000000000000000000000000..71f8c6c244cc4c971d4801ae467859d4d04ffe08
--- /dev/null
+++ b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxErrorType.java
@@ -0,0 +1,91 @@
+package com.k2data.governance.manager.kmx;
+
+import com.k2data.governance.manager.error.IErrorType;
+import com.k2data.governance.manager.metadata.IMetaDatabase;
+import com.k2data.governance.manager.result.AnalyzeResultImpl;
+import com.k2data.governance.manager.result.IAnalyzeResult;
+import org.apache.log4j.Logger;
+import scala.Serializable;
+
+/**
+ * Created by wangyihan on 2017/12/29 上午11:32.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public enum KmxErrorType implements IErrorType, Serializable {
+ DOUBLE(0), BOOLEAN(1), LONG(2), STRING(3), INT(4), FLOAT(5), OTHER(6);
+
+ private final int value;
+ private static final String TYPE_PREFIX = ": ";
+ private static final String TYPE_SUFFIX = "_EX";
+ private final Logger logger = Logger.getLogger(KmxErrorType.class);
+
+ KmxErrorType() {
+ this.value = 6;
+ }
+
+ KmxErrorType(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+
+ @Override
+ public String toErrorString() {
+ return String.format("KMX_ERROR_DATATYPE_%s", this);
+ }
+
+ public static KmxErrorType fromInteger(int id) {
+ return KmxErrorType.values()[id];
+ }
+
+ public static KmxErrorType fromString(String type) {
+ KmxErrorType dataType;
+ switch (type.toLowerCase()) {
+ case "double":
+ dataType = KmxErrorType.DOUBLE;
+ break;
+ case "boolean":
+ dataType = KmxErrorType.BOOLEAN;
+ break;
+ case "long":
+ dataType = KmxErrorType.LONG;
+ break;
+ case "string":
+ dataType = KmxErrorType.STRING;
+ break;
+ case "int":
+ dataType = KmxErrorType.INT;
+ break;
+ case "float":
+ dataType = KmxErrorType.FLOAT;
+ break;
+ default:
+ dataType = KmxErrorType.OTHER;
+ break;
+ }
+
+ return dataType;
+ }
+
+ @Override
+ public IAnalyzeResult analyzeRecord(String msg) {
+ int indexType = msg.indexOf(TYPE_SUFFIX);
+ if (indexType == -1) {
+ return new AnalyzeResultImpl(KmxErrorType.OTHER);
+ }
+ String type = msg.substring(msg.indexOf(TYPE_PREFIX) + TYPE_PREFIX.length(), indexType);
+ return new AnalyzeResultImpl(KmxErrorType.fromString(type));
+ }
+
+ @Override
+ public IAnalyzeResult analyzeRecord(String msg, IMetaDatabase database) {
+ return analyzeRecord(msg);
+ }
+
+}
diff --git a/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxExceptionManagerMain.java b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxExceptionManagerMain.java
new file mode 100644
index 0000000000000000000000000000000000000000..37a05022073a7df685692a51fd6b3aa3d418821e
--- /dev/null
+++ b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxExceptionManagerMain.java
@@ -0,0 +1,20 @@
+package com.k2data.governance.manager.kmx;
+
+import com.k2data.governance.manager.ExceptionManagerSpark;
+import com.k2data.governance.manager.filter.IFilter;
+
+/**
+ * Created by wangyihan on 2017/12/29 下午5:18.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class KmxExceptionManagerMain {
+
+ public static void main(String[] args) {
+ IFilter filter = new KmxFilter();
+ ExceptionManagerSpark.setFilter(filter);
+ ExceptionManagerSpark.run(args);
+ }
+}
diff --git a/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxFilter.java b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxFilter.java
new file mode 100644
index 0000000000000000000000000000000000000000..22d3b84d063321dcc76918c423a7587828d88b62
--- /dev/null
+++ b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxFilter.java
@@ -0,0 +1,19 @@
+package com.k2data.governance.manager.kmx;
+
+import com.k2data.governance.manager.filter.IFilter;
+import scala.Serializable;
+
+/**
+ * Created by wangyihan on 2017/12/29 上午11:27.
+ * E-mail address is yihanwang22@163.com.
+ * Copyright © 2017 wangyihan. All Rights Reserved.
+ *
+ * @author wangyihan
+ */
+public class KmxFilter implements IFilter, Serializable {
+
+ @Override
+ public Class getErrorClass(String message) {
+ return KmxErrorType.class;
+ }
+}
diff --git a/k2de-governance-exception-manager-kmx/src/main/resources/log4j.properties b/k2de-governance-exception-manager-kmx/src/main/resources/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..68429adc65d3b1c50b8dc202f9fb0dd704a8ab1d
--- /dev/null
+++ b/k2de-governance-exception-manager-kmx/src/main/resources/log4j.properties
@@ -0,0 +1,6 @@
+# You should copy this file to resources, and rename it as log4j.properties
+
+log4j.rootLogger=ERROR,stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
\ No newline at end of file
diff --git a/k2de-governance-exception-manager/pom.xml b/k2de-governance-exception-manager/pom.xml
deleted file mode 100644
index dc1cef1972f5e20e1d983d231ffe18dba6e1250c..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/pom.xml
+++ /dev/null
@@ -1,92 +0,0 @@
-
- 4.0.0
-
-
- com.k2data
- k2de-governance-parent
- 1.0-SNAPSHOT
-
-
- k2de-governance-exception-manager
- 0.0.1-SNAPSHOT
- jar
-
- ExceptionManager
- http://maven.apache.org
-
-
- UTF-8
- 1.8
- 2.6.0
-
-
-
-
- org.apache.hadoop
- hadoop-client
-
-
-
- org.apache.hadoop
- hadoop-common
-
-
-
- org.apache.hadoop
- hadoop-hdfs
-
-
-
- org.xerial
- sqlite-jdbc
-
-
-
- com.sun
- tools
- 1.4.2
- system
- ${java.home}/../lib/tools.jar
-
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.6.1
-
-
- ${compile.version}
-
-
-
- maven-assembly-plugin
- 2.4
-
-
-
- com.k2data.governance.manager.ExceptionManagerSingle
-
-
-
- jar-with-dependencies
-
-
-
-
- make-assembly
- package
-
- single
-
-
-
-
-
-
-
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/ExceptionManagerSingle.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/ExceptionManagerSingle.java
deleted file mode 100644
index 2d8d64763afb2bd8bb2815d66680640dcabbdeb6..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/ExceptionManagerSingle.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package com.k2data.governance.manager;
-
-import com.k2data.governance.manager.analyzer.GWhnAnalyzer;
-import com.k2data.governance.manager.analyzer.GWkmxAnalyzer;
-import com.k2data.governance.manager.analyzer.IAnalyzer;
-import com.k2data.governance.manager.common.Constants;
-import com.k2data.governance.manager.common.type.GWDataType;
-import com.k2data.governance.manager.common.type.GWSourceType;
-import com.k2data.governance.manager.common.type.HNErrorType;
-import com.k2data.governance.manager.filter.GWFilter;
-import com.k2data.governance.manager.filter.IFilter;
-import com.k2data.governance.manager.message.IMessage;
-import com.k2data.governance.manager.parser.GWParserSingle;
-import com.k2data.governance.manager.util.FileHandler;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * Main process for exception manager
- *
- * @author stoke
- */
-public class ExceptionManagerSingle {
-
- private int[] kmxTypeNums;
- private int[] hnTypeNums;
-
- public ExceptionManagerSingle() {
- kmxTypeNums = new int[GWDataType.values().length];
- hnTypeNums = new int[HNErrorType.values().length];
- }
-
- public int[] getKmxTypeNums() {
- return kmxTypeNums;
- }
-
- public int[] getHnTypeNums() {
- return hnTypeNums;
- }
-
- private void sumTypeNums(int[] singleTypeNums, int[] finalTypeNums) {
- if (singleTypeNums.length != finalTypeNums.length) {
- System.out.println("Length does not match");
- return;
- }
- for (int i = 0; i < finalTypeNums.length; ++i) {
- finalTypeNums[i] += singleTypeNums[i];
- }
- }
-
- private void printResult(String outputPath) {
- String kmxFile = outputPath + "kmx.out";
- String hnFile = outputPath + "hn.out";
-
- try {
- FileWriter fw = new FileWriter(kmxFile);
- PrintWriter pw = new PrintWriter(fw);
- for (GWDataType type : GWDataType.values()) {
- pw.println(type + ":" + kmxTypeNums[type.getValue()]);
- }
- pw.close();
- fw.close();
-
- fw = new FileWriter(hnFile);
- pw = new PrintWriter(fw);
- for (HNErrorType type : HNErrorType.values()) {
- pw.println(type + ":" + hnTypeNums[type.getValue()]);
- }
- pw.close();
- fw.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public static void main(String[] args) {
- System.out.println("ExceptionManagerSingle begin...");
-
- if (args.length != 5) {
- System.out.println("Input arguments number error:");
- System.out.println("args[0]: 0:sequence file; 1:file after parse;");
- System.out.println("args[1]: the input directory;");
- System.out.println("args[2]: the number of example to be printed;");
- System.out.println("args[3]: the configdata.file directory;");
- System.out.println("args[4]: the output directory;");
- return;
- }
-
- // String parsePath = "result/anomaly-10k";
- List messageList;
- int sel = Integer.parseInt(args[0]);
- String inputPath = args[1];
- // -1 for printing all
- int exampleNum = Integer.parseInt(args[2]);
- String dbPath = args[3];
- if (!dbPath.trim().equals("")) {
- Constants.PROTOCOL_MAP_DIR = dbPath;
- }
- String outputPath = args[4];
-
- File dir = new File(inputPath);
- String[] fileNames;
- if (dir.exists() && dir.isDirectory()) {
- fileNames = dir.list();
- if (fileNames == null) {
- System.out.println("No files in " + inputPath);
- }
- } else {
- System.out.println("Can not find the directory " + inputPath);
- return;
- }
-
- ExceptionManagerSingle ems = new ExceptionManagerSingle();
-
- for (String fileName : fileNames) {
- if (!FileHandler.isGWFileValid(fileName)) {
- continue;
- }
- if (sel == 1) {
- messageList = FileHandler.readAnomalyString(inputPath + fileName);
- } else {
- GWParserSingle parser = new GWParserSingle(inputPath + fileName);
- parser.parse();
- messageList = parser.getMessageList();
-// FileHandler.writeAnomalyString(outputPath + fileName + ".out", messageList);
- }
-
- IFilter filter = new GWFilter(messageList);
- filter.filter();
-
- // KMX error
- IAnalyzer kmxAnalyzer = new GWkmxAnalyzer(
- filter.getCertainSourceTypeList(GWSourceType.KMX));
- kmxAnalyzer.printNumber();
- for (IMessage message : kmxAnalyzer.getMessageList()) {
- kmxAnalyzer.analyze(message);
- }
- int[] kmxNums = kmxAnalyzer.printType();
- ems.sumTypeNums(kmxNums, ems.getKmxTypeNums());
- // kmxAnalyzer.printCertainTypeList(GWDataType.DOUBLE.getValue(), exampleNum);
-
- // HN error
- IAnalyzer hnAnalyzer = new GWhnAnalyzer(Constants.PROTOCOL_MAP_DIR,
- filter.getCertainSourceTypeList(GWSourceType.HN));
- hnAnalyzer.printNumber();
- for (IMessage message : hnAnalyzer.getMessageList()) {
- hnAnalyzer.analyze(message);
- }
- int[] hnNums = hnAnalyzer.printType();
- ems.sumTypeNums(hnNums, ems.getHnTypeNums());
-// for (HNErrorType hType : HNErrorType.values()) {
-// hnAnalyzer.printCertainTypeList(hType.getValue(), exampleNum);
-// }
- }
-
- ems.printResult(outputPath);
- System.out.println("ExceptionManagerSingle end");
- }
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWhnAnalyzer.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWhnAnalyzer.java
deleted file mode 100644
index e0c5610fe38fc7831ed6d527dfb8c6e6c98646f2..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWhnAnalyzer.java
+++ /dev/null
@@ -1,159 +0,0 @@
-package com.k2data.governance.manager.analyzer;
-
-import com.k2data.governance.manager.common.Constants;
-import com.k2data.governance.manager.common.type.HNErrorType;
-import com.k2data.governance.manager.message.IMessage;
-import com.k2data.governance.manager.metadata.GWSqliteMetaDatabase;
-import com.k2data.governance.manager.metadata.IMetaDatabase;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public class GWhnAnalyzer implements IAnalyzer {
-
- private IMetaDatabase metaDatabase;
- private SimpleDateFormat format = new SimpleDateFormat(Constants.TIME_FORMAT);
-
- private List messageList;
- private Map> hnTypeMessageMap;
-
- public GWhnAnalyzer(String dbFilePath, List messageList) {
- metaDatabase = new GWSqliteMetaDatabase(dbFilePath);
- this.messageList = messageList;
-
- hnTypeMessageMap = new HashMap<>();
- for (HNErrorType type : HNErrorType.values()) {
- hnTypeMessageMap.put(type, new ArrayList<>());
- }
- }
-
- @Override
- public List getMessageList() {
- return messageList;
- }
-
- /**
- * Message Pattern:
- * time: yyyy-MM-dd hh:mm:ss.SSS
- * sendiment|time|(realtimedata|wtid|time,data[0],data[1],…,
- * @param message message
- */
- @Override
- public void analyze(IMessage message) {
- String key = message.getKey();
- if (!key.trim().equals("")) {
- System.out.println(key);
- }
-
- String msg = message.getMsg();
-
- int formatIndex = msg.indexOf(Constants.GW_HN_DATA_START_TAG);
- if (formatIndex < 0) {
- hnTypeMessageMap.get(HNErrorType.FORMAT).add(message);
- return;
- }
-
- int startIndex = formatIndex + Constants.GW_HN_DATA_START_TAG.length();
- String dataSubstr = msg.substring(startIndex);
- String[] splitData = dataSubstr.split("\\|");
- String wtid = splitData[0].trim();
- String[] data = splitData[1].split(",");
-
- String ts = data[0];
- try {
- format.parse(ts);
- } catch (ParseException e) {
- hnTypeMessageMap.get(HNErrorType.TIME).add(message);
- return;
- }
-
- if (wtid.trim().isEmpty()) {
- hnTypeMessageMap.get(HNErrorType.WTID_EMPTY).add(message);
- return;
- }
-
- // String wfid = ((GWSqliteMetaDatabase)metaDatabase).getWfid(wtid);
- String fieldGroup = metaDatabase.getFieldGroup(wtid);
- if (fieldGroup == null) {
- hnTypeMessageMap.get(HNErrorType.PROTO_NULL).add(message);
- return;
- }
-
- List fields = metaDatabase.getFieldsByFieldGroup(fieldGroup);
- if (fields == null) {
- hnTypeMessageMap.get(HNErrorType.FIELDS_NULL).add(message);
- return;
- }
-
- int fSize = fields.size();
- if (fSize > data.length - 1) {
- hnTypeMessageMap.get(HNErrorType.FIELDS_NUM_LESS).add(message);
- return;
- } else if (fSize < data.length - 1) {
- hnTypeMessageMap.get(HNErrorType.FIELDS_NUM_MORE).add(message);
- return;
- }
-
- // if not in all errors above, then put it in OTHER
- hnTypeMessageMap.get(HNErrorType.OHTER).add(message);
- }
-
- @Override
- public void printNumber() {
- System.out.println("HN anomaly number : " + messageList.size());
- }
-
- @Override
- public void printAll() {
- printNumber();
-
- for (IMessage message : messageList) {
- System.out.println(message.toString());
- }
- }
-
- @Override
- public int[] printType() {
- int[] typeNums = new int[HNErrorType.values().length];
-
- hnTypeMessageMap.forEach((type, certainList) -> {
- int size = certainList.size();
- typeNums[type.getValue()] = size;
- System.out.println(
- String.format("Exception type %s has %d messages", type.toString(), size));
- });
- int sumNum = 0;
- for (int num : typeNums) {
- sumNum += num;
- }
- System.out.println("There are " + sumNum + " hit exception messages in total");
- return typeNums;
- }
-
- /**
- *
- * @param id type.value()
- * @param size default msgList.size()
- */
- public void printCertainTypeList(int id, int size) {
- HNErrorType type = HNErrorType.fromInteger(id);
- System.out.println("Error type : " + type);
- List msgList = hnTypeMessageMap.get(type);
- int realSize = msgList.size();
- int num = size > realSize ? realSize : size;
- for (int i = 0; i < num; ++i) {
- System.out.println(msgList.get(i).toString());
- }
- }
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWkmxAnalyzer.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWkmxAnalyzer.java
deleted file mode 100644
index 9ba1553ea0f0163bb4f2a0e800844bfa15058d52..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWkmxAnalyzer.java
+++ /dev/null
@@ -1,121 +0,0 @@
-package com.k2data.governance.manager.analyzer;
-
-import com.k2data.governance.manager.common.type.GWDataType;
-import com.k2data.governance.manager.message.IMessage;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.log4j.Logger;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * Analyze KMX type Exception
- *
- * @author stoke
- */
-public class GWkmxAnalyzer implements IAnalyzer {
-
- private static final String TYPE_PREFIX = ": ";
- private static final String TYPE_SUFFIX = "_EX";
- private static final String TYPE_PREFIX_SMALL = "Invalid ";
- private static final String TYPE_SUFFIX_SMALL = " value";
- private static final String VALUE_PREFIX = "value ";
- private static final String VALUE_SUFFIX = " for field";
- private static final String FIELD_PREFIX = "field ";
- private static final String FIELD_SUFFIX = " [GENERATED";
-
- private Logger logger = Logger.getLogger(GWkmxAnalyzer.class);
-
- private List messageList;
- private Map> kmxTypeMessageMap;
-
- public GWkmxAnalyzer(List messageList) {
- this.messageList = messageList;
- kmxTypeMessageMap = new HashMap<>();
- }
-
- @Override
- public List getMessageList() {
- return messageList;
- }
-
- /**
- * Message Pattern:
- * [EXCEPTION] [Increase-Counters: _EX, RECORD_DROP] Invalid value for field
- * [GENERATED AVRO RECORDS] []
- *
- * @param message the exception message
- */
- @Override
- public void analyze(IMessage message) {
- String key = message.getKey();
- if (!key.trim().equals("")) {
- System.out.println(key);
- }
-
- String msg = message.getMsg();
- int indexType = msg.indexOf(TYPE_SUFFIX);
- if (indexType == -1) {
- String err = String.format("The pattern is wrong since no _EX included in %s", msg);
- logger.error(err);
- }
- String type = msg.substring(msg.indexOf(TYPE_PREFIX) + TYPE_PREFIX.length(), indexType);
- GWDataType dataType = GWDataType.fromString(type);
- if (!kmxTypeMessageMap.containsKey(dataType)) {
- kmxTypeMessageMap.put(dataType, new ArrayList<>());
- }
- kmxTypeMessageMap.get(dataType).add(message);
-
-// String type2 = msg.substring(msg.indexOf(TYPE_PREFIX_SMALL) + TYPE_PREFIX_SMALL.length(),
-// msg.indexOf(TYPE_SUFFIX_SMALL));
-// String field = msg
-// .substring(msg.indexOf(FIELD_PREFIX) + FIELD_PREFIX.length(), msg.indexOf(FIELD_SUFFIX));
-// String value = msg
-// .substring(msg.indexOf(VALUE_PREFIX) + VALUE_PREFIX.length(), msg.indexOf(VALUE_SUFFIX));
-// // TODO check the value type
- }
-
- @Override
- public void printNumber() {
- System.out.println("KMX anomaly number : " + messageList.size());
- }
-
- @Override
- public void printAll() {
- printNumber();
-
- for (IMessage message : messageList) {
- System.out.println(message.toString());
- }
- }
-
- @Override
- public int[] printType() {
- int[] typeNums = new int[GWDataType.values().length];
-
- kmxTypeMessageMap.forEach((type, certainList) -> {
- int size = certainList.size();
- typeNums[type.getValue()] = size;
- System.out.println(
- String.format("Exception type %s has %d messages", type.toString(), certainList.size()));
- });
-
- return typeNums;
- }
-
- @Override
- public void printCertainTypeList(int id, int size) {
- GWDataType type = GWDataType.fromInteger(id);
- System.out.println("Error type : " + type);
- List msgList = kmxTypeMessageMap.get(type);
- int realsize = msgList.size();
- int num = size > realsize ? realsize : size;
- for (int i = 0; i < num; ++i) {
- System.out.println(msgList.get(i).toString());
- }
- }
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/IAnalyzer.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/IAnalyzer.java
deleted file mode 100644
index 82ba7debd816bc042ecc83e2cb86d03b610ae639..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/IAnalyzer.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.k2data.governance.manager.analyzer;
-
-import com.k2data.governance.manager.message.IMessage;
-import java.util.List;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public interface IAnalyzer {
- List getMessageList();
- void analyze(IMessage message);
- void printNumber();
- void printAll();
- int[] printType();
- void printCertainTypeList(int id, int size);
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Constants.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Constants.java
deleted file mode 100644
index 64e08fb861d0d4e30a190c7963ad87243c46dbb8..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Constants.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.k2data.governance.manager.common;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public class Constants {
-
- public static String GW_MSG_SPILT_OP = ",";
- public static String GW_EXCEPTION_END = ".complete";
-
- public static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
-
- public static String GW_HN_START_TAG = "sediment";
- public static String GW_KMX_START_TAG = "[EXCEPTION]";
- public static String GW_HN_DATA_START_TAG = "realtimedata|";
-
-// public static String PROTOCOL_MAP_DIR = "/Users/stoke/code/github/GWDataEngineering/"
-// + "ExceptionManager/src/main/resources/configdata.file";
- public static String PROTOCOL_MAP_DIR = "src/main/resources/configdata.file";
- public static String GW_META_SQLITE_FILE_DEFAULT = "metadata.db";
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWDataType.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWDataType.java
deleted file mode 100644
index 39539c517dbc486c4270decbad39404b10a10507..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWDataType.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package com.k2data.governance.manager.common.type;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public enum GWDataType {
- DOUBLE(0), BOOLEAN(1), LONG(2), STRING(3), INT(4), FLOAT(5), OTHER(6);
-
- private final int value;
-
- GWDataType(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
-
- public static GWDataType fromInteger(int id) {
- return GWDataType.values()[id];
- }
-
- public static GWDataType fromString(String type) {
- GWDataType dataType;
- switch (type.toLowerCase()) {
- case "double":
- dataType = GWDataType.DOUBLE;
- break;
- case "boolean":
- dataType = GWDataType.BOOLEAN;
- break;
- case "long":
- dataType = GWDataType.LONG;
- break;
- case "string":
- dataType = GWDataType.STRING;
- break;
- case "int":
- dataType = GWDataType.INT;
- break;
- case "float":
- dataType = GWDataType.FLOAT;
- break;
- default:
- dataType = GWDataType.OTHER;
- break;
- }
-
- return dataType;
- }
-
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWSourceType.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWSourceType.java
deleted file mode 100644
index e04a94774a13e93b144c1254433422ff8832c902..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWSourceType.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package com.k2data.governance.manager.common.type;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public enum GWSourceType {
- KMX(0), HN(1), OTHER(2);
-
- private final int value;
-
- GWSourceType(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
-
- public static GWSourceType fromInteger(int id) {
- return GWSourceType.values()[id];
- }
-
- public static GWSourceType fromString(String type) {
- GWSourceType sourceType;
- switch (type.toLowerCase()) {
- case "kmx":
- sourceType = GWSourceType.KMX;
- break;
- case "hn":
- sourceType = GWSourceType.HN;
- break;
- default:
- sourceType = GWSourceType.OTHER;
- break;
- }
-
- return sourceType;
- }
-
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/HNErrorType.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/HNErrorType.java
deleted file mode 100644
index d63e965231b7c0ee6c5bd3efe9a1966e9cbe1135..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/HNErrorType.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.k2data.governance.manager.common.type;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public enum HNErrorType {
- FORMAT(0), // no realtime| in the msg
- WTID_NULL(1), // wtid is not in DB
- TIME(2), // time format wrong
- PROTO_NULL(3), // protocolid is not in DB
- FIELDS_NUM_MORE(4), // number of data more than in the DB
- FIELDS_NUM_LESS(5), // number of data less than in the DB
- WTID_EMPTY(6),
- FIELDS_NULL(7), // fields null
- OHTER(8);
-
- private final int value;
-
- HNErrorType(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
-
- public static HNErrorType fromInteger(int id) {
- return HNErrorType.values()[id];
- }
-
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/GWFilter.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/GWFilter.java
deleted file mode 100644
index 2b62d401fe5e3335c9296044dd28aa255ec4ea8c..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/GWFilter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.k2data.governance.manager.filter;
-
-import com.k2data.governance.manager.common.Constants;
-import com.k2data.governance.manager.common.type.GWSourceType;
-import com.k2data.governance.manager.message.IMessage;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public class GWFilter implements IFilter {
- private Logger logger = Logger.getLogger(GWFilter.class);
-
- private List messageList;
- private Map> gwSourceTypeMessageMap;
-
- public GWFilter(List messageList) {
- this.messageList = messageList;
- gwSourceTypeMessageMap = new HashMap<>();
-
- for (GWSourceType type : GWSourceType.values()) {
- gwSourceTypeMessageMap.put(type, new ArrayList<>());
- }
- }
-
- @Override
- public List getMessageList() {
- return messageList;
- }
-
- public Map> getGwSourceTypeMessageMap() {
- return gwSourceTypeMessageMap;
- }
-
- @Override
- public List getCertainSourceTypeList(GWSourceType type) {
- return gwSourceTypeMessageMap.get(type);
- }
-
- @Override
- public void filter() {
- for (IMessage message : messageList) {
- String msg = message.getMsg();
-
- if (msg.startsWith(Constants.GW_KMX_START_TAG)) {
- gwSourceTypeMessageMap.get(GWSourceType.KMX).add(message);
- } else if (msg.startsWith(Constants.GW_HN_START_TAG)) {
- gwSourceTypeMessageMap.get(GWSourceType.HN).add(message);
- } else {
- gwSourceTypeMessageMap.get(GWSourceType.OTHER).add(message);
- String err = String.format("Messages for other type, %s", message.toString());
- logger.error(err);
- }
- }
- }
-
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/AbstractMessage.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/AbstractMessage.java
deleted file mode 100644
index ae0b901268f6ad1c31df6a643715e84f7e134f77..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/AbstractMessage.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.k2data.governance.manager.message;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public abstract class AbstractMessage implements IMessage {
- private String key;
- private String msg;
-
- @Override
- public String getKey() {
- return key;
- }
-
- @Override
- public void setKey(String key) {
- this.key = key;
- }
-
- @Override
- public String getMsg() {
- return msg;
- }
-
- @Override
- public void setMsg(String msg) {
- this.msg = msg;
- }
-
- @Override
- public String toString() {
- return String.format("%s,%s", getKey(), getMsg());
- }
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/GWMessage.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/GWMessage.java
deleted file mode 100644
index cf991ec10943b89b48142d33f0b646977fa38ac7..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/GWMessage.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.k2data.governance.manager.message;
-
-import com.k2data.governance.manager.common.Constants;
-
-/**
- * Created by stoke on 2017/11/16.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public class GWMessage extends AbstractMessage {
-
- public GWMessage(String key, String msg) {
- setKey(key);
- setMsg(msg);
- }
-
- @Override
- public String toString() {
- return String.format("%s" + Constants.GW_MSG_SPILT_OP + "%s", getKey(), getMsg());
- }
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/GWParserSingle.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/GWParserSingle.java
deleted file mode 100644
index 64c06e2a880f48d89bc0617f5667570a25fbe6b9..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/GWParserSingle.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.k2data.governance.manager.parser;
-
-import com.k2data.governance.manager.message.GWMessage;
-import com.k2data.governance.manager.message.IMessage;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-
-/**
- * Created by stoke on 2017/11/20.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public class GWParserSingle {
- private static Configuration conf = new Configuration();
- private String path;
- private List messageList;
-
- public List getMessageList() {
- return messageList;
- }
-
- public GWParserSingle(String path) {
- messageList = new ArrayList<>();
- this.path = path;
- }
-
- public void parse() {
- Path filePath = new Path(path);
- try {
- FileSystem fs = FileSystem.get(conf);
- Reader reader = new Reader(fs, filePath, conf);
- Text key = new Text();
- Text value = new Text();
- while (reader.next(key, value)) {
- messageList.add(new GWMessage(key.toString(), value.toString()));
- }
- IOUtils.closeStream(reader);
-
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-}
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/ParserMR.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/ParserMR.java
deleted file mode 100644
index b638217c567987d07db9055de5c755c9096dca1e..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/ParserMR.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.k2data.governance.manager.parser;
-
-import com.k2data.governance.manager.util.FileHandler;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * Created by stoke on 2017/11/15.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * TODO: to refactor
- *
- * @author stoke
- */
-public class ParserMR {
-
- private static Reader reader = null;
- private static Configuration conf = new Configuration();
-
- // ref: http://blog.csdn.net/inte_sleeper/article/details/7010966
- public static void main(String[] args)
- throws IOException, ClassNotFoundException, InterruptedException {
- String inputPath = "/Users/stoke/lab/NEL/k2data/thu/abnormal/hdfs-bolt-anomaly-9-2-1509685150773.log.complete";
- // String outputPath = "result";
- // String inputPath = args[0];
- String outputPath = args[1];
-
- // remove the output path
- FileHandler.removeOutputDir(outputPath);
-
- System.out.println(inputPath);
- System.out.println(outputPath);
-
- Job job = new Job(conf, "read sequence file");
- job.setJarByClass(ParserMR.class);
- job.setMapperClass(ReadFileMapper.class);
- // This should be matched with the file
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
-
- Path path = new Path(inputPath);
- FileSystem fs = FileSystem.get(conf);
- reader = new Reader(fs, path, conf);
- FileInputFormat.addInputPath(job, path);
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
-
- public static class ReadFileMapper extends Mapper {
-
- @Override
- public void map(Writable key, Text value, Context context) {
- key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
- try {
- while (reader.next(key, value)) {
- context.write(key, value);
- }
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-}
\ No newline at end of file
diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/util/FileHandler.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/util/FileHandler.java
deleted file mode 100644
index 0bf30aec2387ce5e16dff6ae2ae4a727affb2a0f..0000000000000000000000000000000000000000
--- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/util/FileHandler.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package com.k2data.governance.manager.util;
-
-import com.k2data.governance.manager.message.GWMessage;
-import com.k2data.governance.manager.message.IMessage;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Created by stoke on 2017/11/15.
- * E-mail address is zaqthss2009@gmail.com
- * Copyright © stoke. All Rights Reserved.
- *
- * @author stoke
- */
-public class FileHandler {
- public static final String SPLIT_OP = "\t";
-
- /**
- * used for MR job, remove the outputDir in advance
- * @param outputPath outputPath
- */
- public static void removeOutputDir(String outputPath) {
- File dir = new File(outputPath);
- if (dir.exists()) {
- File[] files = dir.listFiles();
- if (files != null) {
- for (File file : files) {
- file.delete();
- }
- }
- dir.delete();
- }
- }
-
- /**
- * @param path
- */
- public static List readAnomalyString(String path) {
- List messageList = new ArrayList<>();
- try {
- FileReader fr = new FileReader(path);
- BufferedReader br = new BufferedReader(fr);
-
- String line;
- while ((line = br.readLine()) != null) {
- String[] vals = line.split(SPLIT_OP);
- if (vals.length <= 1) {
- System.out.println(line);
- }
- String key = vals[0];
- String msg = vals[1];
- messageList.add(new GWMessage(key, msg));
- }
-
- br.close();
- fr.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- return messageList;
- }
-
- public static void writeAnomalyString(String outputPath, List messageList) {
- try {
- FileWriter fw = new FileWriter(outputPath);
- PrintWriter pw = new PrintWriter(fw);
-
- for (IMessage message : messageList) {
- pw.println(message.getKey() + SPLIT_OP + message.getMsg());
- }
-
- pw.close();
- fw.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public static boolean isGWFileValid(String fileName) {
- return !fileName.startsWith(".");
-// return fileName.endsWith(Constants.GW_EXCEPTION_END);
- }
-
- public static void main(String[] args) {
- String[] outputPaths = { "result/", "output/" };
-
- for(String outputPath : outputPaths) {
- removeOutputDir(outputPath);
- }
- }
-}
diff --git a/pom.xml b/pom.xml
index 680fb150547607ca763addf0c7209769bbd6fae4..1bc6dde4428119b60d61481af4f443dd743d2d17 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,13 +11,18 @@
UTF-8
- 1.8
+ 1.7
2.6.0
+ 2.1.0
+ 2.11
+ 1.4.2
k2de-governance-data-extractor
- k2de-governance-exception-manager
+ k2de-governance-exception-manager-base
+ k2de-governance-exception-manager-kmx
+ k2de-governance-exception-manager-huineng
@@ -45,6 +50,34 @@
sqlite-jdbc
3.20.1
+
+
+ net.sourceforge.argparse4j
+ argparse4j
+ 0.7.0
+
+
+
+ org.apache.spark
+ spark-core_${spark.comp.version}
+ ${spark.version}
+ provided
+
+
+
+ org.apache.spark
+ spark-sql_${spark.comp.version}
+ ${spark.version}
+ provided
+
+
+
+ com.sun
+ tools
+ 1.4.2
+ system
+ ${java.home}/../lib/tools.jar
+