Commit eff193d3 authored by 王海鹰's avatar 王海鹰

支持“核心”参数

打包逻辑优化
parent f32f595c
Pipeline #14466 passed with stages
in 2 minutes and 59 seconds
...@@ -100,19 +100,16 @@ under the License. ...@@ -100,19 +100,16 @@ under the License.
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId> <artifactId>flink-java</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId> <artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.6.5-10.0</version> <version>2.6.5-10.0</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
...@@ -2,8 +2,8 @@ package com.zorkdata.datamask; ...@@ -2,8 +2,8 @@ package com.zorkdata.datamask;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.zorkdata.datamask.constant.ParamConstants; import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.hadoop.HadoopMask; import com.zorkdata.datamask.hadoop.HdfsLogMaskUtil;
import com.zorkdata.datamask.kafka.KafkaMask; import com.zorkdata.datamask.kafka.KafkaMsgMaskUtil;
import com.zorkdata.datamask.util.ZorkParameterUtil; import com.zorkdata.datamask.util.ZorkParameterUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -32,9 +32,9 @@ public class TransactionLogMask { ...@@ -32,9 +32,9 @@ public class TransactionLogMask {
String source = conf.get(ParamConstants.SOURCE); String source = conf.get(ParamConstants.SOURCE);
if (ParamConstants.HDFS.equals(source)) { if (ParamConstants.HDFS.equals(source)) {
HadoopMask.maskHdfsLog(conf); HdfsLogMaskUtil.maskHdfsLog(conf);
} else if (ParamConstants.KAFKA.equals(source)) { } else if (ParamConstants.KAFKA.equals(source)) {
KafkaMask.maskKafkaMsg(conf); KafkaMsgMaskUtil.maskKafkaMsg(conf);
} }
} }
......
...@@ -49,8 +49,8 @@ import java.util.Map; ...@@ -49,8 +49,8 @@ import java.util.Map;
* @author: wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>) * @author: wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
* Date: Create in 2020/9/23 9:30 * Date: Create in 2020/9/23 9:30
*/ */
public class HadoopMask { public class HdfsLogMaskUtil {
  • Add a private constructor to hide the implicit public one. 📘

Please register or sign in to reply
private static final Logger logger = LoggerFactory.getLogger(HadoopMask.class); private static final Logger logger = LoggerFactory.getLogger(HdfsLogMaskUtil.class);
  • Remove this unused "logger" private field. 📘

Please register or sign in to reply
/** /**
* hdfs日志文件脱敏 * hdfs日志文件脱敏
...@@ -73,6 +73,8 @@ public class HadoopMask { ...@@ -73,6 +73,8 @@ public class HadoopMask {
ArrayList< String> fieldsWhiteList = new ArrayList<String>(fieldsWhiteListArray.length); ArrayList< String> fieldsWhiteList = new ArrayList<String>(fieldsWhiteListArray.length);
Collections.addAll(fieldsWhiteList, fieldsWhiteListArray); Collections.addAll(fieldsWhiteList, fieldsWhiteListArray);
String core = String.valueOf(conf.get(ParamConstants.CORE)).trim();
List<String> logFiles = filterHdfsLogFiles(hdfsLogQueryParam.getHdfsSrc(), hdfsLogQueryParam.getDate(), List<String> logFiles = filterHdfsLogFiles(hdfsLogQueryParam.getHdfsSrc(), hdfsLogQueryParam.getDate(),
hdfsLogQueryParam.getStartTime(), hdfsLogQueryParam.getEndTime()); hdfsLogQueryParam.getStartTime(), hdfsLogQueryParam.getEndTime());
...@@ -96,7 +98,7 @@ public class HadoopMask { ...@@ -96,7 +98,7 @@ public class HadoopMask {
new TypeReference<LogData>() { new TypeReference<LogData>() {
}); });
//根据日志事件的核心信息做过滤 //根据日志事件的核心信息做过滤
if (null != hdfsLogQueryParam.getCore() && logData.getDimensions().get("hostname").indexOf("c9") > -1 ) { if (null != hdfsLogQueryParam.getCore() && logData.getDimensions().get("hostname").indexOf(core) > -1 ) {
//根据日志事件的timestamp做过滤 //根据日志事件的timestamp做过滤
Long timestamp = DateUtils.utc2timestamp(logData.getTimestamp()); Long timestamp = DateUtils.utc2timestamp(logData.getTimestamp());
boolean flag = null != timestamp && timestamp > hdfsLogQueryParam.getStartTime() boolean flag = null != timestamp && timestamp > hdfsLogQueryParam.getStartTime()
......
...@@ -21,7 +21,7 @@ import java.util.Properties; ...@@ -21,7 +21,7 @@ import java.util.Properties;
* @Email xiesen310@163.com * @Email xiesen310@163.com
* @Date 2020/10/21 14:51 * @Date 2020/10/21 14:51
*/ */
public class KafkaMask { public class KafkaMsgMaskUtil {
  • Add a private constructor to hide the implicit public one. 📘

Please register or sign in to reply
/** /**
* kafka消息数据脱敏 * kafka消息数据脱敏
* *
......
...@@ -5,7 +5,7 @@ source: "hdfs" ...@@ -5,7 +5,7 @@ source: "hdfs"
hdfs_src: "hdfs://cdh-2:8020/tmp/datawarehouse4/jzjy/kcbp_biz_log" hdfs_src: "hdfs://cdh-2:8020/tmp/datawarehouse4/jzjy/kcbp_biz_log"
# hdfs日志写入地址,非必传,默认写到hdfs-src目录下的output目录下 # hdfs日志写入地址,非必传,默认写到hdfs-src目录下的output目录下
hdfs_dest: "hdfs://cdh-2:8020/tmp/datawarehouse/jzjy/kcbp_biz_log/output5/" hdfs_dest: "hdfs://cdh-2:8020/tmp/datawarehouse/jzjy/kcbp_biz_log/output7/"
# 脱敏结果下载到的本地路径 # 脱敏结果下载到的本地路径
download_path: "/tmp" download_path: "/tmp"
...@@ -24,7 +24,7 @@ end_time: 1601348850000 ...@@ -24,7 +24,7 @@ end_time: 1601348850000
# 不做脱敏的字段白名单 # 不做脱敏的字段白名单
fieldsWhiteList: "messid,fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,\ fieldsWhiteList: "messid,fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,\
orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc,indexTime,logchecktime,end_logtime,collecttime,deserializerTime,versioninfo" orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc,indexTime,logchecktime,end_logtime,collecttime,deserializerTime,versioninfo,fmillsecond,smillsecond"
# 脱敏用的正则表达式 # 脱敏用的正则表达式
reg_exp: reg_exp:
......
...@@ -6,8 +6,7 @@ if [ ! -d "$deploy_path/logs" ]; then ...@@ -6,8 +6,7 @@ if [ ! -d "$deploy_path/logs" ]; then
mkdir -p $deploy_path/logs mkdir -p $deploy_path/logs
fi fi
FLINK_TASK_CONF=gmas-config.yaml FLINK_TASK_CONF=application.yml
parallelism=1 parallelism=1
flink run -p $parallelism $deploy_path/lib/transactionLogMask-0.1.jar flink run $deploy_path/lib/transactionLogMask-0.1.jar --conf $deploy_path/conf/$FLINK_TASK_CONF> $deploy_path/logs/transactionLogMask-0.1.log &
$$deploy_path/conf/$FLINK_TASK_CONF> $$deploy_path/logs/transactionLogMask-0.1.log & \ No newline at end of file
\ No newline at end of file
  • SonarQube analysis reported 149 issues

    • 🚫 19 critical
    • 80 major
    • 🔽 49 minor
    • 1 info

    Watch the comments in this conversation to review them.

    Top 30 extra issues

    Note: The following issues were found on lines that were not modified in the commit. Because these issues can't be reported as line comments, they are summarized here:

    1. 🚫 Move constants to a class or enum. 📘
    2. 🚫 Move constants to a class or enum. 📘
    3. 🚫 Move constants to a class or enum. 📘
    4. 🚫 Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation. 📘
    5. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    6. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    7. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    8. 🚫 Make "patterns" private or transient. 📘
    9. 🚫 Define a constant instead of duplicating this literal "序列化失败" 15 times. 📘
    10. 🚫 Define a constant instead of duplicating this literal " {\n" 7 times. 📘
    11. 🚫 Define a constant instead of duplicating this literal "        "type": \n" 7 times. 📘
    12. 🚫 Define a constant instead of duplicating this literal " "string",\n" 4 times. 📘
    13. 🚫 Define a constant instead of duplicating this literal " "null"\n" 4 times. 📘
    14. 🚫 [Define a constant instead of duplicating this literal " ]\n" 7 times.](https://git.zorkdata.com/xiesen/transactionlogmask/blob/eff193d302b69f0a01f80ff39d793752de2c4d4b/src/main/java/com/zorkdata/datamask/util/avro/LogAvroMacroDef.java#L20) 📘
    15. 🚫 Define a constant instead of duplicating this literal " },\n" 6 times. 📘
    16. 🚫 Define a constant instead of duplicating this literal " "null",\n" 3 times. 📘
    17. 🚫 Define a constant instead of duplicating this literal " {\n" 3 times. 📘
    18. 🚫 Define a constant instead of duplicating this literal " "type": "map",\n" 3 times. 📘
    19. 🚫 Define a constant instead of duplicating this literal " }\n" 3 times. 📘
    20. Define and throw a dedicated exception instead of using a generic one. 📘
    21. Remove this unused "source" private field. 📘
    22. Remove this unused "hdfsSrc" private field. 📘
    23. Remove this unused "hdfsDest" private field. 📘
    24. Remove this unused "core" private field. 📘
    25. Remove this unused "date" private field. 📘
    26. Remove this unused "startTime" private field. 📘
    27. Remove this unused "endTime" private field. 📘
    28. Remove this unused "servers" private field. 📘
    29. Remove this unused "zookeeper" private field. 📘
    30. Remove this unused "topic" private field. 📘
    • ... 116 more
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment