diff --git a/.gitignore b/.gitignore index bf33abfdbce7ddc6fa97e7209845fee559cb3de0..13042f9cefeacc6fafc3bba7d61d3542c80aaa74 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,5 @@ *.csv *META-INF/ + +**/LocalTest.java diff --git a/k2de-governance-data-extractor/pom.xml b/k2de-governance-data-extractor/pom.xml index d8055df60d7144c8ebda2dd244a4eaeb45e6118a..c262e4c01c48b6da947cc2a5c6b9a8fee4775ede 100644 --- a/k2de-governance-data-extractor/pom.xml +++ b/k2de-governance-data-extractor/pom.xml @@ -9,14 +9,14 @@ k2de-governance-data-extractor - 0.0.1-SNAPSHOT + 1.0-SNAPSHOT jar - DataExtractor + k2de-governance-data-extractor UTF-8 - 1.8 + 1.7 2.6.0 @@ -44,8 +44,6 @@ com.sun tools - 1.4.2 - system ${java.home}/../lib/tools.jar diff --git a/k2de-governance-data-extractor/src/main/java/com/k2data/governance/extractor/metadata/GWSqliteMetaDatabase.java b/k2de-governance-data-extractor/src/main/java/com/k2data/governance/extractor/metadata/GWSqliteMetaDatabase.java index 2b880d38ef50415fa9519abbd96ad459ccaea97f..bb7f275811b5fc58101be25686b92be4acbfefe6 100644 --- a/k2de-governance-data-extractor/src/main/java/com/k2data/governance/extractor/metadata/GWSqliteMetaDatabase.java +++ b/k2de-governance-data-extractor/src/main/java/com/k2data/governance/extractor/metadata/GWSqliteMetaDatabase.java @@ -145,7 +145,7 @@ public class GWSqliteMetaDatabase extends AbstractGWMetaDatabase implements IMet String fieldGroup = rs.getString(1).replace(" ","").toLowerCase(); String fieldName = rs.getString(2).replace(" ", "").toLowerCase(); if (!fieldGroupFieldsMap.containsKey(fieldGroup)) { - fieldGroupFieldsMap.put(fieldGroup, new ArrayList<>()); + fieldGroupFieldsMap.put(fieldGroup, new ArrayList()); } fieldGroupFieldsMap.get(fieldGroup).add(fieldName); } diff --git a/k2de-governance-exception-manager-base/README.md b/k2de-governance-exception-manager-base/README.md new file mode 100644 index 0000000000000000000000000000000000000000..c64fee5353930b0b0cf27370a6e5e3ec704b4ed3 --- /dev/null +++ b/k2de-governance-exception-manager-base/README.md @@ -0,0 +1,34 @@ +# 金风 KMX 异常统计工具 + +## 环境说明 +需再 JDK1.7 及以上环境下运行 + +## 使用说明 +* spark 版本基项目入口函数为 `com.k2data.governance.manager.ExceptionManagerSpark.run` +* 运行参数: + usage: ExceptionManager [-h] [-m METADATAFILE] [-t {text,sequence}] [-l] + InputPath OutputPath + * InputPath:文件输入路径 + * OutputPath:结果输出路径 + * -m METADATAFILE:协议点表 SQLite 文件路径 + * -t:输入文件格式,文本文件格式,或 SequenceFile 格式,默认为 seqence + * -l:以 Spark Local 模式运行 + * -h:运行参数说明 + +## 功能概述 +* 输入:异常区异常文件(SequenceFile) +* 输出 + 1. 异常分类储存(暂未实现,输出格式待明确) + 2. 异常类型统计(异常类型,异常计数),输出路径为 `outputPath/TypeCountResult/` + 3. 异常类型细节统计(异常类型,协议号,风机号,异常计数),输出路径为 `outputPath/TypeDetailResult/` + 4. 异常协议统计(协议号,风机计数,异常计数),输出路径为 `outputPath/ProtocolResult/` + +## TODO +* 测试大数据量的效率表现 +* 数据倾斜优化 +* 单元测试 +* 似乎重构之后效率慢了一些,需后续检查测试 +* 执行任务可配置 +* 文档及报告 + + diff --git a/k2de-governance-exception-manager-base/pom.xml b/k2de-governance-exception-manager-base/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..92ed68eec5ddcdbc7a4604c263ea993c0a346511 --- /dev/null +++ b/k2de-governance-exception-manager-base/pom.xml @@ -0,0 +1,113 @@ + + 4.0.0 + + + com.k2data + k2de-governance-parent + 1.0-SNAPSHOT + + + k2de-governance-exception-manager-base + 1.0-SNAPSHOT + jar + + k2de-governance-exception-manager-base + + + + org.apache.hadoop + hadoop-client + + + javax.servlet + * + + + + + + org.apache.hadoop + hadoop-common + + + javax.servlet + * + + + + + + org.apache.hadoop + hadoop-hdfs + + + javax.servlet + * + + + + + + org.xerial + sqlite-jdbc + + + + org.apache.spark + spark-core_${spark.comp.version} + + + + net.sourceforge.argparse4j + argparse4j + + + + org.apache.spark + spark-sql_${spark.comp.version} + + + + com.sun + tools + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + ${compile.version} + ${compile.version} + + + + maven-assembly-plugin + 2.4 + + + + com.k2data.governance.manager.ExceptionManagerSpark + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/ExceptionManagerSpark.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/ExceptionManagerSpark.java new file mode 100644 index 0000000000000000000000000000000000000000..34a303de0e6b4416c14e976a15e551aba09eb1c4 --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/ExceptionManagerSpark.java @@ -0,0 +1,243 @@ +package com.k2data.governance.manager; + +import com.k2data.governance.manager.result.IAnalyzeResult; +import com.k2data.governance.manager.common.constants.ArgumentsConstants; +import com.k2data.governance.manager.error.IErrorType; +import com.k2data.governance.manager.filter.IFilter; +import com.k2data.governance.manager.metadata.IMetaDatabase; +import com.k2data.governance.manager.util.InitUtil; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.Tuple2; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; + +/** + * Created by wangyihan on 2017/12/6 下午3:29. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class ExceptionManagerSpark { + private Logger logger = Logger.getLogger(ExceptionManagerSpark.class); + private final String originDataOutputPath = "OriginalData"; + private final String typeDetailOutputPath = "TypeDetailResult"; + private final String typeCountOutputPath = "TypeCountResult"; + private final String protocolCountOutputPath = "ProtocolResult"; + private static IFilter errorTypeFilter; + private static IMetaDatabase metadataBase; + + public Logger getLogger() { + return logger; + } + + public static void setFilter(IFilter filter) { + errorTypeFilter = filter; + } + + public static void setMetadataBase(IMetaDatabase database) { + metadataBase = database; + } + + public static Namespace parseArguments(String[] args) { + ArgumentParser argumentParser = ArgumentParsers.newArgumentParser("ExceptionManager") + .defaultHelp(true) + .description("k2data exception data statistics tool"); + argumentParser.addArgument(ArgumentsConstants.INPUT_PATH) + .type(String.class) + .metavar("InputPath") + .help("Input data path, HDFS or local path"); + argumentParser.addArgument(ArgumentsConstants.OUTPUT_PATH) + .type(String.class) + .metavar("OutputPath") + .help("Output data path, HDFS of local path"); + argumentParser.addArgument("-m", "--metadata-file") + .dest(ArgumentsConstants.METADATA_FILE) + .help("Metadata SQLite file"); + argumentParser.addArgument("-t", "--file-type") + .dest(ArgumentsConstants.FILE_TYPE) + .choices(ArgumentsConstants.FILE_TYPE_TEXT, ArgumentsConstants.FILE_TYPE_SEQUENCE) + .setDefault(ArgumentsConstants.FILE_TYPE_SEQUENCE) + .help("Input file format (text, sequence)"); + argumentParser.addArgument("-l", "--local") + .dest(ArgumentsConstants.LOCAL) + .action(Arguments.storeConst()) + .setConst(true) + .setDefault(false) + .help("Run spark with local mode"); + Namespace ns = null; + try { + ns = argumentParser.parseArgs(args); + } catch (ArgumentParserException e) { + argumentParser.handleError(e); + System.exit(1); + } + return ns; + } + + public static void run(String[] args) { + Namespace ns = parseArguments(args); + run(ns); + } + + public static void run(Namespace ns) { + ExceptionManagerSpark app = new ExceptionManagerSpark(); + Logger logger = app.getLogger(); + + String inputFilePath = ns.getString(ArgumentsConstants.INPUT_PATH); + String outputFilePath = ns.getString(ArgumentsConstants.OUTPUT_PATH); + String sourceFileType = ns.getString(ArgumentsConstants.FILE_TYPE); + boolean local = ns.getBoolean(ArgumentsConstants.LOCAL); + + // Initial SparkSession & SparkContext + SparkSession session = InitUtil.initSparkContext(local); + JavaSparkContext sc = new JavaSparkContext(session.sparkContext()); + JavaRDD dataRdd = null; + // Read data + switch (sourceFileType) { + case ArgumentsConstants.FILE_TYPE_SEQUENCE: // convert SequenceFile to JavaRDD + dataRdd = sc.sequenceFile(inputFilePath, Text.class, Text.class) + .map(new Function, String>() { + @Override + public String call(Tuple2 textTextTuple2) throws Exception { + return textTextTuple2._2().toString(); + } + }); + break; + case ArgumentsConstants.FILE_TYPE_TEXT: + dataRdd = sc.textFile(inputFilePath); + break; + default: + logger.error("Invalid file type argument!"); + System.exit(-1); + } + + final Broadcast broadcastFilter = sc.broadcast(errorTypeFilter); + final Broadcast broadcastMetadataBase = sc.broadcast(metadataBase); + // analyze exceptions + assert dataRdd != null; + JavaPairRDD exceptionRdd = dataRdd.mapToPair(new PairFunction() { + IFilter filter = broadcastFilter.value(); + IMetaDatabase metaDatabase = broadcastMetadataBase.value(); + + @Override + public Tuple2 call(String message) throws Exception { + Class errorTypeClass = filter.getErrorClass(message); + // TODO: handle null pointer or invalid errorTypeClass + IErrorType errorType = errorTypeClass.getEnumConstants()[0]; + IAnalyzeResult result = errorType.analyzeRecord(message, metaDatabase); + return new Tuple2<>(result, message); + } + }).cache(); + + // Save by type + String originOutputPath = Paths.get(outputFilePath, app.originDataOutputPath).toString(); + JavaRDD rowRdd = exceptionRdd.map(new Function, Row>() { + @Override + public Row call(Tuple2 pair) throws Exception { + return RowFactory.create(pair._1().getErrorType().toString(), pair._2()); + } + }); + StructType schema = DataTypes.createStructType(new StructField[]{ + DataTypes.createStructField("ErrorType", DataTypes.StringType, false), + DataTypes.createStructField("Data", DataTypes.StringType, false) + }); + Dataset df = session.createDataFrame(rowRdd, schema); + df.write().partitionBy("ErrorType").text(originOutputPath); + + JavaRDD exceptionRddWithoutData = exceptionRdd.map(new Function, IAnalyzeResult>() { + @Override + public IAnalyzeResult call(Tuple2 tuple) throws Exception { + return tuple._1(); + } + }).cache(); + + exceptionRdd.unpersist(); + + // Count exception type + JavaPairRDD typeDetailRdd = exceptionRddWithoutData + .mapToPair(new PairFunction() { + @Override + public Tuple2 call(IAnalyzeResult result) throws Exception { + return new Tuple2<>(result, 1); + } + }) + .reduceByKey(new Function2() { + @Override + public Integer call(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + }); + String typeDetailOutputPath = Paths.get(outputFilePath, app.typeDetailOutputPath).toString(); + typeDetailRdd.saveAsTextFile(typeDetailOutputPath); + + // Aggregate count by type + JavaPairRDD typeTotalRdd = exceptionRddWithoutData + .mapToPair(new PairFunction() { + @Override + public Tuple2 call(IAnalyzeResult result) throws Exception { + return new Tuple2<>(result.getErrorType(), 1); + } + }) + .reduceByKey(new Function2() { + @Override + public Integer call(Integer count1, Integer count2) throws Exception { + return count1 + count2; + } + }); + String typeOutputPath = Paths.get(outputFilePath, app.typeCountOutputPath).toString(); + typeTotalRdd.saveAsTextFile(typeOutputPath); + + typeDetailRdd.unpersist(); + + // Count by protocol + JavaPairRDD protocolRdd = exceptionRddWithoutData + .groupBy(new Function() { + @Override + public String call(IAnalyzeResult gwAnalyzeResult) throws Exception { + return gwAnalyzeResult.getFieldGroup(); + } + }) + .mapToPair(new PairFunction>, String, String>() { + @Override + public Tuple2 call(Tuple2> pair) throws Exception { + String protocolId = pair._1(); + Iterable results = pair._2(); + int recordCount = 0; + Set wtSet = new HashSet<>(); + for (IAnalyzeResult result : results) { + recordCount += 1; + wtSet.add(result.getAssetId()); + } + return new Tuple2<>(protocolId, String.format("%s,%s", wtSet.size(), recordCount)); + } + }); + String protocolOutputPath = Paths.get(outputFilePath, app.protocolCountOutputPath).toString(); + protocolRdd.saveAsTextFile(protocolOutputPath); + + exceptionRdd.unpersist(); + session.stop(); + } +} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Pair.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/Pair.java similarity index 100% rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Pair.java rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/Pair.java diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/ArgumentsConstants.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/ArgumentsConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..91f84ee4a3aa85ae4dd152cad8ba79720210e509 --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/ArgumentsConstants.java @@ -0,0 +1,18 @@ +package com.k2data.governance.manager.common.constants; + +/** + * Created by wangyihan on 2017/12/28 下午10:58. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class ArgumentsConstants { + public final static String INPUT_PATH = "inputPath"; + public final static String OUTPUT_PATH = "outputPath"; + public final static String METADATA_FILE = "metadataFile"; + public final static String FILE_TYPE = "fileType"; + public final static String LOCAL = "local"; + public final static String FILE_TYPE_TEXT = "text"; + public final static String FILE_TYPE_SEQUENCE = "sequence"; +} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/IMessage.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/Constants.java similarity index 36% rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/IMessage.java rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/Constants.java index e2fad558e84fd6bf9bc168b7b1ddd9d31afb9af5..8cbd88bfe377a3777293ecdfb4ced26fbe01d7fd 100644 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/IMessage.java +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/Constants.java @@ -1,15 +1,12 @@ -package com.k2data.governance.manager.message; +package com.k2data.governance.manager.common.constants; /** - * Created by stoke on 2017/11/17. + * Created by stoke on 2017/11/16. * E-mail address is zaqthss2009@gmail.com * Copyright © stoke. All Rights Reserved. * * @author stoke */ -public interface IMessage { - void setKey(String key); - String getKey(); - void setMsg(String msg); - String getMsg(); +public class Constants { + public static String GW_META_SQLITE_FILE_DEFAULT = "/tmp/metadata.db"; } diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/SparkConstants.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/SparkConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..d1642b9494490b570e98aca1f0dd2bdfebe05c0d --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/common/constants/SparkConstants.java @@ -0,0 +1,14 @@ +package com.k2data.governance.manager.common.constants; + +/** + * Created by wangyihan on 2017/12/6 下午4:04. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class SparkConstants { + public final static String SPARK_LOCAL = "spark.local"; + public final static String SPARK_MASTER = "spark.master"; + public final static String SPARK_APP_NAME = "spark.app.name"; +} diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/error/IErrorType.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/error/IErrorType.java new file mode 100644 index 0000000000000000000000000000000000000000..c6a8464b179d486a3dccc3d5b39102bf2df1aff7 --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/error/IErrorType.java @@ -0,0 +1,21 @@ +package com.k2data.governance.manager.error; + +import com.k2data.governance.manager.result.IAnalyzeResult; +import com.k2data.governance.manager.metadata.IMetaDatabase; + +/** + * Created by wangyihan on 2017/11/30 下午2:08. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public interface IErrorType { + String toErrorString(); + + int getValue(); + + IAnalyzeResult analyzeRecord(String msg); + + IAnalyzeResult analyzeRecord(String msg, IMetaDatabase metaDatabase); +} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/IFilter.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/filter/IFilter.java similarity index 47% rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/IFilter.java rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/filter/IFilter.java index 0cd3d2da6ca5da4522f93a7d9f5f3cb6be61f661..e10fd08b8bf5bd61b34bf59b894a4c357bafe659 100644 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/IFilter.java +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/filter/IFilter.java @@ -1,8 +1,6 @@ package com.k2data.governance.manager.filter; -import com.k2data.governance.manager.common.type.GWSourceType; -import com.k2data.governance.manager.message.IMessage; -import java.util.List; +import com.k2data.governance.manager.error.IErrorType; /** * Created by stoke on 2017/11/16. @@ -12,7 +10,5 @@ import java.util.List; * @author stoke */ public interface IFilter { - void filter(); - List getMessageList(); - List getCertainSourceTypeList(GWSourceType type); + Class getErrorClass(String msg); } diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/AbstractGWMetaDatabase.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/AbstractMetaDatabase.java similarity index 46% rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/AbstractGWMetaDatabase.java rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/AbstractMetaDatabase.java index 8daa730bc1d009f8df035b68c77d74e71fc4e2b5..e6c3a8a311ce4d8a0190befe295022fa66b34ff0 100644 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/AbstractGWMetaDatabase.java +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/AbstractMetaDatabase.java @@ -2,7 +2,9 @@ package com.k2data.governance.manager.metadata; import java.util.List; import java.util.Map; + import org.apache.log4j.Logger; +import scala.Serializable; /** * Created by wangyihan on 2017/11/17 上午10:20. @@ -11,15 +13,13 @@ import org.apache.log4j.Logger; * * @author wangyihan */ -abstract class AbstractGWMetaDatabase implements IMetaDatabase { - Logger logger = getLogger(); +abstract class AbstractMetaDatabase implements IMetaDatabase, Serializable { - Map wtidFieldGroupMap; + Map assetFieldGroupMap; Map> fieldGroupFieldsMap; - Map wtidWfidMap; protected Logger getLogger() { - return Logger.getLogger(AbstractGWMetaDatabase.class); + return Logger.getLogger(AbstractMetaDatabase.class); } @Override @@ -36,28 +36,16 @@ abstract class AbstractGWMetaDatabase implements IMetaDatabase { } @Override - public String getFieldGroup(String wtid) { - if (wtid == null || wtid.isEmpty()) { - logger.error("Empty wtid: " + wtid); + public String getFieldGroup(String assetId) { + if (assetId == null || assetId.isEmpty()) { + getLogger().error("Empty AssetID: " + assetId); return null; } - String fieldGroup = wtidFieldGroupMap.get(wtid); -// if (fieldGroup == null) { -// logger.error("Could not find wtid info from database: " + wtid); -// } - return fieldGroup; + return assetFieldGroupMap.get(assetId); } @Override public List getFieldsByFieldGroup(String fieldGroup) { -// if (fieldGroup == null || fieldGroup.isEmpty()) { -// logger.error("Empty fieldGroup: " + fieldGroup); -// return null; -// } - List fields = fieldGroupFieldsMap.get(fieldGroup); -// if (fields == null) { -// logger.error("Could not find fields info from database: " + fieldGroup); -// } - return fields; + return fieldGroupFieldsMap.get(fieldGroup); } } diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java similarity index 99% rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java index 67f2983a93dabdca75cf044bd23e1296a55dd5bc..ec2ac0c9beb45d74f44700ee5ce2e9ec62f16298 100644 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/IMetaDatabase.java @@ -11,7 +11,10 @@ import java.util.List; */ public interface IMetaDatabase { void init(); + List getFields(String assetId); + String getFieldGroup(String assetId); + List getFieldsByFieldGroup(String fieldGroup); } diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/GWSqliteMetaDatabase.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/SqliteMetaDatabase.java similarity index 57% rename from k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/GWSqliteMetaDatabase.java rename to k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/SqliteMetaDatabase.java index 41fde8b950f911a4783ddd8495bac55a1c1a981b..0db53b96d81ab599d0e2627d5b167347e9919b36 100644 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/metadata/GWSqliteMetaDatabase.java +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/metadata/SqliteMetaDatabase.java @@ -1,7 +1,8 @@ package com.k2data.governance.manager.metadata; -import com.k2data.governance.manager.common.Constants; +import com.k2data.governance.manager.common.constants.Constants; import com.k2data.governance.manager.common.Pair; + import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; @@ -12,6 +13,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -25,35 +27,40 @@ import org.apache.log4j.Logger; * * @author wangyihan */ -public class GWSqliteMetaDatabase extends AbstractGWMetaDatabase implements IMetaDatabase { +public class SqliteMetaDatabase extends AbstractMetaDatabase { private static final String SQLITE_JDBC_DRIVER = "org.sqlite.JDBC"; private static final String SQLITE_BASE_URL = "jdbc:sqlite:"; - private String dbFilePath; - private Connection conn; + private String assetFieldGroupSql; + private String fieldGroupFieldsSql; - public GWSqliteMetaDatabase(String dbFilePath) { + public SqliteMetaDatabase(String dbFilePath, String assetFieldGroupSql, String fieldGroupFieldsSql) { this.dbFilePath = dbFilePath; + this.assetFieldGroupSql = assetFieldGroupSql; + this.fieldGroupFieldsSql = fieldGroupFieldsSql; init(); } @Override protected Logger getLogger() { - return Logger.getLogger(GWSqliteMetaDatabase.class); + return Logger.getLogger(SqliteMetaDatabase.class); } - public String getWfid(String wtid) { - return wtidWfidMap.get(wtid); - } - - private String getSqliteUrl() { + /** + * Concat jdbc:sqlite with sqlite file path + * If file path is a hdfs address, download the file to local + * + * @return full url of sqlite database + * @throws IOException when initial file system failed + */ + private String getSqliteUrl() throws IOException { if (!dbFilePath.toLowerCase().startsWith("hdfs")) { return SQLITE_BASE_URL + dbFilePath; } Pattern p = Pattern.compile("^([A-Za-z]+:/+[^/]+).*"); Matcher m = p.matcher(dbFilePath); if (!m.find()) { - logger.error("Cannot parse the hdfs path:" + dbFilePath); + getLogger().error("Cannot parse the hdfs path:" + dbFilePath); return SQLITE_BASE_URL + dbFilePath; } String hdfs = m.group(1); @@ -65,127 +72,104 @@ public class GWSqliteMetaDatabase extends AbstractGWMetaDatabase implements IMet FileSystem fs = FileSystem.get(conf); fs.copyToLocalFile(false, hdfsPath, localPath); } catch (IOException e) { - logger.error("Initial file system failed"); - e.printStackTrace(); + getLogger().error("Initial file system failed"); + throw e; } return SQLITE_BASE_URL + Constants.GW_META_SQLITE_FILE_DEFAULT; } - private void connect() { - String url = getSqliteUrl(); - if (conn != null) { - try { - conn.close(); - } catch (SQLException e) { - logger.error("Close connection failed: " + e.getMessage()); - } - } + private Connection connect() { + String url = null; try { + url = getSqliteUrl(); Class.forName(SQLITE_JDBC_DRIVER); + } catch (IOException e) { + e.printStackTrace(); + System.exit(-1); } catch (ClassNotFoundException e) { - logger.error("Load SQLite driver error: "+e.getMessage()); + getLogger().error("Load SQLite driver error: " + e.getMessage()); System.exit(-1); } try { - conn = DriverManager.getConnection(url); + Connection conn = DriverManager.getConnection(url); Statement statement = conn.createStatement(); statement.execute("PRAGMA synchronous=OFF"); statement.close(); + return conn; } catch (SQLException e) { - logger.error("Build connection failed: " + e.getMessage()); + getLogger().error("Build connection failed: " + e.getMessage()); System.exit(-1); } + return null; } - private Pair executeSql(String sql) { + private Pair executeSql(Connection conn, String sql) { if (conn == null) { - connect(); + conn = connect(); } Statement stat = null; ResultSet rs = null; try { + assert conn != null; stat = conn.createStatement(); rs = stat.executeQuery(sql); } catch (SQLException e) { String err = String.format("Execute SQL (%s) failed: %s", sql, e.getMessage()); - logger.error(err); + getLogger().error(err); } return new Pair<>(rs, stat); } - private void buildWtidFieldGroupMap() { - if (wtidFieldGroupMap == null) { - wtidFieldGroupMap = new HashMap<>(); + private void buildAssetFieldGroupMap(Connection conn) { + if (assetFieldGroupMap == null) { + assetFieldGroupMap = new HashMap<>(); } - String sql = "SELECT wtid, protocolid FROM wtinfo"; - Pair resultPair = executeSql(sql); + Pair resultPair = executeSql(conn, assetFieldGroupSql); ResultSet rs = resultPair.fst; try { while (rs.next()) { - String wtid = rs.getString(1).replace(" ","").toLowerCase(); - String fieldGroup = rs.getString(2).replace(" ","").toLowerCase(); - wtidFieldGroupMap.put(wtid, fieldGroup); + String wtid = rs.getString(1).replace(" ", "").toLowerCase(); + String fieldGroup = rs.getString(2).replace(" ", "").toLowerCase(); + assetFieldGroupMap.put(wtid, fieldGroup); } rs.close(); resultPair.snd.close(); } catch (SQLException e) { - logger.error("Read field group result set error: " + e.getMessage()); + getLogger().error("Read field group result set error: " + e.getMessage()); e.printStackTrace(); } } - private void buildFieldGroupFieldsMap() { + private void buildFieldGroupFieldsMap(Connection conn) { if (fieldGroupFieldsMap == null) { fieldGroupFieldsMap = new HashMap<>(); } String sql = "SELECT protocolid, iecpath FROM propaths " + "WHERE iecpath <> 'WTUR.Tm.Rw.Dt' AND transtype='1' AND bsave=1 " + "ORDER BY protocolid, pathid"; - Pair resultPair = executeSql(sql); + Pair resultPair = executeSql(conn, fieldGroupFieldsSql); ResultSet rs = resultPair.fst; try { while (rs.next()) { - String fieldGroup = rs.getString(1).replace(" ","").toLowerCase(); + String fieldGroup = rs.getString(1).replace(" ", "").toLowerCase(); String fieldName = rs.getString(2).replace(" ", "").toLowerCase(); if (!fieldGroupFieldsMap.containsKey(fieldGroup)) { - fieldGroupFieldsMap.put(fieldGroup, new ArrayList<>()); + fieldGroupFieldsMap.put(fieldGroup, new ArrayList()); } fieldGroupFieldsMap.get(fieldGroup).add(fieldName); } rs.close(); resultPair.snd.close(); } catch (SQLException e) { - logger.error("Read fields result set error: " + e.getMessage()); - e.printStackTrace(); - } - } - - private void buildWfidMap() { - if (wtidWfidMap == null) { - wtidWfidMap = new HashMap<>(); - } - String sql = "SELECT wtid,wfid FROM wtinfo"; - Pair resultPair = executeSql(sql); - ResultSet rs = resultPair.fst; - try { - while (rs.next()) { - String wtid = rs.getString(1).replace(" ", "").toLowerCase(); - String wfid = rs.getString(2).replace(" ", "").toLowerCase(); - wtidWfidMap.put(wtid, wfid); - } - rs.close(); - resultPair.snd.close(); - } catch (SQLException e) { - logger.error("Read WFID result set error: " + e.getMessage()); + getLogger().error("Read fields result set error: " + e.getMessage()); e.printStackTrace(); } } @Override public void init() { - connect(); - buildWtidFieldGroupMap(); - buildFieldGroupFieldsMap(); - buildWfidMap(); + Connection conn = connect(); + buildAssetFieldGroupMap(conn); + buildFieldGroupFieldsMap(conn); } } diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AnalyzeResultImpl.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AnalyzeResultImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..e6aabf2b2704df5fdfa3d690ef60b21adc061561 --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AnalyzeResultImpl.java @@ -0,0 +1,113 @@ +package com.k2data.governance.manager.result; + +import com.k2data.governance.manager.error.IErrorType; +import scala.Serializable; + +/** + * Created by wangyihan on 2017/12/19 下午6:32. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class AnalyzeResultImpl implements IAnalyzeResult, Serializable { + + private IErrorType errorType; + private IAsset asset; + + public AnalyzeResultImpl() { + } + + public AnalyzeResultImpl(IErrorType errorType) { + this(errorType, null); + } + + public AnalyzeResultImpl(IErrorType errorType, IAsset asset) { + this.errorType = errorType; + this.asset = asset; + } + + @Override + public void setErrorType(IErrorType errorType) { + this.errorType = errorType; + } + + @Override + public void setAsset(IAsset asset) { + this.asset = asset; + } + + @Override + public void setFieldGroup(String fieldGroup) { + if (this.asset == null) { + this.asset = new AssetImpl(); + } + this.asset.setFieldGroup(fieldGroup); + } + + @Override + public void setAssetId(String assetId) { + if (this.asset == null) { + this.asset = new AssetImpl(); + } + this.asset.setAssetId(assetId); + } + + @Override + public IErrorType getErrorType() { + return errorType; + } + + @Override + public IAsset getAsset() { + return asset; + } + + @Override + public String getFieldGroup() { + if (this.asset == null) { + return null; + } + return this.asset.getFieldGroup(); + } + + @Override + public String getAssetId() { + if (this.asset == null) { + return null; + } + return this.asset.getAssetId(); + } + + @Override + public String toString() { + return String.format("%s,%s", errorType, asset); + } + + @Override + public int hashCode() { + return this.toString().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof AnalyzeResultImpl)) { + return false; + } + AnalyzeResultImpl resultObj = (AnalyzeResultImpl) obj; + if (this.asset == null && resultObj.getAsset() != null) { + return false; + } else if (this.asset != null && !this.asset.equals(resultObj.getAsset())) { + return false; + } else if (this.errorType != resultObj.getErrorType()) { + return false; + } + return true; + } +} diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AssetImpl.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AssetImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..60976ed89feac434673ca6532ab66c1cbe2a417d --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/AssetImpl.java @@ -0,0 +1,78 @@ +package com.k2data.governance.manager.result; + +import scala.Serializable; + +/** + * Created by wangyihan on 2017/12/19 下午4:53. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class AssetImpl implements IAsset, Serializable { + private String fieldGroup; + private String assetId; + + public AssetImpl() { + } + + public AssetImpl(String fieldGroup, String assetId) { + this.fieldGroup = fieldGroup; + this.assetId = assetId; + } + + @Override + public String getFieldGroup() { + return fieldGroup; + } + + @Override + public String getAssetId() { + return assetId; + } + + @Override + public void setFieldGroup(String fieldGroup) { + this.fieldGroup = fieldGroup; + } + + @Override + public void setAssetId(String assetId) { + this.assetId = assetId; + } + + @Override + public String toString() { + return String.format("%s,%s", fieldGroup, assetId); + } + + @Override + public int hashCode() { + return this.toString().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof AssetImpl)) { + return false; + } + AssetImpl assetObj = (AssetImpl) obj; + if (this.assetId != null && !this.assetId.equals(assetObj.getAssetId())) { + return false; + } else if (this.assetId == null && assetObj.getAssetId() != null) { + return false; + } + if (this.fieldGroup != null && !this.fieldGroup.equals(assetObj.getFieldGroup())) { + return false; + } else if (this.fieldGroup == null && assetObj.getFieldGroup() != null) { + return false; + } + return true; + } +} diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAnalyzeResult.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAnalyzeResult.java new file mode 100644 index 0000000000000000000000000000000000000000..d520e69b6a405bf004d4366f79898451e558a8a1 --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAnalyzeResult.java @@ -0,0 +1,28 @@ +package com.k2data.governance.manager.result; + +import com.k2data.governance.manager.error.IErrorType; + +/** + * Created by wangyihan on 2017/12/19 下午4:39. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public interface IAnalyzeResult { + void setErrorType(IErrorType errorType); + + void setAsset(IAsset asset); + + void setFieldGroup(String fieldGroup); + + void setAssetId(String assetId); + + IErrorType getErrorType(); + + IAsset getAsset(); + + String getFieldGroup(); + + String getAssetId(); +} diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAsset.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAsset.java new file mode 100644 index 0000000000000000000000000000000000000000..3473a8fd36ad43e088c5939e3044760cbbf5c9f0 --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/result/IAsset.java @@ -0,0 +1,18 @@ +package com.k2data.governance.manager.result; + +/** + * Created by wangyihan on 2017/12/19 下午4:44. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public interface IAsset { + void setAssetId(String assetId); + + void setFieldGroup(String fieldGroup); + + String getAssetId(); + + String getFieldGroup(); +} diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/ConfigureUtil.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/ConfigureUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..929a03c1b23764999b8def3c595f33dbea876ad1 --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/ConfigureUtil.java @@ -0,0 +1,220 @@ +package com.k2data.governance.manager.util; + +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +/** + * Created by wangyihan on 2017/12/6 下午3:58. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class ConfigureUtil { + private static Properties properties = new Properties(); + private static final Logger logger = Logger.getLogger(ConfigureUtil.class); + + static { + InputStream in = ConfigureUtil.class.getClassLoader().getResourceAsStream("config.properties"); + try { + properties.load(in); + } catch (IOException e) { + logger.warn("No User Custom Properties File"); + } + } + + /** + * get property value with property key + * + * @param key property key + * @return + */ + private static String getProperty(String key) { + try { + return properties.getProperty(key); + } catch (Exception e) { + e.printStackTrace(); + } + + return null; + } + + /** + * get property value with property key + * + * @param key property key + * @param defaultValue + * @return + */ + private static String getProperty(String key, String defaultValue) { + try { + return properties.getProperty(key, defaultValue); + } catch (Exception e) { + e.printStackTrace(); + } + + return null; + } + + /** + * get string value with property key + * + * @param key + * @return + */ + public static String getString(String key) { + return getProperty(key); + } + + + /** + * get string value with property key + * + * @param key + * @param defaultValue + * @return + */ + public static String getString(String key, String defaultValue) { + return getProperty(key, defaultValue); + } + + /** + * get integer value with property key + * + * @param key + * @return + */ + public static Integer getInteger(String key) { + String value = getProperty(key); + try { + return Integer.valueOf(value); + } catch (Exception e) { + e.printStackTrace(); + } + + return 0; + } + + /** + * get integer value with property key + * + * @param key + * @param defaultValue + * @return + */ + public static Integer getInteger(String key, Integer defaultValue) { + String value = getProperty(key); + try { + return (value == null) ? defaultValue : Integer.valueOf(value); + } catch (Exception e) { + e.printStackTrace(); + } + + return 0; + } + + + /** + * get boolean value with property key + * + * @param key + * @return + */ + public static Boolean getBoolean(String key) { + String value = getProperty(key); + try { + return Boolean.valueOf(value); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + + /** + * get boolean value with property key + * + * @param key + * @param defaultValue + * @return + */ + public static Boolean getBoolean(String key, Boolean defaultValue) { + String value = getProperty(key); + try { + return (value == null) ? defaultValue : Boolean.valueOf(value); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + /** + * get long value with property key + * + * @param key + * @return + */ + public static Long getLong(String key) { + String value = getProperty(key); + try { + return Long.valueOf(value); + } catch (Exception e) { + e.printStackTrace(); + } + return 0L; + } + + /** + * get long value with property key + * + * @param key + * @param defaultValue + * @return + */ + public static Long getLong(String key, Long defaultValue) { + String value = getProperty(key); + try { + return (value == null) ? defaultValue : Long.valueOf(value); + } catch (Exception e) { + e.printStackTrace(); + } + return 0L; + } + + /** + * get double value with property key + * + * @param key + * @return + */ + public static Double getDouble(String key) { + String value = getProperty(key); + try { + return Double.valueOf(value); + } catch (Exception e) { + e.printStackTrace(); + } + return 0.0D; + } + + /** + * get double value with property key + * + * @param key + * @param defaultValue + * @return + */ + public static Double getDouble(String key, Double defaultValue) { + String value = getProperty(key); + try { + return (value == null) ? defaultValue : Double.valueOf(value); + } catch (Exception e) { + e.printStackTrace(); + } + return 0.0D; + } + +} diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/DateUtil.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/DateUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..6ab24a2cbd1e3517c5bef3c72ffd81da5cf10c6c --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/DateUtil.java @@ -0,0 +1,38 @@ +package com.k2data.governance.manager.util; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Created by wangyihan on 2017/12/29 下午6:46. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class DateUtil { + private static ThreadLocal local = new ThreadLocal(); + private static String formatString = "yyyy-MM-dd HH:mm:ss"; + + public static void setDateFormat(String formatStr) { + formatString = formatStr; + } + + private static SimpleDateFormat getDateFormat() { + SimpleDateFormat dateFormat = local.get(); + if (dateFormat == null) { + dateFormat = new SimpleDateFormat(formatString); + local.set(dateFormat); + } + return dateFormat; + } + + public static String format(Date date) { + return getDateFormat().format(date); + } + + public static Date parse(String dateStr) throws ParseException { + return getDateFormat().parse(dateStr); + } +} diff --git a/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/InitUtil.java b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/InitUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..5a96f8253aa28b92c37733bdbd85139b79d8c77d --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/java/com/k2data/governance/manager/util/InitUtil.java @@ -0,0 +1,34 @@ +package com.k2data.governance.manager.util; + +import com.k2data.governance.manager.common.constants.SparkConstants; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +/** + * Created by wangyihan on 2017/12/6 下午3:42. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class InitUtil { + public static SparkSession initSparkContext() { + SparkSession.Builder builder = SparkSession.builder(); + boolean local = ConfigureUtil.getBoolean(SparkConstants.SPARK_LOCAL); + builder.appName(ConfigureUtil.getString(SparkConstants.SPARK_APP_NAME)); + if (local) { + builder.master(ConfigureUtil.getString(SparkConstants.SPARK_MASTER)); + } + return builder.getOrCreate(); + } + + public static SparkSession initSparkContext(boolean local) { + SparkSession.Builder builder = SparkSession.builder(); + builder.appName(ConfigureUtil.getString(SparkConstants.SPARK_APP_NAME)); + if (local) { + builder.master(ConfigureUtil.getString(SparkConstants.SPARK_MASTER)); + } + return builder.getOrCreate(); + } +} diff --git a/k2de-governance-exception-manager-base/src/main/resources/config.properties b/k2de-governance-exception-manager-base/src/main/resources/config.properties new file mode 100644 index 0000000000000000000000000000000000000000..966d41988e5ecae4168346e92a874479cda835eb --- /dev/null +++ b/k2de-governance-exception-manager-base/src/main/resources/config.properties @@ -0,0 +1,3 @@ +spark.local=false +spark.master=local[4] +spark.app.name=GWExceptionManager \ No newline at end of file diff --git a/k2de-governance-exception-manager/src/main/resources/log4j.properties b/k2de-governance-exception-manager-base/src/main/resources/log4j.properties similarity index 87% rename from k2de-governance-exception-manager/src/main/resources/log4j.properties rename to k2de-governance-exception-manager-base/src/main/resources/log4j.properties index a1426d2bc2e61a387ea703be3f5c73a9aa809f4e..68429adc65d3b1c50b8dc202f9fb0dd704a8ab1d 100644 --- a/k2de-governance-exception-manager/src/main/resources/log4j.properties +++ b/k2de-governance-exception-manager-base/src/main/resources/log4j.properties @@ -1,6 +1,6 @@ # You should copy this file to resources, and rename it as log4j.properties -log4j.rootLogger=INFO,stdout +log4j.rootLogger=ERROR,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n \ No newline at end of file diff --git a/k2de-governance-exception-manager-huineng/pom.xml b/k2de-governance-exception-manager-huineng/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..7fcddcd49dd559cd1b02c47e3002c41d6c2daa9b --- /dev/null +++ b/k2de-governance-exception-manager-huineng/pom.xml @@ -0,0 +1,71 @@ + + + + k2de-governance-parent + com.k2data + 1.0-SNAPSHOT + + 4.0.0 + + k2de-governance-exception-manager-huineng + + + + com.k2data + k2de-governance-exception-manager-base + 1.0-SNAPSHOT + + + com.k2data + k2de-governance-exception-manager-kmx + 1.0-SNAPSHOT + + + org.apache.spark + spark-core_${spark.comp.version} + + + org.apache.spark + spark-sql_${spark.comp.version} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + ${compile.version} + ${compile.version} + + + + maven-assembly-plugin + 2.4 + + + + com.k2data.governance.manager.ExceptionManagerSpark + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + \ No newline at end of file diff --git a/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnErrorType.java b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnErrorType.java new file mode 100644 index 0000000000000000000000000000000000000000..99255c34fa29b668fa5b9624ffab960a9107a298 --- /dev/null +++ b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnErrorType.java @@ -0,0 +1,118 @@ +package com.k2data.governance.manager.huineng; + +import com.k2data.governance.manager.error.IErrorType; +import com.k2data.governance.manager.metadata.IMetaDatabase; +import com.k2data.governance.manager.result.AnalyzeResultImpl; +import com.k2data.governance.manager.result.IAnalyzeResult; +import com.k2data.governance.manager.util.DateUtil; +import scala.Serializable; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.List; + +/** + * Created by wangyihan on 2017/12/29 下午12:01. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public enum HnErrorType implements IErrorType, Serializable { + FORMAT(0), // no realtime| in the msg + WTID_NULL(1), // wtid is not in DB + TIME(2), // time format wrong + PROTO_NULL(3), // protocolId is not in DB + FIELDS_NUM_MORE(4), // number of data more than in the DB + FIELDS_NUM_LESS(5), // number of data less than in the DB + WTID_EMPTY(6), + OTHER(7); + + public static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; + private final int value; + private static String HN_DATA_START_TAG = "realtimedata|"; + private static int fieldDiffTolerance = 30; + private static SimpleDateFormat format = new SimpleDateFormat(TIME_FORMAT); + + HnErrorType(int value) { + this.value = value; + } + + public String toErrorString() { + return String.format("HN_ERROR_%s", this); + } + + public int getValue() { + return value; + } + + public static HnErrorType fromInteger(int id) { + return HnErrorType.values()[id]; + } + + + public void setFiledDiffTolerance(int tolerance) { + fieldDiffTolerance = tolerance; + } + + @Override + public IAnalyzeResult analyzeRecord(String msg) { + return this.analyzeRecord(msg, null); + } + + @Override + public IAnalyzeResult analyzeRecord(String msg, IMetaDatabase metaDatabase) { + IAnalyzeResult result = new AnalyzeResultImpl(); + int formatIndex = msg.indexOf(HN_DATA_START_TAG); + if (formatIndex < 0) { + result.setErrorType(HnErrorType.FORMAT); + return result; + } + int startIndex = formatIndex + HN_DATA_START_TAG.length(); + String dataSubstr = msg.substring(startIndex); + String[] splitData = dataSubstr.split("\\|"); + String wtid = splitData[0].trim(); + String[] data = splitData[1].split(","); + String ts = data[0]; + try { + DateUtil.parse(ts); + } catch (ParseException e) { + result.setErrorType(HnErrorType.TIME); + } + if (wtid.trim().isEmpty()) { + result.setErrorType(HnErrorType.WTID_EMPTY); + return result; + } + result.setAssetId(wtid); + if (metaDatabase == null) { + result.setErrorType(HnErrorType.OTHER); + return result; + } + String fieldGroup = metaDatabase.getFieldGroup(wtid); + if (fieldGroup == null) { + result.setErrorType(HnErrorType.WTID_NULL); + return result; + } + result.setFieldGroup(fieldGroup); + List fields = metaDatabase.getFieldsByFieldGroup(fieldGroup); + if (fields == null) { + result.setErrorType(HnErrorType.PROTO_NULL); + result.setAssetId(null); // this type of exception dose not need wtid + return result; + } + if (fields.size() - (data.length - 1) > fieldDiffTolerance) { + result.setErrorType(HnErrorType.FIELDS_NUM_LESS); + return result; + } + if ((data.length - 1) - fields.size() > fieldDiffTolerance) { + result.setErrorType(HnErrorType.FIELDS_NUM_MORE); + return result; + } + + // if not in all errors above, then put it in OTHER + if (result.getErrorType() != HnErrorType.TIME) { + result.setErrorType(HnErrorType.OTHER); + } + return result; + } +} diff --git a/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnExceptionManagerMain.java b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnExceptionManagerMain.java new file mode 100644 index 0000000000000000000000000000000000000000..fca9034e17eacacb20f50b1f092de418d26c67d0 --- /dev/null +++ b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnExceptionManagerMain.java @@ -0,0 +1,34 @@ +package com.k2data.governance.manager.huineng; + +import com.k2data.governance.manager.ExceptionManagerSpark; +import com.k2data.governance.manager.common.constants.ArgumentsConstants; +import com.k2data.governance.manager.filter.IFilter; +import com.k2data.governance.manager.metadata.IMetaDatabase; +import com.k2data.governance.manager.metadata.SqliteMetaDatabase; +import com.k2data.governance.manager.util.DateUtil; +import net.sourceforge.argparse4j.inf.Namespace; + +/** + * Created by wangyihan on 2017/12/29 下午5:10. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class HnExceptionManagerMain { + + public static void main(String[] args) { + DateUtil.setDateFormat(HnErrorType.TIME_FORMAT); + Namespace ns = ExceptionManagerSpark.parseArguments(args); + String metadataFilePath = ns.getString(ArgumentsConstants.METADATA_FILE); + String wtidProtocolSql = "SELECT wtid, protocolid FROM wtinfo"; + String protocolFieldsSql = "SELECT protocolid, iecpath FROM propaths " + + "WHERE iecpath <> 'WTUR.Tm.Rw.Dt' AND transtype='1' AND bsave=1 " + + "ORDER BY protocolid, pathid"; + IMetaDatabase metadataBase = new SqliteMetaDatabase(metadataFilePath, wtidProtocolSql, protocolFieldsSql); + ExceptionManagerSpark.setMetadataBase(metadataBase); + IFilter filter = new HnFilter(); + ExceptionManagerSpark.setFilter(filter); + ExceptionManagerSpark.run(ns); + } +} diff --git a/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnFilter.java b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..1fed7e0801f21a9c564d67d1ff25e61b30d7836c --- /dev/null +++ b/k2de-governance-exception-manager-huineng/src/main/java/com/k2data/governance/manager/huineng/HnFilter.java @@ -0,0 +1,31 @@ +package com.k2data.governance.manager.huineng; + +import com.k2data.governance.manager.filter.IFilter; +import com.k2data.governance.manager.kmx.KmxErrorType; +import org.apache.log4j.Logger; +import scala.Serializable; + +/** + * Created by wangyihan on 2017/12/29 下午2:43. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class HnFilter implements IFilter, Serializable { + private String KMX_START_TAG = "[EXCEPTION]"; + private String HN_START_TAG = "sediment"; + + public HnFilter() { + } + + @Override + public Class getErrorClass(String message) { + if (message.startsWith(KMX_START_TAG)) { + return KmxErrorType.class; + } else if (message.startsWith(HN_START_TAG)) { + return HnErrorType.class; + } + return null; + } +} diff --git a/k2de-governance-exception-manager-huineng/src/main/resources/log4j.properties b/k2de-governance-exception-manager-huineng/src/main/resources/log4j.properties new file mode 100644 index 0000000000000000000000000000000000000000..68429adc65d3b1c50b8dc202f9fb0dd704a8ab1d --- /dev/null +++ b/k2de-governance-exception-manager-huineng/src/main/resources/log4j.properties @@ -0,0 +1,6 @@ +# You should copy this file to resources, and rename it as log4j.properties + +log4j.rootLogger=ERROR,stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n \ No newline at end of file diff --git a/k2de-governance-exception-manager-kmx/pom.xml b/k2de-governance-exception-manager-kmx/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..3c545fb5b34262cd85e0f626cb6eeba3db61a77d --- /dev/null +++ b/k2de-governance-exception-manager-kmx/pom.xml @@ -0,0 +1,67 @@ + + + + com.k2data + k2de-governance-parent + 1.0-SNAPSHOT + + 4.0.0 + + k2de-governance-exception-manager-kmx + + + + com.k2data + k2de-governance-exception-manager-base + 1.0-SNAPSHOT + + + org.apache.spark + spark-core_${spark.comp.version} + + + org.apache.spark + spark-sql_${spark.comp.version} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + ${compile.version} + ${compile.version} + + + + maven-assembly-plugin + 2.4 + + + + com.k2data.governance.manager.ExceptionManagerSpark + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + \ No newline at end of file diff --git a/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxErrorType.java b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxErrorType.java new file mode 100644 index 0000000000000000000000000000000000000000..71f8c6c244cc4c971d4801ae467859d4d04ffe08 --- /dev/null +++ b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxErrorType.java @@ -0,0 +1,91 @@ +package com.k2data.governance.manager.kmx; + +import com.k2data.governance.manager.error.IErrorType; +import com.k2data.governance.manager.metadata.IMetaDatabase; +import com.k2data.governance.manager.result.AnalyzeResultImpl; +import com.k2data.governance.manager.result.IAnalyzeResult; +import org.apache.log4j.Logger; +import scala.Serializable; + +/** + * Created by wangyihan on 2017/12/29 上午11:32. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public enum KmxErrorType implements IErrorType, Serializable { + DOUBLE(0), BOOLEAN(1), LONG(2), STRING(3), INT(4), FLOAT(5), OTHER(6); + + private final int value; + private static final String TYPE_PREFIX = ": "; + private static final String TYPE_SUFFIX = "_EX"; + private final Logger logger = Logger.getLogger(KmxErrorType.class); + + KmxErrorType() { + this.value = 6; + } + + KmxErrorType(int value) { + this.value = value; + } + + @Override + public int getValue() { + return value; + } + + @Override + public String toErrorString() { + return String.format("KMX_ERROR_DATATYPE_%s", this); + } + + public static KmxErrorType fromInteger(int id) { + return KmxErrorType.values()[id]; + } + + public static KmxErrorType fromString(String type) { + KmxErrorType dataType; + switch (type.toLowerCase()) { + case "double": + dataType = KmxErrorType.DOUBLE; + break; + case "boolean": + dataType = KmxErrorType.BOOLEAN; + break; + case "long": + dataType = KmxErrorType.LONG; + break; + case "string": + dataType = KmxErrorType.STRING; + break; + case "int": + dataType = KmxErrorType.INT; + break; + case "float": + dataType = KmxErrorType.FLOAT; + break; + default: + dataType = KmxErrorType.OTHER; + break; + } + + return dataType; + } + + @Override + public IAnalyzeResult analyzeRecord(String msg) { + int indexType = msg.indexOf(TYPE_SUFFIX); + if (indexType == -1) { + return new AnalyzeResultImpl(KmxErrorType.OTHER); + } + String type = msg.substring(msg.indexOf(TYPE_PREFIX) + TYPE_PREFIX.length(), indexType); + return new AnalyzeResultImpl(KmxErrorType.fromString(type)); + } + + @Override + public IAnalyzeResult analyzeRecord(String msg, IMetaDatabase database) { + return analyzeRecord(msg); + } + +} diff --git a/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxExceptionManagerMain.java b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxExceptionManagerMain.java new file mode 100644 index 0000000000000000000000000000000000000000..37a05022073a7df685692a51fd6b3aa3d418821e --- /dev/null +++ b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxExceptionManagerMain.java @@ -0,0 +1,20 @@ +package com.k2data.governance.manager.kmx; + +import com.k2data.governance.manager.ExceptionManagerSpark; +import com.k2data.governance.manager.filter.IFilter; + +/** + * Created by wangyihan on 2017/12/29 下午5:18. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class KmxExceptionManagerMain { + + public static void main(String[] args) { + IFilter filter = new KmxFilter(); + ExceptionManagerSpark.setFilter(filter); + ExceptionManagerSpark.run(args); + } +} diff --git a/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxFilter.java b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..22d3b84d063321dcc76918c423a7587828d88b62 --- /dev/null +++ b/k2de-governance-exception-manager-kmx/src/main/java/com/k2data/governance/manager/kmx/KmxFilter.java @@ -0,0 +1,19 @@ +package com.k2data.governance.manager.kmx; + +import com.k2data.governance.manager.filter.IFilter; +import scala.Serializable; + +/** + * Created by wangyihan on 2017/12/29 上午11:27. + * E-mail address is yihanwang22@163.com. + * Copyright © 2017 wangyihan. All Rights Reserved. + * + * @author wangyihan + */ +public class KmxFilter implements IFilter, Serializable { + + @Override + public Class getErrorClass(String message) { + return KmxErrorType.class; + } +} diff --git a/k2de-governance-exception-manager-kmx/src/main/resources/log4j.properties b/k2de-governance-exception-manager-kmx/src/main/resources/log4j.properties new file mode 100644 index 0000000000000000000000000000000000000000..68429adc65d3b1c50b8dc202f9fb0dd704a8ab1d --- /dev/null +++ b/k2de-governance-exception-manager-kmx/src/main/resources/log4j.properties @@ -0,0 +1,6 @@ +# You should copy this file to resources, and rename it as log4j.properties + +log4j.rootLogger=ERROR,stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n \ No newline at end of file diff --git a/k2de-governance-exception-manager/pom.xml b/k2de-governance-exception-manager/pom.xml deleted file mode 100644 index dc1cef1972f5e20e1d983d231ffe18dba6e1250c..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/pom.xml +++ /dev/null @@ -1,92 +0,0 @@ - - 4.0.0 - - - com.k2data - k2de-governance-parent - 1.0-SNAPSHOT - - - k2de-governance-exception-manager - 0.0.1-SNAPSHOT - jar - - ExceptionManager - http://maven.apache.org - - - UTF-8 - 1.8 - 2.6.0 - - - - - org.apache.hadoop - hadoop-client - - - - org.apache.hadoop - hadoop-common - - - - org.apache.hadoop - hadoop-hdfs - - - - org.xerial - sqlite-jdbc - - - - com.sun - tools - 1.4.2 - system - ${java.home}/../lib/tools.jar - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.6.1 - - ${compile.version} - ${compile.version} - - - - maven-assembly-plugin - 2.4 - - - - com.k2data.governance.manager.ExceptionManagerSingle - - - - jar-with-dependencies - - - - - make-assembly - package - - single - - - - - - - diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/ExceptionManagerSingle.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/ExceptionManagerSingle.java deleted file mode 100644 index 2d8d64763afb2bd8bb2815d66680640dcabbdeb6..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/ExceptionManagerSingle.java +++ /dev/null @@ -1,167 +0,0 @@ -package com.k2data.governance.manager; - -import com.k2data.governance.manager.analyzer.GWhnAnalyzer; -import com.k2data.governance.manager.analyzer.GWkmxAnalyzer; -import com.k2data.governance.manager.analyzer.IAnalyzer; -import com.k2data.governance.manager.common.Constants; -import com.k2data.governance.manager.common.type.GWDataType; -import com.k2data.governance.manager.common.type.GWSourceType; -import com.k2data.governance.manager.common.type.HNErrorType; -import com.k2data.governance.manager.filter.GWFilter; -import com.k2data.governance.manager.filter.IFilter; -import com.k2data.governance.manager.message.IMessage; -import com.k2data.governance.manager.parser.GWParserSingle; -import com.k2data.governance.manager.util.FileHandler; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.List; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * Main process for exception manager - * - * @author stoke - */ -public class ExceptionManagerSingle { - - private int[] kmxTypeNums; - private int[] hnTypeNums; - - public ExceptionManagerSingle() { - kmxTypeNums = new int[GWDataType.values().length]; - hnTypeNums = new int[HNErrorType.values().length]; - } - - public int[] getKmxTypeNums() { - return kmxTypeNums; - } - - public int[] getHnTypeNums() { - return hnTypeNums; - } - - private void sumTypeNums(int[] singleTypeNums, int[] finalTypeNums) { - if (singleTypeNums.length != finalTypeNums.length) { - System.out.println("Length does not match"); - return; - } - for (int i = 0; i < finalTypeNums.length; ++i) { - finalTypeNums[i] += singleTypeNums[i]; - } - } - - private void printResult(String outputPath) { - String kmxFile = outputPath + "kmx.out"; - String hnFile = outputPath + "hn.out"; - - try { - FileWriter fw = new FileWriter(kmxFile); - PrintWriter pw = new PrintWriter(fw); - for (GWDataType type : GWDataType.values()) { - pw.println(type + ":" + kmxTypeNums[type.getValue()]); - } - pw.close(); - fw.close(); - - fw = new FileWriter(hnFile); - pw = new PrintWriter(fw); - for (HNErrorType type : HNErrorType.values()) { - pw.println(type + ":" + hnTypeNums[type.getValue()]); - } - pw.close(); - fw.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public static void main(String[] args) { - System.out.println("ExceptionManagerSingle begin..."); - - if (args.length != 5) { - System.out.println("Input arguments number error:"); - System.out.println("args[0]: 0:sequence file; 1:file after parse;"); - System.out.println("args[1]: the input directory;"); - System.out.println("args[2]: the number of example to be printed;"); - System.out.println("args[3]: the configdata.file directory;"); - System.out.println("args[4]: the output directory;"); - return; - } - - // String parsePath = "result/anomaly-10k"; - List messageList; - int sel = Integer.parseInt(args[0]); - String inputPath = args[1]; - // -1 for printing all - int exampleNum = Integer.parseInt(args[2]); - String dbPath = args[3]; - if (!dbPath.trim().equals("")) { - Constants.PROTOCOL_MAP_DIR = dbPath; - } - String outputPath = args[4]; - - File dir = new File(inputPath); - String[] fileNames; - if (dir.exists() && dir.isDirectory()) { - fileNames = dir.list(); - if (fileNames == null) { - System.out.println("No files in " + inputPath); - } - } else { - System.out.println("Can not find the directory " + inputPath); - return; - } - - ExceptionManagerSingle ems = new ExceptionManagerSingle(); - - for (String fileName : fileNames) { - if (!FileHandler.isGWFileValid(fileName)) { - continue; - } - if (sel == 1) { - messageList = FileHandler.readAnomalyString(inputPath + fileName); - } else { - GWParserSingle parser = new GWParserSingle(inputPath + fileName); - parser.parse(); - messageList = parser.getMessageList(); -// FileHandler.writeAnomalyString(outputPath + fileName + ".out", messageList); - } - - IFilter filter = new GWFilter(messageList); - filter.filter(); - - // KMX error - IAnalyzer kmxAnalyzer = new GWkmxAnalyzer( - filter.getCertainSourceTypeList(GWSourceType.KMX)); - kmxAnalyzer.printNumber(); - for (IMessage message : kmxAnalyzer.getMessageList()) { - kmxAnalyzer.analyze(message); - } - int[] kmxNums = kmxAnalyzer.printType(); - ems.sumTypeNums(kmxNums, ems.getKmxTypeNums()); - // kmxAnalyzer.printCertainTypeList(GWDataType.DOUBLE.getValue(), exampleNum); - - // HN error - IAnalyzer hnAnalyzer = new GWhnAnalyzer(Constants.PROTOCOL_MAP_DIR, - filter.getCertainSourceTypeList(GWSourceType.HN)); - hnAnalyzer.printNumber(); - for (IMessage message : hnAnalyzer.getMessageList()) { - hnAnalyzer.analyze(message); - } - int[] hnNums = hnAnalyzer.printType(); - ems.sumTypeNums(hnNums, ems.getHnTypeNums()); -// for (HNErrorType hType : HNErrorType.values()) { -// hnAnalyzer.printCertainTypeList(hType.getValue(), exampleNum); -// } - } - - ems.printResult(outputPath); - System.out.println("ExceptionManagerSingle end"); - } -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWhnAnalyzer.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWhnAnalyzer.java deleted file mode 100644 index e0c5610fe38fc7831ed6d527dfb8c6e6c98646f2..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWhnAnalyzer.java +++ /dev/null @@ -1,159 +0,0 @@ -package com.k2data.governance.manager.analyzer; - -import com.k2data.governance.manager.common.Constants; -import com.k2data.governance.manager.common.type.HNErrorType; -import com.k2data.governance.manager.message.IMessage; -import com.k2data.governance.manager.metadata.GWSqliteMetaDatabase; -import com.k2data.governance.manager.metadata.IMetaDatabase; - -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public class GWhnAnalyzer implements IAnalyzer { - - private IMetaDatabase metaDatabase; - private SimpleDateFormat format = new SimpleDateFormat(Constants.TIME_FORMAT); - - private List messageList; - private Map> hnTypeMessageMap; - - public GWhnAnalyzer(String dbFilePath, List messageList) { - metaDatabase = new GWSqliteMetaDatabase(dbFilePath); - this.messageList = messageList; - - hnTypeMessageMap = new HashMap<>(); - for (HNErrorType type : HNErrorType.values()) { - hnTypeMessageMap.put(type, new ArrayList<>()); - } - } - - @Override - public List getMessageList() { - return messageList; - } - - /** - * Message Pattern: - * time: yyyy-MM-dd hh:mm:ss.SSS - * sendiment|time|(realtimedata|wtid|time,data[0],data[1],…, - * @param message message - */ - @Override - public void analyze(IMessage message) { - String key = message.getKey(); - if (!key.trim().equals("")) { - System.out.println(key); - } - - String msg = message.getMsg(); - - int formatIndex = msg.indexOf(Constants.GW_HN_DATA_START_TAG); - if (formatIndex < 0) { - hnTypeMessageMap.get(HNErrorType.FORMAT).add(message); - return; - } - - int startIndex = formatIndex + Constants.GW_HN_DATA_START_TAG.length(); - String dataSubstr = msg.substring(startIndex); - String[] splitData = dataSubstr.split("\\|"); - String wtid = splitData[0].trim(); - String[] data = splitData[1].split(","); - - String ts = data[0]; - try { - format.parse(ts); - } catch (ParseException e) { - hnTypeMessageMap.get(HNErrorType.TIME).add(message); - return; - } - - if (wtid.trim().isEmpty()) { - hnTypeMessageMap.get(HNErrorType.WTID_EMPTY).add(message); - return; - } - - // String wfid = ((GWSqliteMetaDatabase)metaDatabase).getWfid(wtid); - String fieldGroup = metaDatabase.getFieldGroup(wtid); - if (fieldGroup == null) { - hnTypeMessageMap.get(HNErrorType.PROTO_NULL).add(message); - return; - } - - List fields = metaDatabase.getFieldsByFieldGroup(fieldGroup); - if (fields == null) { - hnTypeMessageMap.get(HNErrorType.FIELDS_NULL).add(message); - return; - } - - int fSize = fields.size(); - if (fSize > data.length - 1) { - hnTypeMessageMap.get(HNErrorType.FIELDS_NUM_LESS).add(message); - return; - } else if (fSize < data.length - 1) { - hnTypeMessageMap.get(HNErrorType.FIELDS_NUM_MORE).add(message); - return; - } - - // if not in all errors above, then put it in OTHER - hnTypeMessageMap.get(HNErrorType.OHTER).add(message); - } - - @Override - public void printNumber() { - System.out.println("HN anomaly number : " + messageList.size()); - } - - @Override - public void printAll() { - printNumber(); - - for (IMessage message : messageList) { - System.out.println(message.toString()); - } - } - - @Override - public int[] printType() { - int[] typeNums = new int[HNErrorType.values().length]; - - hnTypeMessageMap.forEach((type, certainList) -> { - int size = certainList.size(); - typeNums[type.getValue()] = size; - System.out.println( - String.format("Exception type %s has %d messages", type.toString(), size)); - }); - int sumNum = 0; - for (int num : typeNums) { - sumNum += num; - } - System.out.println("There are " + sumNum + " hit exception messages in total"); - return typeNums; - } - - /** - * - * @param id type.value() - * @param size default msgList.size() - */ - public void printCertainTypeList(int id, int size) { - HNErrorType type = HNErrorType.fromInteger(id); - System.out.println("Error type : " + type); - List msgList = hnTypeMessageMap.get(type); - int realSize = msgList.size(); - int num = size > realSize ? realSize : size; - for (int i = 0; i < num; ++i) { - System.out.println(msgList.get(i).toString()); - } - } -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWkmxAnalyzer.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWkmxAnalyzer.java deleted file mode 100644 index 9ba1553ea0f0163bb4f2a0e800844bfa15058d52..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/GWkmxAnalyzer.java +++ /dev/null @@ -1,121 +0,0 @@ -package com.k2data.governance.manager.analyzer; - -import com.k2data.governance.manager.common.type.GWDataType; -import com.k2data.governance.manager.message.IMessage; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.log4j.Logger; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * Analyze KMX type Exception - * - * @author stoke - */ -public class GWkmxAnalyzer implements IAnalyzer { - - private static final String TYPE_PREFIX = ": "; - private static final String TYPE_SUFFIX = "_EX"; - private static final String TYPE_PREFIX_SMALL = "Invalid "; - private static final String TYPE_SUFFIX_SMALL = " value"; - private static final String VALUE_PREFIX = "value "; - private static final String VALUE_SUFFIX = " for field"; - private static final String FIELD_PREFIX = "field "; - private static final String FIELD_SUFFIX = " [GENERATED"; - - private Logger logger = Logger.getLogger(GWkmxAnalyzer.class); - - private List messageList; - private Map> kmxTypeMessageMap; - - public GWkmxAnalyzer(List messageList) { - this.messageList = messageList; - kmxTypeMessageMap = new HashMap<>(); - } - - @Override - public List getMessageList() { - return messageList; - } - - /** - * Message Pattern: - * [EXCEPTION] [Increase-Counters: _EX, RECORD_DROP] Invalid value for field - * [GENERATED AVRO RECORDS] [] - * - * @param message the exception message - */ - @Override - public void analyze(IMessage message) { - String key = message.getKey(); - if (!key.trim().equals("")) { - System.out.println(key); - } - - String msg = message.getMsg(); - int indexType = msg.indexOf(TYPE_SUFFIX); - if (indexType == -1) { - String err = String.format("The pattern is wrong since no _EX included in %s", msg); - logger.error(err); - } - String type = msg.substring(msg.indexOf(TYPE_PREFIX) + TYPE_PREFIX.length(), indexType); - GWDataType dataType = GWDataType.fromString(type); - if (!kmxTypeMessageMap.containsKey(dataType)) { - kmxTypeMessageMap.put(dataType, new ArrayList<>()); - } - kmxTypeMessageMap.get(dataType).add(message); - -// String type2 = msg.substring(msg.indexOf(TYPE_PREFIX_SMALL) + TYPE_PREFIX_SMALL.length(), -// msg.indexOf(TYPE_SUFFIX_SMALL)); -// String field = msg -// .substring(msg.indexOf(FIELD_PREFIX) + FIELD_PREFIX.length(), msg.indexOf(FIELD_SUFFIX)); -// String value = msg -// .substring(msg.indexOf(VALUE_PREFIX) + VALUE_PREFIX.length(), msg.indexOf(VALUE_SUFFIX)); -// // TODO check the value type - } - - @Override - public void printNumber() { - System.out.println("KMX anomaly number : " + messageList.size()); - } - - @Override - public void printAll() { - printNumber(); - - for (IMessage message : messageList) { - System.out.println(message.toString()); - } - } - - @Override - public int[] printType() { - int[] typeNums = new int[GWDataType.values().length]; - - kmxTypeMessageMap.forEach((type, certainList) -> { - int size = certainList.size(); - typeNums[type.getValue()] = size; - System.out.println( - String.format("Exception type %s has %d messages", type.toString(), certainList.size())); - }); - - return typeNums; - } - - @Override - public void printCertainTypeList(int id, int size) { - GWDataType type = GWDataType.fromInteger(id); - System.out.println("Error type : " + type); - List msgList = kmxTypeMessageMap.get(type); - int realsize = msgList.size(); - int num = size > realsize ? realsize : size; - for (int i = 0; i < num; ++i) { - System.out.println(msgList.get(i).toString()); - } - } -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/IAnalyzer.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/IAnalyzer.java deleted file mode 100644 index 82ba7debd816bc042ecc83e2cb86d03b610ae639..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/analyzer/IAnalyzer.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.k2data.governance.manager.analyzer; - -import com.k2data.governance.manager.message.IMessage; -import java.util.List; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public interface IAnalyzer { - List getMessageList(); - void analyze(IMessage message); - void printNumber(); - void printAll(); - int[] printType(); - void printCertainTypeList(int id, int size); -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Constants.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Constants.java deleted file mode 100644 index 64e08fb861d0d4e30a190c7963ad87243c46dbb8..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/Constants.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.k2data.governance.manager.common; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public class Constants { - - public static String GW_MSG_SPILT_OP = ","; - public static String GW_EXCEPTION_END = ".complete"; - - public static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; - - public static String GW_HN_START_TAG = "sediment"; - public static String GW_KMX_START_TAG = "[EXCEPTION]"; - public static String GW_HN_DATA_START_TAG = "realtimedata|"; - -// public static String PROTOCOL_MAP_DIR = "/Users/stoke/code/github/GWDataEngineering/" -// + "ExceptionManager/src/main/resources/configdata.file"; - public static String PROTOCOL_MAP_DIR = "src/main/resources/configdata.file"; - public static String GW_META_SQLITE_FILE_DEFAULT = "metadata.db"; -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWDataType.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWDataType.java deleted file mode 100644 index 39539c517dbc486c4270decbad39404b10a10507..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWDataType.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.k2data.governance.manager.common.type; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public enum GWDataType { - DOUBLE(0), BOOLEAN(1), LONG(2), STRING(3), INT(4), FLOAT(5), OTHER(6); - - private final int value; - - GWDataType(int value) { - this.value = value; - } - - public int getValue() { - return value; - } - - public static GWDataType fromInteger(int id) { - return GWDataType.values()[id]; - } - - public static GWDataType fromString(String type) { - GWDataType dataType; - switch (type.toLowerCase()) { - case "double": - dataType = GWDataType.DOUBLE; - break; - case "boolean": - dataType = GWDataType.BOOLEAN; - break; - case "long": - dataType = GWDataType.LONG; - break; - case "string": - dataType = GWDataType.STRING; - break; - case "int": - dataType = GWDataType.INT; - break; - case "float": - dataType = GWDataType.FLOAT; - break; - default: - dataType = GWDataType.OTHER; - break; - } - - return dataType; - } - -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWSourceType.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWSourceType.java deleted file mode 100644 index e04a94774a13e93b144c1254433422ff8832c902..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/GWSourceType.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.k2data.governance.manager.common.type; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public enum GWSourceType { - KMX(0), HN(1), OTHER(2); - - private final int value; - - GWSourceType(int value) { - this.value = value; - } - - public int getValue() { - return value; - } - - public static GWSourceType fromInteger(int id) { - return GWSourceType.values()[id]; - } - - public static GWSourceType fromString(String type) { - GWSourceType sourceType; - switch (type.toLowerCase()) { - case "kmx": - sourceType = GWSourceType.KMX; - break; - case "hn": - sourceType = GWSourceType.HN; - break; - default: - sourceType = GWSourceType.OTHER; - break; - } - - return sourceType; - } - -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/HNErrorType.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/HNErrorType.java deleted file mode 100644 index d63e965231b7c0ee6c5bd3efe9a1966e9cbe1135..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/common/type/HNErrorType.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.k2data.governance.manager.common.type; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public enum HNErrorType { - FORMAT(0), // no realtime| in the msg - WTID_NULL(1), // wtid is not in DB - TIME(2), // time format wrong - PROTO_NULL(3), // protocolid is not in DB - FIELDS_NUM_MORE(4), // number of data more than in the DB - FIELDS_NUM_LESS(5), // number of data less than in the DB - WTID_EMPTY(6), - FIELDS_NULL(7), // fields null - OHTER(8); - - private final int value; - - HNErrorType(int value) { - this.value = value; - } - - public int getValue() { - return value; - } - - public static HNErrorType fromInteger(int id) { - return HNErrorType.values()[id]; - } - -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/GWFilter.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/GWFilter.java deleted file mode 100644 index 2b62d401fe5e3335c9296044dd28aa255ec4ea8c..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/filter/GWFilter.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.k2data.governance.manager.filter; - -import com.k2data.governance.manager.common.Constants; -import com.k2data.governance.manager.common.type.GWSourceType; -import com.k2data.governance.manager.message.IMessage; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public class GWFilter implements IFilter { - private Logger logger = Logger.getLogger(GWFilter.class); - - private List messageList; - private Map> gwSourceTypeMessageMap; - - public GWFilter(List messageList) { - this.messageList = messageList; - gwSourceTypeMessageMap = new HashMap<>(); - - for (GWSourceType type : GWSourceType.values()) { - gwSourceTypeMessageMap.put(type, new ArrayList<>()); - } - } - - @Override - public List getMessageList() { - return messageList; - } - - public Map> getGwSourceTypeMessageMap() { - return gwSourceTypeMessageMap; - } - - @Override - public List getCertainSourceTypeList(GWSourceType type) { - return gwSourceTypeMessageMap.get(type); - } - - @Override - public void filter() { - for (IMessage message : messageList) { - String msg = message.getMsg(); - - if (msg.startsWith(Constants.GW_KMX_START_TAG)) { - gwSourceTypeMessageMap.get(GWSourceType.KMX).add(message); - } else if (msg.startsWith(Constants.GW_HN_START_TAG)) { - gwSourceTypeMessageMap.get(GWSourceType.HN).add(message); - } else { - gwSourceTypeMessageMap.get(GWSourceType.OTHER).add(message); - String err = String.format("Messages for other type, %s", message.toString()); - logger.error(err); - } - } - } - -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/AbstractMessage.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/AbstractMessage.java deleted file mode 100644 index ae0b901268f6ad1c31df6a643715e84f7e134f77..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/AbstractMessage.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.k2data.governance.manager.message; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public abstract class AbstractMessage implements IMessage { - private String key; - private String msg; - - @Override - public String getKey() { - return key; - } - - @Override - public void setKey(String key) { - this.key = key; - } - - @Override - public String getMsg() { - return msg; - } - - @Override - public void setMsg(String msg) { - this.msg = msg; - } - - @Override - public String toString() { - return String.format("%s,%s", getKey(), getMsg()); - } -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/GWMessage.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/GWMessage.java deleted file mode 100644 index cf991ec10943b89b48142d33f0b646977fa38ac7..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/message/GWMessage.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.k2data.governance.manager.message; - -import com.k2data.governance.manager.common.Constants; - -/** - * Created by stoke on 2017/11/16. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public class GWMessage extends AbstractMessage { - - public GWMessage(String key, String msg) { - setKey(key); - setMsg(msg); - } - - @Override - public String toString() { - return String.format("%s" + Constants.GW_MSG_SPILT_OP + "%s", getKey(), getMsg()); - } -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/GWParserSingle.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/GWParserSingle.java deleted file mode 100644 index 64c06e2a880f48d89bc0617f5667570a25fbe6b9..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/GWParserSingle.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.k2data.governance.manager.parser; - -import com.k2data.governance.manager.message.GWMessage; -import com.k2data.governance.manager.message.IMessage; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.Text; - -/** - * Created by stoke on 2017/11/20. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public class GWParserSingle { - private static Configuration conf = new Configuration(); - private String path; - private List messageList; - - public List getMessageList() { - return messageList; - } - - public GWParserSingle(String path) { - messageList = new ArrayList<>(); - this.path = path; - } - - public void parse() { - Path filePath = new Path(path); - try { - FileSystem fs = FileSystem.get(conf); - Reader reader = new Reader(fs, filePath, conf); - Text key = new Text(); - Text value = new Text(); - while (reader.next(key, value)) { - messageList.add(new GWMessage(key.toString(), value.toString())); - } - IOUtils.closeStream(reader); - - } catch (IOException e) { - e.printStackTrace(); - } - - } -} diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/ParserMR.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/ParserMR.java deleted file mode 100644 index b638217c567987d07db9055de5c755c9096dca1e..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/parser/ParserMR.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.k2data.governance.manager.parser; - -import com.k2data.governance.manager.util.FileHandler; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * Created by stoke on 2017/11/15. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * TODO: to refactor - * - * @author stoke - */ -public class ParserMR { - - private static Reader reader = null; - private static Configuration conf = new Configuration(); - - // ref: http://blog.csdn.net/inte_sleeper/article/details/7010966 - public static void main(String[] args) - throws IOException, ClassNotFoundException, InterruptedException { - String inputPath = "/Users/stoke/lab/NEL/k2data/thu/abnormal/hdfs-bolt-anomaly-9-2-1509685150773.log.complete"; - // String outputPath = "result"; - // String inputPath = args[0]; - String outputPath = args[1]; - - // remove the output path - FileHandler.removeOutputDir(outputPath); - - System.out.println(inputPath); - System.out.println(outputPath); - - Job job = new Job(conf, "read sequence file"); - job.setJarByClass(ParserMR.class); - job.setMapperClass(ReadFileMapper.class); - // This should be matched with the file - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - - Path path = new Path(inputPath); - FileSystem fs = FileSystem.get(conf); - reader = new Reader(fs, path, conf); - FileInputFormat.addInputPath(job, path); - FileOutputFormat.setOutputPath(job, new Path(outputPath)); - System.exit(job.waitForCompletion(true) ? 0 : 1); - } - - public static class ReadFileMapper extends Mapper { - - @Override - public void map(Writable key, Text value, Context context) { - key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); - value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf); - - try { - while (reader.next(key, value)) { - context.write(key, value); - } - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } -} \ No newline at end of file diff --git a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/util/FileHandler.java b/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/util/FileHandler.java deleted file mode 100644 index 0bf30aec2387ce5e16dff6ae2ae4a727affb2a0f..0000000000000000000000000000000000000000 --- a/k2de-governance-exception-manager/src/main/java/com/k2data/governance/manager/util/FileHandler.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.k2data.governance.manager.util; - -import com.k2data.governance.manager.message.GWMessage; -import com.k2data.governance.manager.message.IMessage; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; - -/** - * Created by stoke on 2017/11/15. - * E-mail address is zaqthss2009@gmail.com - * Copyright © stoke. All Rights Reserved. - * - * @author stoke - */ -public class FileHandler { - public static final String SPLIT_OP = "\t"; - - /** - * used for MR job, remove the outputDir in advance - * @param outputPath outputPath - */ - public static void removeOutputDir(String outputPath) { - File dir = new File(outputPath); - if (dir.exists()) { - File[] files = dir.listFiles(); - if (files != null) { - for (File file : files) { - file.delete(); - } - } - dir.delete(); - } - } - - /** - * @param path - */ - public static List readAnomalyString(String path) { - List messageList = new ArrayList<>(); - try { - FileReader fr = new FileReader(path); - BufferedReader br = new BufferedReader(fr); - - String line; - while ((line = br.readLine()) != null) { - String[] vals = line.split(SPLIT_OP); - if (vals.length <= 1) { - System.out.println(line); - } - String key = vals[0]; - String msg = vals[1]; - messageList.add(new GWMessage(key, msg)); - } - - br.close(); - fr.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - return messageList; - } - - public static void writeAnomalyString(String outputPath, List messageList) { - try { - FileWriter fw = new FileWriter(outputPath); - PrintWriter pw = new PrintWriter(fw); - - for (IMessage message : messageList) { - pw.println(message.getKey() + SPLIT_OP + message.getMsg()); - } - - pw.close(); - fw.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public static boolean isGWFileValid(String fileName) { - return !fileName.startsWith("."); -// return fileName.endsWith(Constants.GW_EXCEPTION_END); - } - - public static void main(String[] args) { - String[] outputPaths = { "result/", "output/" }; - - for(String outputPath : outputPaths) { - removeOutputDir(outputPath); - } - } -} diff --git a/pom.xml b/pom.xml index 680fb150547607ca763addf0c7209769bbd6fae4..1bc6dde4428119b60d61481af4f443dd743d2d17 100644 --- a/pom.xml +++ b/pom.xml @@ -11,13 +11,18 @@ UTF-8 - 1.8 + 1.7 2.6.0 + 2.1.0 + 2.11 + 1.4.2 k2de-governance-data-extractor - k2de-governance-exception-manager + k2de-governance-exception-manager-base + k2de-governance-exception-manager-kmx + k2de-governance-exception-manager-huineng @@ -45,6 +50,34 @@ sqlite-jdbc 3.20.1 + + + net.sourceforge.argparse4j + argparse4j + 0.7.0 + + + + org.apache.spark + spark-core_${spark.comp.version} + ${spark.version} + provided + + + + org.apache.spark + spark-sql_${spark.comp.version} + ${spark.version} + provided + + + + com.sun + tools + 1.4.2 + system + ${java.home}/../lib/tools.jar +