Commit 69f90fb7 authored by DeleMing's avatar DeleMing

<dev>

1. Initialize the project
parent 243313f1
Pipeline #15041 failed with stages in 1 minute and 28 seconds
# transactionLogMask
# Guotai Transaction Log Desensitization Job
Desensitization of Guotai transaction logs.
\ No newline at end of file
<!--
~ Copyright (c) 2020. log-merge
Packaging descriptor; do not modify the content below.
-->
<assembly>
<id>bin</id>
<formats>
<format>tar.gz</format>
</formats>
<fileSets>
<!-- ============ Do not modify the content below ============ -->
<fileSet>
<directory>target/transaction-log-deploy/bin</directory>
<includes>
<include>*.sh</include>
</includes>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>target/transaction-log-deploy/conf</directory>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>target/transaction-log-deploy/lib</directory>
<outputDirectory>lib</outputDirectory>
</fileSet>
</fileSets>
</assembly>
\ No newline at end of file
<assembly>
<id>bin</id>
<!-- Package everything into a single zip file for release -->
<formats>
<format>zip</format>
</formats>
<!-- Adds dependencies to zip package under lib directory -->
<!-- <dependencySets>-->
<!-- <dependencySet>-->
<!-- &lt;!&ndash; Do not use the project artifact; do not unpack third-party jars; package them into the zip's lib directory &ndash;&gt;-->
<!-- <useProjectArtifact>false</useProjectArtifact>-->
<!-- <outputDirectory>lib</outputDirectory>-->
<!-- <unpack>false</unpack>-->
<!-- </dependencySet>-->
<!-- </dependencySets>-->
<fileSets>
<fileSet>
<directory>${project.basedir}/package/scripts</directory>
<outputDirectory>/bin</outputDirectory>
<includes>
<include>*.sh</include>
<include>*.bat</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/src/main/resources</directory>
<outputDirectory>/conf</outputDirectory>
<includes>
<include>*.yml</include>
<include>*.properties</include>
</includes>
</fileSet>
<!-- <fileSet>-->
<!-- <directory>target/transaction-log-deploy/conf</directory>-->
<!-- <outputDirectory>/conf</outputDirectory>-->
<!-- <includes>-->
<!-- <include>*.yml</include>-->
<!-- <include>*.properties</include>-->
<!-- </includes>-->
<!-- </fileSet>-->
<!-- Package the jar built by this project into the lib directory at the root of the zip file -->
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory>/lib</outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
</fileSets>
</assembly>
#!/usr/bin/env bash
export BASE_PATH=$(cd `dirname $0`; pwd)
DEPLOY_PATH=${BASE_PATH%/*}
if [ ! -d "$DEPLOY_PATH/logs" ]; then
mkdir -p $DEPLOY_PATH/logs
fi
FLINK_TASK_CONF=application.yml
flink run $DEPLOY_PATH/lib/transaction-log-desensitization-0.1.jar --conf $DEPLOY_PATH/conf/$FLINK_TASK_CONF > $DEPLOY_PATH/logs/transaction-log-desensitization-0.1.log &
\ No newline at end of file
package com.zorkdata.datamask;
import com.alibaba.fastjson.JSON;
import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.hadoop.HdfsLogMaskUtil;
import com.zorkdata.datamask.kafka.KafkaMsgMaskUtil;
import com.zorkdata.datamask.util.ZorkParameterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* Description : Guotai transaction log desensitization job
*
* @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
* Date : Create in 2020/9/18 17:35
*/
public class TransactionLogMask {
private static Logger logger = LoggerFactory.getLogger(TransactionLogMask.class);
public static final int PARAM_LENGTH = 2;
public static void main(String[] args) throws Exception {
if (args.length != PARAM_LENGTH) {
String error = "参数缺失,请输入配置文件,例如: --conf /opt/TransactionLogMask/application.yml";
logger.error(error);
throw new RuntimeException(error);
}
Map<String, String> conf = ZorkParameterUtil.readParameter(args);
logger.info("配置文件: {}", JSON.toJSONString(conf));
String source = conf.get(ParamConstants.SOURCE);
if (ParamConstants.HDFS.equals(source)) {
HdfsLogMaskUtil.maskHdfsLog(conf);
} else if (ParamConstants.KAFKA.equals(source)) {
KafkaMsgMaskUtil.maskKafkaMsg(conf);
}
}
}
package com.zorkdata.datamask.constant;
/**
* Description: Query parameter constants
*
* @author: wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
* Date: Create in 2020/10/20 15:32
*/
public interface ParamConstants {
String SOURCE = "source";
String HDFS_SRC = "hdfs_src";
String HDFS_DEST = "hdfs_dest";
String CORE = "core";
String DATE = "date";
String START_TIME = "start_time";
String END_TIME = "end_time";
String SERVERS = "servers";
String ZOOKEEPER = "zookeeper";
String TOPIC = "topic";
String HDFS = "hdfs";
String KAFKA = "kafka";
String FIELDS_WHITE_LIST = "fieldsWhiteList";
}
package com.zorkdata.datamask.constant;
/**
* Description : Regular expression constants
*
* @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
* Date : Create in 2020/10/20 15:32
*/
public interface RegExpConstants {
String REG_EXP = "reg_exp";
String NAME_REG_EXP = "name";
String MOBILE_REG_EXP = "mobile";
String PHONE_REG_EXP = "phone";
String EMAIL_REG_EXP = "email";
String ID15_REG_EXP = "id15";
String ID18_REG_EXP = "id18";
String BANK_CARD_REG_EXP = "bank_card";
String ADDRESS_REG_EXP = "address";
String IP_REG_EXP = "ip";
String MAC_REG_EXP = "mac";
}
package com.zorkdata.datamask.constant;
/**
* @author 谢森
* @Description Constant definitions
* @Email xiesen310@163.com
* @Date 2020/10/21 15:50
*/
public interface StrConstants {
String FILE_SEPARATOR = "/";
String AVRO_SUFFIX = ".avro";
String EMPTY_STR = "";
}
package com.zorkdata.datamask.domain;
import lombok.Data;
import java.io.Serializable;
/**
* @author 谢森
* @Description Parameter entity class
* @Email xiesen310@163.com
* @Date 2020/10/21 14:33
*/
@Data
public class HdfsLogQueryParam implements Serializable {
private static final long serialVersionUID = 1L;
private String source;
private String hdfsSrc;
private String hdfsDest;
private String core;
private String date;
private Long startTime;
private Long endTime;
public HdfsLogQueryParam(String source, String hdfsSrc, String hdfsDest, String core, String date, Long startTime,
Long endTime) {
this.source = source;
this.hdfsSrc = hdfsSrc;
this.hdfsDest = hdfsDest;
this.core = core;
this.date = date;
this.startTime = startTime;
this.endTime = endTime;
}
}
package com.zorkdata.datamask.domain;
import lombok.Data;
import java.io.Serializable;
/**
* @author 谢森
* @Description Kafka parameter entity class
* @Email xiesen310@163.com
* @Date 2020/10/21 15:07
*/
@Data
public class KafkaMsgQueryParam implements Serializable {
private static final long serialVersionUID = 1L;
private String servers;
private String zookeeper;
private String topic;
private String hdfsDest;
private String core;
private String date;
private Long startTime;
private Long endTime;
public KafkaMsgQueryParam(String servers, String zookeeper, String topic, String hdfsDest, String core, String date,
Long startTime, Long endTime) {
this.servers = servers;
this.zookeeper = zookeeper;
this.topic = topic;
this.hdfsDest = hdfsDest;
this.core = core;
this.date = date;
this.startTime = startTime;
this.endTime = endTime;
}
}
{
"namespace": "com.zork.logs",
"type": "record",
"name": "logs",
"fields": [
{
"name": "logTypeName",
"type": [
"string",
"null"
]
},
{
"name": "timestamp",
"type": [
"string",
"null"
]
},
{
"name": "source",
"type": [
"string",
"null"
]
},
{
"name": "offset",
"type": [
"string",
"null"
]
},
{
"name": "dimensions",
"type": [
"null",
{
"type": "map",
"values": "string"
}
]
},
{
"name": "measures",
"type": [
"null",
{
"type": "map",
"values": "double"
}
]
},
{
"name": "normalFields",
"type": [
"null",
{
"type": "map",
"values": "string"
}
]
}
]
}
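Every field in the schema above is a union with "null", wrapping either a string or a map of strings/doubles. Below is a minimal, hypothetical sketch of building one record against this schema with Avro's generic API; the logs.avsc file name and all field values are invented, assuming the JSON above has been saved to that file:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.util.Collections;

public class LogRecordSketch {
    public static void main(String[] args) throws IOException {
        // Parse the schema shown above (assumed to be saved as logs.avsc).
        Schema schema = new Schema.Parser().parse(new File("logs.avsc"));
        GenericRecord record = new GenericData.Record(schema);
        // Every field is a union with "null", so plain strings and maps are accepted.
        record.put("logTypeName", "kcbp_biz_log");
        record.put("timestamp", "2020-09-29T11:07:29.900+08:00");
        record.put("source", "cdh-2");
        record.put("offset", "0");
        record.put("dimensions", Collections.singletonMap("hostname", "c9-node-1"));
        record.put("measures", null);
        record.put("normalFields", Collections.singletonMap("custid", "123456"));
        System.out.println(record); // prints the record as JSON
    }
}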
package com.zorkdata.datamask.hadoop;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.constant.StrConstants;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.HdfsLogQueryParam;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.DateUtils;
import com.zorkdata.datamask.util.MaskUtil;
import com.zorkdata.datamask.util.ParamUtils;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Description: HDFS log file desensitization
*
* @author: wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
* Date: Create in 2020/9/23 9:30
*/
public class HdfsLogMaskUtil {
private static final Logger logger = LoggerFactory.getLogger(HdfsLogMaskUtil.class);
/**
* Desensitize log files on HDFS.
*
* @param conf request parameters
*/
public static void maskHdfsLog(Map<String, String> conf) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
JobConf jobConf = new JobConf();
jobConf.set("avro.output.schema", TransactionLog.SCHEMA$.toString(true));
HdfsLogQueryParam hdfsLogQueryParam = ParamUtils.initHadoopConf(conf);
ParameterTool parameterTool = ParameterTool.fromMap(conf);
env.getConfig().setGlobalJobParameters(parameterTool);
MaskUtil maskUtil = ParamUtils.initMaskUtil(conf);
String[] fieldsWhiteListArray = String.valueOf(conf.get(ParamConstants.FIELDS_WHITE_LIST)).trim().split(",");
ArrayList<String> fieldsWhiteList = new ArrayList<>(fieldsWhiteListArray.length);
Collections.addAll(fieldsWhiteList, fieldsWhiteListArray);
String core = String.valueOf(conf.get(ParamConstants.CORE)).trim();
List<String> logFiles = filterHdfsLogFiles(hdfsLogQueryParam.getHdfsSrc(), hdfsLogQueryParam.getDate(),
hdfsLogQueryParam.getStartTime(), hdfsLogQueryParam.getEndTime());
for (String logFile : logFiles) {
/**
* Read the HDFS log file and deserialize it from Avro
*/
HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<Object, Object>
(new AvroInputFormat(), Object.class, Object.class, jobConf);
AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
DataSource<Tuple2<Object, Object>> hdfsLogInput = env.createInput(hadoopInputFormat);
/**
* Desensitization operator
*/
FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator =
hdfsLogInput.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
LogData logData = JSON.parseObject(value.getField(0).toString(),
new TypeReference<LogData>() {
});
// Filter by the log event's "core" identifier
if (null != hdfsLogQueryParam.getCore() && logData.getDimensions().get("hostname").indexOf(core) > -1) {
// Filter by the log event's timestamp
Long timestamp = DateUtils.utc2timestamp(logData.getTimestamp());
boolean flag = null != timestamp && timestamp > hdfsLogQueryParam.getStartTime()
&& timestamp < hdfsLogQueryParam.getEndTime();
if (flag) {
Map maskResult = maskUtil.mask(logData.getNormalFields(), fieldsWhiteList);
logData.setNormalFields(maskResult);
collector.collect(logData);
}
}
}
});
// Build the output path on the destination HDFS
String logFileName =
logFile.split(StrConstants.FILE_SEPARATOR)[logFile.split(StrConstants.FILE_SEPARATOR).length - 1];
String filePath = hdfsLogQueryParam.getHdfsDest() + logFileName.replace(StrConstants.AVRO_SUFFIX,
StrConstants.EMPTY_STR);
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
/**
* Avro serialization operator
*/
maskFlatMapOperator.map(new MapFunction<Object, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
@Override
public Tuple2<AvroWrapper<LogData>, NullWritable> map(Object value) throws Exception {
AvroKey<LogData> key = new AvroKey<LogData>((LogData) value);
Tuple2<AvroWrapper<LogData>, NullWritable> tuple = new Tuple2<AvroWrapper<LogData>,
NullWritable>(key, NullWritable.get());
return tuple;
}
}).output(hadoopOutputFormat);
try {
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
logger.error("Flink job execution failed", e);
}
}
}
/**
* Filter log files on HDFS.
*
* @param hdfs HDFS address
* @param date date
* @param startTime start time
* @param endTime end time
* @return list of HDFS file paths
*/
private static List<String> filterHdfsLogFiles(String hdfs, String date, Long startTime, Long endTime) {
if (!hdfs.endsWith(StrConstants.FILE_SEPARATOR)) {
hdfs += StrConstants.FILE_SEPARATOR;
}
String path = hdfs;
if (null != date) {
path = hdfs + date;
}
Configuration conf = new Configuration();
List<String> logFiles = new ArrayList<>();
try {
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(new URI("hdfs://cdh-2:8020/"), conf, "hdfs");
} catch (InterruptedException e) {
logger.error("Interrupted while connecting to HDFS", e);
}
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path(path),
false);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
long modificationTime = next.getModificationTime();
// Filter by file modification time to keep only files within the user-specified window
if (modificationTime > startTime && modificationTime < endTime) {
Path hdfsFilePath = next.getPath();
logFiles.add(hdfsFilePath.toString());
}
}
} catch (IOException | URISyntaxException e) {
logger.error("Failed to list HDFS log files", e);
}
return logFiles;
}
}
package com.zorkdata.datamask.kafka;
import com.zorkdata.datamask.domain.KafkaMsgQueryParam;
import com.zorkdata.datamask.util.ParamUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.Map;
import java.util.Properties;
/**
* @author 谢森
* @Description Kafka data desensitization
* @Email xiesen310@163.com
* @Date 2020/10/21 14:51
*/
public class KafkaMsgMaskUtil {
/**
* Desensitize Kafka messages.
*
* @param conf request parameters
*/
public static void maskKafkaMsg(Map<String, String> conf) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
KafkaMsgQueryParam kafkaMsgQueryParam = ParamUtils.initKafkaConf(conf);
ParameterTool parameterTool = ParameterTool.fromMap(conf);
env.getConfig().setGlobalJobParameters(parameterTool);
Properties props = new Properties();
props.put("bootstrap.servers", kafkaMsgQueryParam.getServers());
props.put("zookeeper.connect", kafkaMsgQueryParam.getZookeeper());
props.put("group.id", "group1");
props.put("enable.auto.commit", false);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", 1000);
SingleOutputStreamOperator<String> dataStreamSource =
env.addSource(new FlinkKafkaConsumer<>(kafkaMsgQueryParam.getTopic(),
new SimpleStringSchema(), props)).setParallelism(1);
// TODO: filter by date, startTime, and endTime
BucketingSink<String> hdfsSink = new BucketingSink<>(kafkaMsgQueryParam.getHdfsDest());
//Create a bucketer that creates directories by time. The default format is yyyy-MM-dd--HH with a US timezone; changed here to one directory per day, Shanghai time
hdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")));
//Set the maximum size of each file; the default is 384 MB (1024 * 1024 * 384)
hdfsSink.setBatchSize(1024 * 1024 * 384);
//Roll over to a new file after this interval
hdfsSink.setBatchRolloverInterval(1000 * 60 * 60);
hdfsSink.setPendingSuffix("ccc");
hdfsSink.setInactiveBucketThreshold(60 * 1000L);
hdfsSink.setInactiveBucketCheckInterval(60 * 1000L);
hdfsSink.setAsyncTimeout(60 * 1000);
dataStreamSource.addSink(hdfsSink);
try {
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.zorkdata.datamask.util;
/**
* @author 谢森
* @Description Configuration file utility class
* @Email xiesen@zork.com.cn
*/
public class ConfigUtils {
public static final String EMPTY_STR = "";
public static final String NULL_STR = "null";
public static String getString(String value, String defaultValue) {
String result = value == null || EMPTY_STR.equals(value) || NULL_STR.equals(value) ? defaultValue : value;
return result;
}
public static Integer getInteger(Integer value, Integer defaultValue) {
Integer result = value == null || value < 0 ? defaultValue : value;
return result;
}
public static Double getDouble(Double value, Double defaultValue) {
Double result = value == null ? defaultValue : value;
return result;
}
public static Float getFloat(Float value, Float defaultValue) {
Float result = value == null ? defaultValue : value;
return result;
}
public static Long getLong(Long value, Long defaultValue) {
Long result = value == null ? defaultValue : value;
return result;
}
public static Boolean getBoolean(Boolean value, Boolean defaultValue) {
Boolean result = value == null ? defaultValue : value;
return result;
}
}
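A minimal usage sketch for ConfigUtils (all values invented), showing the fallback behavior of the getters:
public class ConfigUtilsDemo {
    public static void main(String[] args) {
        // Empty, "null", and null strings all fall back to the default.
        System.out.println(ConfigUtils.getString("", "hdfs"));      // hdfs
        System.out.println(ConfigUtils.getString("kafka", "hdfs")); // kafka
        // Null or negative integers fall back to the default.
        System.out.println(ConfigUtils.getInteger(null, 1));        // 1
        System.out.println(ConfigUtils.getLong(null, 0L));          // 0
    }
}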
package com.zorkdata.datamask.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
/**
* @author 谢森
* @Description Date/time utility class
* @Email xiesen310@163.com
* @Date 2020/10/21 14:39
*/
public class DateUtils {
public static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
private static SimpleDateFormat utcFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
/**
* Convert a UTC time string to a Unix timestamp.
*
* @param utcTime UTC time string
* @return Unix timestamp in milliseconds, or null on parse failure
*/
public static Long utc2timestamp(String utcTime) {
//Set the time zone before parsing; the ID must be "Asia/Shanghai" (unknown IDs silently fall back to GMT)
utcFormatter.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
Date gpsUtcDate = null;
try {
gpsUtcDate = utcFormatter.parse(utcTime);
} catch (ParseException e) {
logger.error("时间戳格式转换异常:{} 原因: {}", utcTime, e.getMessage());
return null;
}
return gpsUtcDate.getTime();
}
}
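A small sketch of the conversion, using the timestamp pair that appears in the sample application.yml below (start_time 1601348849900 alongside "2020-09-29 11:07:29"). Note that "+08:00" in the pattern is literal text, so the formatter's Asia/Shanghai time zone is what actually supplies the offset:
public class DateUtilsDemo {
    public static void main(String[] args) {
        // With the zone set to Asia/Shanghai, this should print 1601348849900,
        // the same epoch value used as start_time in the sample configuration.
        Long ts = DateUtils.utc2timestamp("2020-09-29T11:07:29.900+08:00");
        System.out.println(ts);
    }
}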
package com.zorkdata.datamask.util;
import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.constant.RegExpConstants;
import com.zorkdata.datamask.domain.HdfsLogQueryParam;
import com.zorkdata.datamask.domain.KafkaMsgQueryParam;
import java.util.HashMap;
import java.util.Map;
/**
* @author 谢森
* @Description Parameter utility class
* @Email xiesen310@163.com
* @Date 2020/10/21 14:42
*/
public class ParamUtils {
/**
* Build the query parameters from the configuration
*
* @param conf
*/
public static HdfsLogQueryParam initHadoopConf(Map conf) {
String source = String.valueOf(conf.get(ParamConstants.SOURCE)).trim();
String hdfsSrc = String.valueOf(conf.get(ParamConstants.HDFS_SRC)).trim();
String hdfsDest = String.valueOf(conf.get(ParamConstants.HDFS_DEST)).trim();
String core = String.valueOf(conf.get(ParamConstants.CORE)).trim();
String date = String.valueOf(conf.get(ParamConstants.DATE)).trim();
Long startTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.START_TIME)).trim());
Long endTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.END_TIME)).trim());
return new HdfsLogQueryParam(source, hdfsSrc, hdfsDest, core, date, startTime, endTime);
}
public static KafkaMsgQueryParam initKafkaConf(Map conf) {
String servers = String.valueOf(conf.get(ParamConstants.SERVERS)).trim();
String zookeeper = String.valueOf(conf.get(ParamConstants.ZOOKEEPER)).trim();
String topic = String.valueOf(conf.get(ParamConstants.TOPIC)).trim();
String hdfsDest = String.valueOf(conf.get(ParamConstants.HDFS_DEST)).trim();
String core = String.valueOf(conf.get(ParamConstants.CORE)).trim();
String date = String.valueOf(conf.get(ParamConstants.DATE)).trim();
Long startTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.START_TIME)).trim());
Long endTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.END_TIME)).trim());
return new KafkaMsgQueryParam(servers, zookeeper, topic, hdfsDest, core, date, startTime, endTime);
}
public static MaskUtil initMaskUtil(Map conf) {
Map regularExpressions = (HashMap)conf.get(RegExpConstants.REG_EXP);
String nameRegExp = String.valueOf(regularExpressions.get(RegExpConstants.NAME_REG_EXP)).trim();
String mobileRegExp = String.valueOf(regularExpressions.get(RegExpConstants.MOBILE_REG_EXP)).trim();
String phoneRegExp = String.valueOf(regularExpressions.get(RegExpConstants.PHONE_REG_EXP)).trim();
String emailRegExp = String.valueOf(regularExpressions.get(RegExpConstants.EMAIL_REG_EXP)).trim();
String idRegExp15 = String.valueOf(regularExpressions.get(RegExpConstants.ID15_REG_EXP)).trim();
String idRegExp18 = String.valueOf(regularExpressions.get(RegExpConstants.ID18_REG_EXP)).trim();
String bankCardRegExp = String.valueOf(regularExpressions.get(RegExpConstants.BANK_CARD_REG_EXP)).trim();
String addressRegExp = String.valueOf(regularExpressions.get(RegExpConstants.ADDRESS_REG_EXP)).trim();
String ipRegExp = String.valueOf(regularExpressions.get(RegExpConstants.IP_REG_EXP)).trim();
String macRegExp = String.valueOf(regularExpressions.get(RegExpConstants.MAC_REG_EXP)).trim();
return new MaskUtil(nameRegExp, mobileRegExp, phoneRegExp, emailRegExp, idRegExp15, idRegExp18, bankCardRegExp, addressRegExp, ipRegExp, macRegExp);
}
}
package com.zorkdata.datamask.util;
import org.apache.flink.api.java.utils.ParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author 谢森
* @Description Parameter reading utility class
* @Email xiesen@zork.com.cn
*/
public class ZorkParameterUtil {
private static final Logger logger = LoggerFactory.getLogger(ZorkParameterUtil.class);
public static final String YML_SUFFIX = "yml";
/**
* Read parameters.
*
* @param args command-line arguments; by convention the --conf argument
*             identifies the configuration file path
* @return configuration map
*/
public static Map<String, String> readParameter(String[] args) {
Map<String, String> conf = null;
String configPath;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
configPath = parameterTool.get("conf");
} catch (Exception e) {
throw new RuntimeException("读取配置文件失败,请检查配置路径.");
}
logger.info("read config path is " + configPath);
if (!configPath.endsWith(YML_SUFFIX)) {
System.err.println("Please input correct configuration file and flink run mode!");
throw new RuntimeException("Please input correct configuration file and flink run mode!");
} else {
conf = LoadConf.loadYaml(configPath);
if (conf == null) {
logger.error("配置文件" + args[0] + "不存在,系统退出");
throw new RuntimeException("配置文件" + args[0] + "不存在,系统退出");
}
}
return conf;
}
}
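A hedged sketch of the --conf contract described in the Javadoc above: Flink's ParameterTool.fromArgs parses "--key value" pairs, so the wrapper script's flink run ... --conf .../application.yml invocation surfaces here under the "conf" key (the path below is the one from the error message in TransactionLogMask, used purely as an example):
import org.apache.flink.api.java.utils.ParameterTool;

public class ConfArgDemo {
    public static void main(String[] args) {
        String[] fakeArgs = {"--conf", "/opt/TransactionLogMask/application.yml"};
        ParameterTool tool = ParameterTool.fromArgs(fakeArgs);
        // Prints the configuration file path passed on the command line.
        System.out.println(tool.get("conf"));
    }
}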
package com.zorkdata.desensitization;
import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.List;
/**
* @author: LiaoMingtao
* @date: 2020/10/29
*/
public class Test {
public static void main(String[] args) {
// String a = "{a},{b}";
  • This block of commented-out lines of code should be removed.
// String[] pathStrings = getPathStrings(a);
// for (String b: pathStrings) {
// System.out.println(b);
// }
  • Promptly remove code blocks or configuration entries that are no longer used.
String a = "a,b,";
String[] pathStrings = getPathStrings(a);
for (String b: pathStrings) {
System.out.println(b);
  • Replace this use of System.out or System.err by a logger.
System.out.printf("--");
  • Replace this use of System.out or System.err by a logger. String contains no format specifiers.
}
}
private static String[] getPathStrings(String commaSeparatedPaths) {
int length = commaSeparatedPaths.length();
int curlyOpen = 0;
int pathStart = 0;
boolean globPattern = false;
List<String> pathStrings = new ArrayList<>();
for(int i = 0; i < length; ++i) {
char ch = commaSeparatedPaths.charAt(i);
switch(ch) {
  • Add a default case to this switch. Every case in a switch must be terminated with break/return, etc. The switch block is missing a default statement.
case ',':
if (!globPattern) {
pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
pathStart = i + 1;
}
break;
case '{':
++curlyOpen;
if (!globPattern) {
globPattern = true;
}
break;
case '}':
--curlyOpen;
if (curlyOpen == 0 && globPattern) {
globPattern = false;
}
}
}
pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
System.out.println(JSON.toJSONString(pathStrings));
  • Replace this use of System.out or System.err by a logger.
return (String[])pathStrings.toArray(new String[0]);
  • Remove this unnecessary cast to "String[]".
}
}
package com.zorkdata.desensitization;
import com.zorkdata.desensitization.constans.ConfigConstants;
import com.zorkdata.desensitization.exception.ZorkException;
import com.zorkdata.desensitization.hadoop.HdfsLogDesensitization;
import com.zorkdata.desensitization.utils.YmlUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* @author: LiaoMingtao
* @date: 2020/10/22
*/
@Slf4j
public class TransactionLogDesensitization {
private static final int PARAM_LENGTH = 2;
public static void main(String[] args) {
if (args.length != PARAM_LENGTH) {
String error = "参数缺失,请输入配置文件,例如: --conf /opt/TransactionLogDesensitization/application.yml";
log.error(error);
System.exit(1);
}
try {
Map<String, String> conf = YmlUtil.getParams(args);
String source = conf.get(ConfigConstants.SOURCE);
if (ConfigConstants.HDFS.equals(source)) {
new HdfsLogDesensitization().initConf(conf).desensitizationHdfsLog();
}
if (ConfigConstants.KAFKA.equals(source)) {
// TODO kafka
  • Complete the task associated to this TODO comment.
}
} catch (ZorkException e) {
log.error("Job initialization failed", e);
System.exit(1);
}
}
}
package com.zorkdata.datamask.util.avro;
package com.zorkdata.desensitization.avro;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -9,60 +10,52 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @author xiesen
* @Description Deserialization
* @Email xiesen310@163.com
* @Date 2020/6/28 9:31
* Avro data deserialization
*
* @author Administrator
*/
@Slf4j
public class AvroDeserializer {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);
public JSONObject jsonObject;
public JSONArray jsonArray;
public Schema schema;
public String[] keys;
public AvroDeserializer(String schema) {
getKeysFromjson(schema);
}
public static final String FIELDS = "fields";
public static final String NAME = "name";
/**
* @param schema the schema used for Avro serialization
* @return void
* @throws
* @Title: getKeysFromjson
* @Description: extracts the Avro field keys
*/
void getKeysFromjson(String schema) {
this.jsonObject = JSONObject.parseObject(schema);
private Schema schema;
private String[] keys;
private JSONObject jsonObject;
private JSONArray jsonArray;
public AvroDeserializer(String schema) {
this.schema = new Schema.Parser().parse(schema);
this.jsonArray = this.jsonObject.getJSONArray("fields");
this.keys = new String[this.jsonArray.size()];
for (int i = 0; i < this.jsonArray.size(); i++) {
this.keys[i] = this.jsonArray.getJSONObject(i).get("name").toString();
this.jsonObject = JSONObject.parseObject(schema);
this.jsonArray = jsonObject.getJSONArray(FIELDS);
this.keys = new String[jsonArray.size()];
for (int i = 0; i < jsonArray.size(); i++) {
keys[i] = jsonArray.getJSONObject(i).get(NAME).toString();
}
}
/**
* @param body byte[] body: the Kafka message
* @param @return the settings file
* @return String return type
* @throws
* @Title: deserializing
* @Description: performs Avro deserialization.
* Avro deserialization
*
* @param body byte[] body: the Kafka message
* @return GenericRecord
*/
public GenericRecord deserializing(byte[] body) {
DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(this.schema);
public GenericRecord deserialize(byte[] body) {
DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(this.schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(body, null);
GenericData.Record result = null;
try {
result = datumReader.read(null, decoder);
} catch (Exception e) {
LOGGER.error(String.format("error Avro反序列化"), e);
} catch (IOException e) {
log.error("avro 反序列化失败", e);
}
return result;
}
}
package com.zorkdata.datamask.util.avro;
package com.zorkdata.desensitization.avro;
/**
* @author xiesen
* @Description Deserializer factory class
* @Email xiesen310@163.com
* @Date 2020/6/28 9:31
* AvroDeserializerFactory
*
* @author Administrator
*/
public class AvroDeserializerFactory {
private static AvroDeserializer logs = null;
private AvroDeserializerFactory() {
}
private static AvroDeserializer logs = null;
private static AvroDeserializer metrics = null;
public static void init() {
@@ -19,11 +21,11 @@
/**
* getLogsDeserializer
*
* @return
* @return AvroDeserializer
*/
public static AvroDeserializer getLogsDeserializer() {
if (logs == null) {
logs = new AvroDeserializer(LogAvroMacroDef.metadata);
logs = new AvroDeserializer(AvroSchemaDef.ZORK_LOG_SCHEMA);
}
return logs;
}
@@ -31,12 +33,12 @@
/**
* getLogsDeserializer
*
* @return
* @return AvroDeserializer
*/
// public static AvroDeserializer getMetricDeserializer() {
// if (metrics == null) {
// metrics = new AvroDeserializer(MetricAvroMacroDef.metadata);
// }
// return metrics;
// }
public static AvroDeserializer getMetricDeserializer() {
if (metrics == null) {
metrics = new AvroDeserializer(AvroSchemaDef.ZORK_METRIC_SCHEMA);
}
return metrics;
}
}
package com.zorkdata.datamask.util.avro;
package com.zorkdata.desensitization.avro;
/**
* @author xiesen
* @Description Log set schema definition
* @Email xiesen310@163.com
* @Date 2020/6/28 9:33
* Avro schema definitions
*
* @author Administrator
*/
public class LogAvroMacroDef {
public static String metadata = "{\n" +
public final class AvroSchemaDef {
/**
* Metric schema definition
*/
public static final String ZORK_METRIC_SCHEMA = "{\n" +
" \"namespace\": \"com.zork.metrics\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"metrics\",\n" +
" \"fields\": [\n" +
" {\n" +
  • Define a constant instead of duplicating this literal " {\n" 11 times.
" \"name\": \"metricsetname\",\n" +
" \"type\": [\n" +
  • Define a constant instead of duplicating this literal " "type": [\n" 11 times.
" \"string\",\n" +
  • Define a constant instead of duplicating this literal " "string",\n" 6 times.
" \"null\"\n" +
  • Define a constant instead of duplicating this literal " "null"\n" 6 times.
" ]\n" +
  • Define a constant instead of duplicating this literal " ]\n" 11 times.
" },\n" +
  • Define a constant instead of duplicating this literal " },\n" 9 times.
" {\n" +
" \"name\": \"timestamp\",\n" +
" \"type\": [\n" +
" \"string\",\n" +
" \"null\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"dimensions\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
  • Define a constant instead of duplicating this literal " "null",\n" 5 times.
" {\n" +
  • Define a constant instead of duplicating this literal " {\n" 5 times.
" \"type\": \"map\",\n" +
  • Define a constant instead of duplicating this literal " "type": "map",\n" 5 times.
" \"values\": \"string\"\n" +
  • Define a constant instead of duplicating this literal " "values": "string"\n" 3 times.
" }\n" +
  • Define a constant instead of duplicating this literal " }\n" 5 times.
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"metrics\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"map\",\n" +
" \"values\": \"double\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";
/**
* Log schema definition
*/
public static final String ZORK_LOG_SCHEMA = "{\n" +
" \"namespace\": \"com.zork.logs\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"logs\",\n" +
@@ -72,4 +120,8 @@ public class LogAvroMacroDef {
" }\n" +
" ]\n" +
"}";
private AvroSchemaDef() {
}
}
package com.zorkdata.datamask.util.avro;
package com.zorkdata.desensitization.avro;
/**
* @author xiesen
* @Description Serializer factory class
* @Email xiesen310@163.com
* @Date 2020/6/28 9:32
* AvroSerializerFactory
*
* @author Administrator
*/
@SuppressWarnings("all")
public class AvroSerializerFactory {
private AvroSerializerFactory() {
}
private static AvroSerializer metricMetadata = null;
private static AvroSerializer logMetadata = null;
/**
* Get the log Avro serializer
*
* @return AvroSerializer
*/
public static AvroSerializer getLogAvroSerializer() {
if (logMetadata == null) {
logMetadata = new AvroSerializer(LogAvroMacroDef.metadata);
logMetadata = new AvroSerializer(AvroSchemaDef.ZORK_LOG_SCHEMA);
}
return logMetadata;
}
// public static AvroSerializer getMetricAvroSerializer() {
// if (metricMetadata == null) {
// metricMetadata = new AvroSerializer(MetricAvroMacroDef.metadata);
// }
// return metricMetadata;
// }
/**
* Get the metric Avro serializer
*
* @return AvroSerializer
*/
public static AvroSerializer getMetricAvroSerializer() {
if (metricMetadata == null) {
metricMetadata = new AvroSerializer(AvroSchemaDef.ZORK_METRIC_SCHEMA);
}
return metricMetadata;
}
}
package com.zorkdata.desensitization.config;
import com.zorkdata.desensitization.constans.RegExpConstants;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* @author: LiaoMingtao
* @date: 2020/10/26
*/
@Data
public class RegularExpressions implements Serializable {
private static final long serialVersionUID = -9099923348992729289L;
/**
* Name regular expression
*/
private String nameRegExp;
  • Remove this unused "nameRegExp" private field.
/**
* Mobile number regular expression
*/
private String mobileRegExp;
  • Remove this unused "mobileRegExp" private field.
/**
* Telephone number regular expression
*/
private String phoneRegExp;
  • Remove this unused "phoneRegExp" private field.
/**
* Email regular expression
*/
private String emailRegExp;
  • Remove this unused "emailRegExp" private field.
/**
* ID card regular expression
*/
private String idRegExp;
  • Remove this unused "idRegExp" private field.
/**
* Bank card number regular expression
*/
private String bankCardRegExp;
  • Remove this unused "bankCardRegExp" private field.
/**
* Home address regular expression
*/
private String addressRegExp;
  • Remove this unused "addressRegExp" private field.
/**
* IP address regular expression
*/
private String ipRegExp;
  • Remove this unused "ipRegExp" private field.
/**
* MAC address regular expression
*/
private String macRegExp;
  • Remove this unused "macRegExp" private field.
public RegularExpressions(){
}
@SuppressWarnings("all")
public RegularExpressions(Map conf){
Map regularExpressions = (HashMap)conf.get(RegExpConstants.REG_EXP);
this.nameRegExp = String.valueOf(regularExpressions.get(RegExpConstants.NAME_REG_EXP)).trim();
this.mobileRegExp = String.valueOf(regularExpressions.get(RegExpConstants.MOBILE_REG_EXP)).trim();
this.phoneRegExp = String.valueOf(regularExpressions.get(RegExpConstants.PHONE_REG_EXP)).trim();
this.emailRegExp = String.valueOf(regularExpressions.get(RegExpConstants.EMAIL_REG_EXP)).trim();
this.idRegExp = String.valueOf(regularExpressions.get(RegExpConstants.ID_REG_EXP)).trim();
this.bankCardRegExp = String.valueOf(regularExpressions.get(RegExpConstants.BANK_CARD_REG_EXP)).trim();
this.addressRegExp = String.valueOf(regularExpressions.get(RegExpConstants.ADDRESS_REG_EXP)).trim();
this.ipRegExp = String.valueOf(regularExpressions.get(RegExpConstants.IP_REG_EXP)).trim();
this.macRegExp = String.valueOf(regularExpressions.get(RegExpConstants.MAC_REG_EXP)).trim();
}
}
package com.zorkdata.desensitization.constans;
/**
* Configuration file constant keys
*
* @author: LiaoMingtao
* @date: 2020/10/26
*/
public final class ConfigConstants {
public static final String SOURCE = "source";
public static final String PARALLELISM = "parallelism";
public static final String MAX_FILE_NUM = "max_file_num";
public static final String HDFS_URI = "hdfs_uri";
public static final String HDFS_USER = "hdfs_user";
public static final String HDFS_SRC = "hdfs_src";
public static final String HDFS_DEST = "hdfs_dest";
public static final String CORE = "core";
public static final String START_TIME = "start_time";
public static final String END_TIME = "end_time";
public static final String SERVERS = "servers";
public static final String ZOOKEEPER = "zookeeper";
public static final String TOPIC = "topic";
public static final String HDFS = "hdfs";
public static final String KAFKA = "kafka";
public static final String FIELDS_WHITE_LIST = "fields_white_list";
private ConfigConstants() {
}
}
package com.zorkdata.desensitization.constans;
/**
* Symbol constants
*
* @author: LiaoMingtao
* @date: 2020/10/26
*/
public final class GeneralConstants {
  • Add a private constructor to hide the implicit public one.
public static final String COMMA = ",";
public static final String FILE_SEPARATOR = "/";
public static final String AVRO_SUFFIX = ".avro";
public static final String EMPTY_STR = "";
}
package com.zorkdata.desensitization.constans;
/**
* Regular expression constant keys
*
* @author: LiaoMingtao
* @date: 2020/10/26
*/
public final class RegExpConstants {
  • Add a private constructor to hide the implicit public one.
public static final String REG_EXP = "reg_exp";
public static final String NAME_REG_EXP = "name";
public static final String MOBILE_REG_EXP = "mobile";
public static final String PHONE_REG_EXP = "phone";
public static final String EMAIL_REG_EXP = "email";
public static final String ID_REG_EXP = "id";
public static final String BANK_CARD_REG_EXP = "bank_card";
public static final String ADDRESS_REG_EXP = "address";
public static final String IP_REG_EXP = "ip";
public static final String MAC_REG_EXP = "mac";
}
package com.zorkdata.desensitization.exception;
/**
* @author: LiaoMingtao
* @date: 2020/10/26
*/
public class ZorkException extends Exception {
public ZorkException() {
}
/**
* Constructor with a message parameter (can be auto-generated).
*
* @param msg exception message
*/
public ZorkException(String msg) {
// Pass the message to Throwable's String constructor
super(msg);
}
}
package com.zorkdata.datamask.util;
package com.zorkdata.desensitization.function;
import com.alibaba.fastjson.JSON;
import com.zorkdata.desensitization.config.RegularExpressions;
import java.io.Serializable;
import java.util.*;
@@ -6,105 +9,32 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Description:
*
* @author: wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
* Date: Create in 2020/9/23 9:30
* @author: LiaoMingtao
* @date: 2020/10/26
*/
public class MaskUtil implements Serializable {
public class DesensitizationFunction implements Serializable {
private static final long serialVersionUID = 1L;
public static final int DEFAULT_MAP_CAPACITY = 16;
/**
* Data format separators
*/
// TODO: extract to the configuration file
private List<String> dataFormats = new ArrayList<String>(){{
add(",");
add(".");
add("@");
add("-");
}};
/**
* Name regex
*/
private String nameRegExp;
/**
* Mobile number regex
*/
private String mobileRegExp;
/**
* Telephone number regex
*/
private String phoneRegExp;
/**
* Email regex
*/
private String emailRegExp;
/**
* ID card number (15-digit) regex
*/
private String idRegExp15;
private RegularExpressions regularExpressions;
/**
* ID card number (18-digit) regex
*/
private String idRegExp18;
private List<Pattern> patterns = new ArrayList<>();
/**
* Bank card number regex
*/
private String bankCardRegExp;
/**
* Home address regex
*/
private String addressRegExp;
/**
* IP address regex
*/
private String ipRegExp;
/**
* MAC address regex
*/
private String macRegExp;
List<Pattern> patterns = new ArrayList<Pattern>() {{
}};
public MaskUtil(String nameRegExp, String mobileRegExp, String phoneRegExp, String emailRegExp, String idRegExp15, String idRegExp18, String bankCardRegExp, String addressRegExp, String ipRegExp, String macRegExp) {
this.nameRegExp = nameRegExp;
this.mobileRegExp = mobileRegExp;
this.phoneRegExp = phoneRegExp;
this.emailRegExp = emailRegExp;
this.idRegExp15 = idRegExp15;
this.idRegExp18 = idRegExp18;
this.bankCardRegExp = bankCardRegExp;
this.addressRegExp = addressRegExp;
this.ipRegExp = ipRegExp;
this.macRegExp = macRegExp;
public DesensitizationFunction(RegularExpressions regularExpressions) {
this.regularExpressions = regularExpressions;
}
public Map mask(Map map, ArrayList whiteList) {
patterns.add(Pattern.compile(this.nameRegExp));
patterns.add(Pattern.compile(this.macRegExp));
patterns.add(Pattern.compile(this.emailRegExp));
patterns.add(Pattern.compile(this.ipRegExp));
patterns.add(Pattern.compile(this.idRegExp18));
patterns.add(Pattern.compile(this.idRegExp15));
patterns.add(Pattern.compile(this.bankCardRegExp));
patterns.add(Pattern.compile(this.mobileRegExp));
patterns.add(Pattern.compile(this.phoneRegExp));
patterns.add(Pattern.compile(this.addressRegExp));
public Map desensitization(Map map, List<String> whiteList, List<String> dataFormats) {
  • Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed.
patterns.add(Pattern.compile(regularExpressions.getNameRegExp()));
patterns.add(Pattern.compile(regularExpressions.getMacRegExp()));
patterns.add(Pattern.compile(regularExpressions.getEmailRegExp()));
patterns.add(Pattern.compile(regularExpressions.getIpRegExp()));
patterns.add(Pattern.compile(regularExpressions.getIdRegExp()));
patterns.add(Pattern.compile(regularExpressions.getBankCardRegExp()));
patterns.add(Pattern.compile(regularExpressions.getMobileRegExp()));
patterns.add(Pattern.compile(regularExpressions.getPhoneRegExp()));
patterns.add(Pattern.compile(regularExpressions.getAddressRegExp()));
map.forEach((k, v) -> {
if (!whiteList.contains(k)) {
@@ -134,18 +64,20 @@ public class MaskUtil implements Serializable {
}
public static void main(String[] args) {
MaskUtil maskUtil = new MaskUtil("[\\u4e00-\\u9fa5]{1,20}|[a-zA-Z\\\\.\\\\s]{1,20}",
"((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}",
"0\\d{2,3}-\\d{7,8}",
"[a-zA-Z0-9]+@[a-zA-Z0-9]+(\\.[a-zA-Z0-9]+)+",
"[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}",
"[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])",
"([1-9]{1})(\\d{11}|\\d{15}|\\d{16}|\\d{17}|\\d{18})",
"([\u4E00-\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号|弄|条|室)){2,}",
"((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)",
"([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}");
Map map = new HashMap(DEFAULT_MAP_CAPACITY);
RegularExpressions regularExpressions = new RegularExpressions();
regularExpressions.setAddressRegExp("([\u4E00-\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号|弄|条|室)){2,}");
regularExpressions.setBankCardRegExp("([1-9]{1})(\\d{11}|\\d{15}|\\d{16}|\\d{17}|\\d{18})");
regularExpressions.setEmailRegExp("[a-zA-Z0-9]+@[a-zA-Z0-9]+(\\.[a-zA-Z0-9]+)+");
regularExpressions.setIdRegExp("[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])");
regularExpressions.setIpRegExp("((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)");
regularExpressions.setMacRegExp("([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}");
regularExpressions.setMobileRegExp("((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}");
regularExpressions.setPhoneRegExp("0\\d{2,3}-\\d{7,8}");
regularExpressions.setNameRegExp("[\\u4e00-\\u9fa5]{1,20}|[a-zA-Z\\\\.\\\\s]{1,20}");
DesensitizationFunction desensitizationFunction = new DesensitizationFunction(regularExpressions);
Map map = new HashMap(16);
map.put("姓名", "王海鹰");
map.put("身份证号", "372925199101195158");
map.put("手机号", "15000101879");
@@ -162,10 +94,14 @@
map.put("normalFields", "13811110000-110101199003075517-上海市浦东新区张江微电子港-zorkdata@163.com-123456789-wanghaiying123-王海鹰-192.168.1.1-00-50-56-C0-00-08-6227002470170278192");
String[] fieldsWhiteListArray = "messid,fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc".split(",");
ArrayList< String> fieldsWhiteList = new ArrayList<String>(fieldsWhiteListArray.length);
List< String> fieldsWhiteList = new ArrayList<>(fieldsWhiteListArray.length);
Collections.addAll(fieldsWhiteList, fieldsWhiteListArray);
System.out.println(maskUtil.mask(map, fieldsWhiteList));
List<String> dataFormats = new ArrayList<String>(){{
  • Move the contents of this initializer to a standard constructor or to field initializers. Use another way to initialize this instance.
add(",");
add(".");
add("@");
add("-");
}};
System.out.println(JSON.toJSONString(desensitizationFunction.desensitization(map, fieldsWhiteList, dataFormats)));
  • Replace this use of System.out or System.err by a logger.
}
}
/**
* @author: LiaoMingtao
* @date: 2020/10/22
*/
package com.zorkdata.desensitization;
\ No newline at end of file
package com.zorkdata.datamask.domain;
package com.zorkdata.desensitization.schmea;
import lombok.Data;
import org.apache.hadoop.io.WritableComparable;
......
package com.zorkdata.desensitization.utils;
import lombok.extern.slf4j.Slf4j;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @author: LiaoMingtao
* @date: 2020/10/22
*/
@Slf4j
public class DateUtil {
  • Add a private constructor to hide the implicit public one.
private static final String NULL = "";
private static final String BAR_STRING = "-";
private static final String TIME_ZONE = "Asia/shanghai";
  • Remove this unused "TIME_ZONE" private field.
private static ThreadLocal<SimpleDateFormat> UTC_FORMATTER = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"));
  • Rename this field "UTC_FORMATTER" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.
private static ThreadLocal<SimpleDateFormat> YYYY_MM_DD_HH_MM_SS_FORMATTER = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
  • Rename this field "YYYY_MM_DD_HH_MM_SS_FORMATTER" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.
private static ThreadLocal<SimpleDateFormat> YYYY_MM_DD_FORMATTER = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
  • Rename this field "YYYY_MM_DD_FORMATTER" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.
/**
* Convert a date like yyyy-MM-dd to yyyyMMdd.
*
* @param time date string
* @return date in yyyyMMdd format
*/
public static String date2date(String time) {
return time.replace(BAR_STRING, NULL);
}
/**
* Batch-convert dates, removing the '-' characters.
*
* @param timeList list of dates such as 2019-10-11
* @return List<String>
*/
public static List<String> date2date(List<String> timeList) {
List<String> resultList = new ArrayList<>();
for (String time : timeList) {
String tempTime = date2date(time);
resultList.add(tempTime);
}
return resultList;
}
/**
* Get all dates between two times; the range may span years.
*
* @param begin start time, yyyy-MM-dd HH:mm:ss
* @param end end time, yyyy-MM-dd HH:mm:ss
* @return List<String>
*/
public static List<String> getBetweenDate(String begin, String end) {
List<String> betweenList = new ArrayList<>();
Calendar startDay = Calendar.getInstance();
try {
startDay.setTime(YYYY_MM_DD_HH_MM_SS_FORMATTER.get().parse(begin));
startDay.add(Calendar.DATE, -1);
while (true) {
startDay.add(Calendar.DATE, 1);
Date newDate = startDay.getTime();
String newEnd = YYYY_MM_DD_FORMATTER.get().format(newDate);
betweenList.add(newEnd);
if (end.startsWith(newEnd)) {
break;
}
}
} catch (ParseException e) {
log.error(String.valueOf(e));
}
return betweenList;
}
/**
* Convert a plain time string to a timestamp.
*
* @param time plain time string
* @return timestamp
*/
public static long time2Timestamp(String time) {
Date gpsUtcDate = null;
try {
gpsUtcDate = YYYY_MM_DD_HH_MM_SS_FORMATTER.get().parse(time);
return gpsUtcDate.getTime();
} catch (ParseException e) {
log.error("时间戳格式转换异常:{} 原因: {}", time, e.getMessage());
}
return 0L;
}
/**
* Convert a UTC time string to a Unix timestamp.
*
* @param utcTime UTC time string
* @return Unix timestamp in milliseconds, or null on parse failure
*/
public static Long utc2timestamp(String utcTime) {
//Time zone setup before parsing (disabled below)
// UTC_FORMATTER.get().setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
  • This block of commented-out lines of code should be removed. Promptly remove code blocks or configuration entries that are no longer used.
Date gpsUtcDate = null;
try {
gpsUtcDate = UTC_FORMATTER.get().parse(utcTime);
} catch (ParseException e) {
log.error("时间戳格式转换异常:{} 原因: {}", utcTime, e.getMessage());
return null;
}
return gpsUtcDate.getTime();
}
}
package com.zorkdata.datamask.util;
package com.zorkdata.desensitization.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
@@ -16,12 +15,11 @@ import java.util.*;
*
* @author: zhuzhigang
**/
@Slf4j
public class LoadConf {
private static final Logger LOG = LoggerFactory.getLogger(com.zorkdata.datamask.util.LoadConf.class);
public static final int DEFAULT_MAP_CAPACITY = 16;
public LoadConf() {
private LoadConf() {
}
public static List<URL> findResources(String name) {
@@ -32,7 +30,6 @@
while (resources.hasMoreElements()) {
ret.add(resources.nextElement());
}
return ret;
} catch (IOException var3) {
throw new RuntimeException(var3);
@@ -42,7 +39,6 @@
public static Map findAndReadYaml(String name, boolean mustExist, boolean canMultiple) {
InputStream in = null;
boolean confFileEmpty = false;
try {
in = getConfigFileInputStream(name, canMultiple);
if (null != in) {
@@ -55,7 +51,6 @@
confFileEmpty = true;
}
if (mustExist) {
if (confFileEmpty) {
throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs");
@@ -90,7 +85,7 @@
} else if (resources.size() > 1 && !canMultiple) {
throw new IOException("Found multiple " + configFilePath + " resources. You're probably bundling the Storm jars with your topology jar. " + resources);
} else {
LOG.info("Using " + configFilePath + " from resources");
log.info("Using " + configFilePath + " from resources");
URL resource = (URL) resources.iterator().next();
return resource.openStream();
}
@@ -125,7 +120,6 @@
}
}
}
Map ret = new HashMap(DEFAULT_MAP_CAPACITY);
ret.putAll(properties);
return ret;
......
package com.zorkdata.desensitization.utils;
import com.zorkdata.desensitization.exception.ZorkException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.ParameterTool;
import java.util.HashMap;
import java.util.Map;
/**
* @author: LiaoMingtao
* @date: 2020/10/22
*/
@Slf4j
public class YmlUtil {
private YmlUtil() {
}
private static final String YML_SUFFIX = "yml";
private static final String CONF_PARAM = "conf";
private static final int DEFAULT_PARAMS_MAP_LENGTH = 10;
@SuppressWarnings("all")
public static Map<String, String> getParams(String[] args) throws ZorkException {
Map<String, String> confMap = new HashMap<>(DEFAULT_PARAMS_MAP_LENGTH);
String configPath = null;
ParameterTool parameterTool = ParameterTool.fromArgs(args);
configPath = parameterTool.get(CONF_PARAM);
log.info("read config path is {}", configPath);
if (!configPath.endsWith(YML_SUFFIX)) {
log.info("Please input correct configuration file and flink run mode!");
} else {
confMap = LoadConf.loadYaml(configPath);
if (confMap.isEmpty()) {
log.error("配置文件" + args[0] + "不存在,系统退出");
throw new ZorkException(String.format("配置文件%s不存在,系统退出", args[0]));
}
}
return confMap;
}
}
# Log source; supports hdfs and kafka. Required
source: "hdfs"
# HDFS source path for the logs; required when source is hdfs
hdfs_src: "hdfs://cdh-2:8020/tmp/datawarehouse4/jzjy/kcbp_biz_log"
# HDFS output path for the logs; optional, defaults to the output directory under hdfs_src
hdfs_dest: "hdfs://cdh-2:8020/tmp/datawarehouse/jzjy/kcbp_biz_log/output7/"
# Job configuration
# Parallelism
parallelism: "1"
# Maximum number of files merged into one task; caps the number of open files so the job does not hang
max_file_num: "300"
# Local path to download the desensitized results to
download_path: "/tmp"
# Data source; supports hdfs and kafka. Required
source: "hdfs"
# The "core" identifier of the transaction log; the value starts with c followed by a numeric index. Optional
core: "c9"
# Date of the logs to query (defaults to today). Optional
date: 20200929
# Start timestamp of the logs to query. Optional
start_time: 1601348849900
# Query start time
start_time: "2020-09-29 11:07:29"
# Query end time
end_time: "2020-09-29 11:07:30"
# End timestamp of the logs to query. Optional
end_time: 1601348850000
# Hadoop-related configuration
# HDFS address
hdfs_uri: "hdfs://cdh-2:8020/"
# HDFS user name
hdfs_user: "hdfs"
# HDFS source path for the logs; required when source is hdfs
hdfs_src: "hdfs://cdh-2:8020/tmp/datawarehouse4/jzjy/kcbp_biz_log/"
# HDFS output path for the logs; optional, defaults to the output directory under hdfs_src
hdfs_dest: "hdfs://cdh-2:8020/tmp/datawarehouse/jzjy/kcbp_biz_log/output7/"
# Whitelist of fields exempt from desensitization
fieldsWhiteList: "messid,fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,\
orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc,indexTime,logchecktime,end_logtime,collecttime,deserializerTime,versioninfo,fmillsecond,smillsecond"
fields_white_list: "messid,fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc,indexTime,logchecktime,end_logtime,collecttime,deserializerTime,versioninfo,fmillsecond,smillsecond"
# Regular expressions used for desensitization
reg_exp:
# Name regex
name: "[\u4e00-\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20}"
name: '(\u8d75|\u94b1|\u5b59|\u674e|\u5468|\u5434|\u90d1|\u738b|\u51af|\u9648|\u696e|\u536b|\u848b|\u6c88|\u97e9|\u6768|\u6731|\u79e6|\u5c24|\u8bb8|\u4f55|\u5415|\u65bd|\u5f20|\u5b54|\u66f9|\u4e25|\u534e|\u91d1|\u9b4f|\u9676|\u59dc|\u621a|\u8c22|\u90b9|\u55bb|\u67cf|\u6c34|\u7aa6|\u7ae0|\u4e91|\u82cf|\u6f58|\u845b|\u595a|\u8303|\u5f6d|\u90ce|\u9c81|\u97e6|\u660c|\u9a6c|\u82d7|\u51e4|\u82b1|\u65b9|\u4fde|\u4efb|\u8881|\u67f3|\u9146|\u9c8d|\u53f2|\u5510|\u8d39|\u5ec9|\u5c91|\u859b|\u96f7|\u8d3a|\u502a|\u6c64|\u6ed5|\u6bb7|\u7f57|\u6bd5|\u90dd|\u90ac|\u5b89|\u5e38|\u4e50|\u4e8e|\u65f6|\u5085|\u76ae|\u535e|\u9f50|\u5eb7|\u4f0d|\u4f59|\u5143|\u535c|\u987e|\u5b5f|\u5e73|\u9ec4|\u548c|\u7a46|\u8427|\u5c39|\u59da|\u90b5|\u6e5b|\u6c6a|\u7941|\u6bdb|\u79b9|\u72c4|\u7c73|\u8d1d|\u660e|\u81e7|\u8ba1|\u4f0f|\u6210|\u6234|\u8c08|\u5b8b|\u8305|\u5e9e|\u718a|\u7eaa|\u8212|\u5c48|\u9879|\u795d|\u8463|\u6881|\u675c|\u962e|\u84dd|\u95fd|\u5e2d|\u5b63|\u9ebb|\u5f3a|\u8d3e|\u8def|\u5a04|\u5371|\u6c5f|\u7ae5|\u989c|\u90ed|\u6885|\u76db|\u6797|\u5201|\u953a|\u5f90|\u4e18|\u9a86|\u9ad8|\u590f|\u8521|\u7530|\u6a0a|\u80e1|\u51cc|\u970d|\u865e|\u4e07|\u652f|\u67ef|\u661d|\u7ba1|\u5362|\u83ab|\u7ecf|\u623f|\u88d8|\u7f2a|\u5e72|\u89e3|\u5e94|\u5b97|\u4e01|\u5ba3|\u8d32|\u9093|\u90c1|\u5355|\u676d|\u6d2a|\u5305|\u8bf8|\u5de6|\u77f3|\u5d14|\u5409|\u94ae|\u9f9a|\u7a0b|\u5d47|\u90a2|\u6ed1|\u88f4|\u9646|\u8363|\u7fc1|\u8340|\u7f8a|\u65bc|\u60e0|\u7504|\u9eb9|\u5bb6|\u5c01|\u82ae|\u7fbf|\u50a8|\u9773|\u6c72|\u90b4|\u7cdc|\u677e|\u4e95|\u6bb5|\u5bcc|\u5deb|\u4e4c|\u7126|\u5df4|\u5f13|\u7267|\u9697|\u5c71|\u8c37|\u8f66|\u4faf|\u5b93|\u84ec|\u5168|\u90d7|\u73ed|\u4ef0|\u79cb|\u4ef2|\u4f0a|\u5bab|\u5b81|\u4ec7|\u683e|\u66b4|\u7518|\u659c|\u5389|\u620e|\u7956|\u6b66|\u7b26|\u5218|\u666f|\u8a79|\u675f|\u9f99|\u53f6|\u5e78|\u53f8|\u97f6|\u90dc|\u9ece|\u84df|\u8584|\u5370|\u5bbf|\u767d|\u6000|\u84b2|\u90b0|\u4ece|\u9102|\u7d22|\u54b8|\u7c4d|\u8d56|\u5353|\u853a|\u5c60|\u8499|\u6c60|\u4e54|\u9634|\u90c1|\u80e5|\u80fd|\u82cd|\u53cc|\u95fb|\u8398|\u515a|\u7fdf|\u8c2d|\u8d21|\u52b3|\u9004|\u59ec|\u7533|\u6276|\u5835|\u5189|\u5bb0|\u90e6|\u96cd|\u90e4|\u74a9|\u6851|\u6842|\u6fee|\u725b|\u5bff|\u901a|\u8fb9|\u6248|\u71d5|\u5180|\u90cf|\u6d66|\u5c1a|\u519c|\u6e29|\u522b|\u5e84|\u664f|\u67f4|\u77bf|\u960e|\u5145|\u6155|\u8fde|\u8339|\u4e60|\u5ba6|\u827e|\u9c7c|\u5bb9|\u5411|\u53e4|\u6613|\u614e|\u6208|\u5ed6|\u5ebe|\u7ec8|\u66a8|\u5c45|\u8861|\u6b65|\u90fd|\u803f|\u6ee1|\u5f18|\u5321|\u56fd|\u6587|\u5bc7|\u5e7f|\u7984|\u9619|\u4e1c|\u6b27|\u6bb3|\u6c83|\u5229|\u851a|\u8d8a|\u5914|\u9686|\u5e08|\u5de9|\u538d|\u8042|\u6641|\u52fe|\u6556|\u878d|\u51b7|\u8a3e|\u8f9b|\u961a|\u90a3|\u7b80|\u9976|\u7a7a|\u66fe|\u6bcb|\u6c99|\u4e5c|\u517b|\u97a0|\u987b|\u4e30|\u5de2|\u5173|\u84af|\u76f8|\u67e5|\u540e|\u8346|\u7ea2|\u6e38|\u7afa|\u6743|\u9011|\u76d6|\u76ca|\u6853|\u516c|\u4e07\u4fdf|\u53f8\u9a6c|\u4e0a\u5b98|\u6b27\u9633|\u590f\u4faf|\u8bf8\u845b|\u95fb\u4eba|\u4e1c\u65b9|\u8d6b\u8fde|\u7687\u752b|\u5c09\u8fdf|\u516c\u7f8a|\u6fb9\u53f0|\u516c\u51b6|\u5b97\u653f|\u6fee\u9633|\u6df3\u4e8e|\u5355\u4e8e|\u592a\u53d4|\u7533\u5c60|\u516c\u5b59|\u4ef2\u5b59|\u8f69\u8f95|\u4ee4\u72d0|\u953a\u79bb|\u5b87\u6587|\u957f\u5b59|\u6155\u5bb9|\u9c9c\u4e8e|\u95fe\u4e18|\u53f8\u5f92|\u53f8\u7a7a|\u4e0c\u5b98|\u53f8\u5bc7|\u4ec9|\u7763|\u5b50\u8f66|\u989b\u5b59|\u7aef\u6728|\u5deb\u9a6c|\u516c\u897f|\u6f06\u96d5|\u4e50\u6b63|\u58e4\u9a77|\u516c\u826f|\u62d3\u62d4|\u5939\u8c37|\u5bb0\u7236|\u8c37\u6881|\u664b|\u695a|\u960e|\u6cd5|\u6c5d|\u9122|\u6d82|\u94a6|\u6bb5\u5e72|\u767e\u91cc|\u4e1c\u90ed|\u5357\u95e8|\u547c\u5ef6|\u5f52|\u6d77|\u7f8a\u820c|\u5fae\u751f|\u5cb3|\u5e05|\u7f11|\u4ea2|\u51b5|\u540e|\u6709|\u7434|\u6881\u4e18|\u5de6\u4e18|\u4e1c\u95e8|\u897f\u95e8|\u5546|\u725f|\u4f58|\u4f74|\u4f2f|\u8d4f|\u5357\u5bab|\u58a8|\u54c8|\u8c2f|\u7b2a|\u5e74|\u7231|\u9633|\u4f5f|\u7b2c\u4e94|\u8a00|\u798f)(\w{1,1})'
# Mobile phone number pattern
mobile: '((\+|00)86)?((134[0-8]\d{7})|((13[0-35-9]|14[15-9]|15[0-9]|16[2567]|17[0-8]|18[0-9]|19[0-25-9])\d{8}))'
# Landline number pattern
phone: '((((010)|(0[2-9]\d{1,2}))[-\s]?)[1-9]\d{6,7}$)|((\+?0?86\-?)?1[34578][0-9]\d{8}$)'
# Email address pattern
email: '([a-zA-Z0-9_-])+@([a-zA-Z0-9_-])+((\.[a-zA-Z0-9_-]{1,4}){1,4})'
# ID card number pattern (18-digit, then 15-digit)
id: '(^[1-9]\d{5}(18|19|([23]\d))\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{3}[0-9Xx]$)|(^[1-9]\d{5}\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{2}$)'
# Bank card number pattern
bank_card: '(([13-79]\d{3})|(2[1-9]\d{2})|(20[3-9]\d)|(8[0-79]\d{2}))\s?\d{4}\s?\d{4}\s?\d{4}(\s?\d{3})?$'
# Home address pattern
address: '([\u4E00-\u9FA5A-Za-z0-9_]+(省|市|自治区|自治州|区|县|镇|道|路|街|号|弄|条|室)){1,}'
# IP address pattern
ip: '((2[0-4]\d|25[0-5]|[01]?\d\d?)\.){3}(2[0-4]\d|25[0-5]|[01]?\d\d?)'
# MAC address pattern
mac: '[A-Fa-f0-9]{2}([-:]?[A-Fa-f0-9]{2})([-:.]?[A-Fa-f0-9]{2})([-:]?[A-Fa-f0-9]{2})([-:.]?[A-Fa-f0-9]{2})([-:]?[A-Fa-f0-9]{2})$'
\ No newline at end of file
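As a rough illustration of how the configured patterns above could drive a masking step, here is a minimal Java sketch using java.util.regex. It is an assumption for illustration only: the surname alternation is abbreviated to five entries, and the keep-surname and keep-prefix-and-suffix conventions are common desensitization choices, not rules taken from this project's job code (which is not part of this diff).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MaskDemo {
    // Abbreviated stand-in for the configured name pattern (full surname list omitted).
    // UNICODE_CHARACTER_CLASS makes \w match Chinese characters; the default \w would not.
    private static final Pattern NAME = Pattern.compile("(赵|钱|孙|李|王)(\\w{1,1})",
            Pattern.UNICODE_CHARACTER_CLASS);

    // The mobile pattern from the configuration above.
    private static final Pattern MOBILE = Pattern.compile(
            "((\\+|00)86)?((134[0-8]\\d{7})|((13[0-35-9]|14[15-9]|15[0-9]|16[2567]|17[0-8]|18[0-9]|19[0-25-9])\\d{8}))");

    // Keep the surname, mask the given-name character: 王五 -> 王*
    static String maskNames(String text) {
        Matcher m = NAME.matcher(text);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            m.appendReplacement(sb, Matcher.quoteReplacement(m.group(1)) + "*");
        }
        return m.appendTail(sb).toString();
    }

    // Keep the first 3 and last 4 digits, mask the middle: 13812345678 -> 138****5678
    static String maskMobiles(String text) {
        Matcher m = MOBILE.matcher(text);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String n = m.group();
            m.appendReplacement(sb, n.substring(0, 3) + "****" + n.substring(n.length() - 4));
        }
        return m.appendTail(sb).toString();
    }

    public static void main(String[] args) {
        System.out.println(maskNames("客户王五"));      // 客户王*
        System.out.println(maskMobiles("13812345678")); // 138****5678
    }
}
```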
# Log source; hdfs and kafka are supported. Required.
source: "hdfs"
# HDFS path of the source log files; required when source is hdfs.
hdfs_src: "hdfs://cdh-2:8020/tmp/datawarehouse4/jzjy/kcbp_biz_log"
# HDFS path the masked logs are written to; optional, defaults to an output directory under hdfs_src.
hdfs_dest: "hdfs://cdh-2:8020/tmp/datawarehouse/jzjy/kcbp_biz_log/output7/"
# Local path the masked results are downloaded to
download_path: "/tmp"
# "Core" field of the transaction log; the value is "c" followed by a numeric index. Optional.
core: "c9"
# Date of the logs to query (defaults to the current day). Optional.
date: 20200929
# Start timestamp (epoch milliseconds) of the logs to query. Optional.
start_time: 1601348849900
# End timestamp (epoch milliseconds) of the logs to query. Optional.
end_time: 1601348850000
# Whitelist of fields that are not masked
fieldsWhiteList: "messid,fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,orderamt,price,qty,tacode,ofcode,transacc,taacc,indexTime,logchecktime,end_logtime,collecttime,deserializerTime,versioninfo,fmillsecond,smillsecond"
# Regular expressions used for masking
reg_exp:
# Name pattern
name: "[\u4e00-\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20}"
# Mobile phone number pattern
mobile: "((13[0-9])|(14[57])|(15[0-35-9])|(17[035-8])|(18[0-9])|(147))\\d{8}"
# Landline number pattern
phone: "0\\d{2,3}-\\d{7,8}"
# Email address pattern
email: "[a-zA-Z0-9]+@[a-zA-Z0-9]+(\\.[a-zA-Z0-9]+)+"
# ID card number pattern (15-digit)
id15: "[1-9]\\d{7}((0\\d)|(1[0-2]))(([0-2]\\d)|3[01])\\d{3}"
# ID card number pattern (18-digit)
id18: "[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0-2]\\d)|3[01])\\d{3}([0-9Xx])"
# Bank card number pattern
bank_card: "([1-9]{1})(\\d{11}|\\d{15}|\\d{16}|\\d{17}|\\d{18})"
# Home address pattern
address: "([\u4E00-\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号|弄|条|室)){2,}"
# IP address pattern
ip: "((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)"
# MAC address pattern
mac: "([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}"
#!/usr/bin/env bash
# Resolve the deployment root from the location of this script
export base_path=$(cd `dirname $0`; pwd)
deploy_path=${base_path%/*}
if [ ! -d "$deploy_path/logs" ]; then
    mkdir -p "$deploy_path/logs"
fi
FLINK_TASK_CONF=application.yml
parallelism=1
# Submit the masking job to Flink in the background; -p sets the job parallelism
flink run -p $parallelism $deploy_path/lib/transactionLogMask-0.1.jar --conf $deploy_path/conf/$FLINK_TASK_CONF > $deploy_path/logs/transactionLogMask-0.1.log &
\ No newline at end of file
  • SonarQube analysis reported 110 issues

    • 🚫 21 critical
    • 70 major
    • 🔽 18 minor
    • 1 info

    Watch the comments in this conversation to review them.

    22 extra issues

    Note: The following issues were found on lines that were not modified in the commit. Because these issues can't be reported as line comments, they are summarized here:

    1. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    2. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    3. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    4. Rename "jsonObject" which hides the field declared at line 39. 📘
    5. Remove this expression which always evaluates to "true" 📘
    6. A "List" cannot contain a "K" 📘
    7. Replace this use of System.out or System.err by a logger. 📘
    8. Define and throw a dedicated exception instead of using a generic one. 📘
    9. Define and throw a dedicated exception instead of using a generic one. 📘
    10. Define and throw a dedicated exception instead of using a generic one. 📘
    11. Define and throw a dedicated exception instead of using a generic one. 📘
    12. Define and throw a dedicated exception instead of using a generic one. 📘
    13. Remove this throw statement from this finally block. 📘
    14. Use the URI class instead. 📘
    15. Define and throw a dedicated exception instead of using a generic one. 📘
    16. Define and throw a dedicated exception instead of using a generic one. 📘
    17. Define and throw a dedicated exception instead of using a generic one. 📘
    18. Remove this throw statement from this finally block. 📘
    19. 🔽 Make this IP "192.168.70.2" address configurable. 📘
    20. 🔽 Immediately return this expression instead of assigning it to the temporary variable "var7". 📘
    21. 🔽 Immediately return this expression instead of assigning it to the temporary variable "var19". 📘
    22. 🔽 Remove this unnecessary cast to "URL". 📘
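For reference, the first three critical issues point at the same pattern. A generic sketch of the fix follows; it is an illustration of the rule, not the project's actual code:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class TryWithResourcesDemo {
    // Illustrates the fix for issues 1-3 above: try-with-resources closes the
    // stream automatically, so nothing is thrown from a finally block and the
    // primary exception is never swallowed.
    static long countBytes(String path) throws IOException {
        long count = 0;
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            while (in.read() != -1) {
                count++;
            }
        }
        return count;
    }
}
```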