1. Modified the project; initialization is now complete.
| package com.zorkdata.desensitization.config; | |||
| import com.zorkdata.desensitization.constans.GeneralConstants; | |||
| import lombok.Data; | |||
| import java.io.Serializable; | |||
| import java.util.ArrayList; | |||
| import java.util.List; | |||
| import java.util.stream.Collectors; | |||
| /** | |||
| * @author: LiaoMingtao | |||
| * @date: 2021/2/23 | |||
| */ | |||
| @Data | |||
| public class JobConfig implements Serializable { | |||
| private static final long serialVersionUID = 693924914570906529L; | |||
| public JobConfig(JobInitConfig jobInitConfig) { | |||
| this.setJobName(jobInitConfig.getJobName()); | |||
| this.setSourceParallelism(jobInitConfig.getSourceParallelism()); | |||
| this.setTransformerParallelism(jobInitConfig.getTransformerParallelism()); | |||
| this.setSinkParallelism(jobInitConfig.getSinkParallelism()); | |||
| this.setAvroOutputSchema(jobInitConfig.getAvroOutputSchema()); | |||
| this.setHdfsUri(jobInitConfig.getHdfsUri()); | |||
| this.setHdfsUser(jobInitConfig.getHdfsUser()); | |||
| this.setHdfsSrc(jobInitConfig.getHdfsUri() + jobInitConfig.getHdfsSrc()); | |||
| this.setHdfsDest(jobInitConfig.getHdfsUri() + jobInitConfig.getHdfsDest()); | |||
| this.setMatchHostname(jobInitConfig.getMatchHostname()); | |||
| this.setStartTime(jobInitConfig.getStartTime()); | |||
| this.setEndTime(jobInitConfig.getEndTime()); | |||
| this.setStartTimestamp(jobInitConfig.getStartTimestamp()); | |||
| this.setEndTimestamp(jobInitConfig.getEndTimestamp()); | |||
| String password = jobInitConfig.getPassword() + GeneralConstants.COMMA; | |||
| String name = jobInitConfig.getName() + GeneralConstants.COMMA; | |||
| String mobile = jobInitConfig.getMobile() + GeneralConstants.COMMA; | |||
| String phone = jobInitConfig.getPhone() + GeneralConstants.COMMA; | |||
| String email = jobInitConfig.getEmail() + GeneralConstants.COMMA; | |||
| String id = jobInitConfig.getId() + GeneralConstants.COMMA; | |||
| String address = jobInitConfig.getAddress() + GeneralConstants.COMMA; | |||
| String bankCard = jobInitConfig.getBankCard() + GeneralConstants.COMMA; | |||
| String ip = jobInitConfig.getIp() + GeneralConstants.COMMA; | |||
| String mac = jobInitConfig.getMac() + GeneralConstants.COMMA; | |||
| String position = jobInitConfig.getPosition() + GeneralConstants.COMMA; | |||
| String allKey = password + name + mobile + phone + email + id + address + bankCard + ip + mac + position; | |||
| String[] keys = allKey.split(GeneralConstants.COMMA); | |||
| List<String> keyList = new ArrayList<>(); | |||
| for (String key : keys) { | |||
| if (!GeneralConstants.EMPTY_STR.equals(key)) { | |||
| keyList.add(key); | |||
| } | |||
| } | |||
| keyList = keyList.stream().distinct().collect(Collectors.toList()); | |||
| this.setDesensitizationKeyList(keyList); | |||
| } | |||
| private String jobName; | |||
|
|||
| private int sourceParallelism; | |||
|
|||
| private int transformerParallelism; | |||
|
|||
| private int sinkParallelism; | |||
|
|||
| private String avroOutputSchema; | |||
|
|||
| private String hdfsUri; | |||
|
|||
| private String hdfsUser; | |||
|
|||
| private String hdfsSrc; | |||
|
|||
| private String hdfsDest; | |||
|
|||
| private String matchHostname; | |||
|
|||
| private String startTime; | |||
|
|||
| private String endTime; | |||
|
|||
| private long startTimestamp; | |||
|
|||
| private long endTimestamp; | |||
|
|||
| private List<String> desensitizationKeyList; | |||
|
|||
| } | |||
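The constructor above builds the key list by concatenating eleven comma-suffixed strings, then splitting, filtering out empty tokens, and de-duplicating. A minimal sketch of the same construction as a single stream pipeline, assuming the same comma-separated getter values from JobInitConfig; the helper class and method name are hypothetical and not part of this commit:

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.zorkdata.desensitization.config.JobInitConfig;
import com.zorkdata.desensitization.constans.GeneralConstants;

// Hypothetical helper: produces the same list as the loop in the JobConfig constructor.
final class DesensitizationKeys {
    private DesensitizationKeys() {
    }

    static List<String> from(JobInitConfig c) {
        return Stream.of(c.getPassword(), c.getName(), c.getMobile(), c.getPhone(), c.getEmail(),
                        c.getId(), c.getAddress(), c.getBankCard(), c.getIp(), c.getMac(), c.getPosition())
                .filter(Objects::nonNull)                                      // MapUtils.getString may return null
                .flatMap(v -> Arrays.stream(v.split(GeneralConstants.COMMA)))  // split each comma-separated value
                .filter(key -> !GeneralConstants.EMPTY_STR.equals(key))        // drop empty tokens
                .distinct()                                                    // de-duplicate, as in the constructor
                .collect(Collectors.toList());
    }
}

The loop in the constructor is equivalent; the stream form merely avoids the intermediate allKey string.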
| package com.zorkdata.desensitization.config; | |||
| import com.zorkdata.desensitization.avro.AvroSchemaDef; | |||
| import com.zorkdata.desensitization.constans.ConfigConstants; | |||
| import com.zorkdata.desensitization.utils.DateUtils; | |||
| import lombok.Data; | |||
| import org.apache.avro.Schema; | |||
| import org.apache.commons.collections.MapUtils; | |||
| import java.io.Serializable; | |||
| import java.util.Map; | |||
| /** | |||
| * @author: LiaoMingtao | |||
| * @date: 2021/2/23 | |||
| */ | |||
| @Data | |||
| public class JobInitConfig implements Serializable { | |||
| private static final long serialVersionUID = -1959581564693543666L; | |||
| public JobInitConfig(Map<String, String> conf) { | |||
| this.jobName = String.valueOf(conf.get(ConfigConstants.JOB_NAME)); | |||
| this.sourceParallelism = Integer.parseInt(conf.get(ConfigConstants.SOURCE_PARALLELISM)); | |||
| this.transformerParallelism = Integer.parseInt(conf.get(ConfigConstants.TRANSFORMER_PARALLELISM)); | |||
| this.sinkParallelism = Integer.parseInt(conf.get(ConfigConstants.SINK_PARALLELISM)); | |||
| this.avroOutputSchema = new Schema.Parser().parse(AvroSchemaDef.ZORK_LOG_SCHEMA).toString(true); | |||
| this.hdfsUri = String.valueOf(conf.get(ConfigConstants.HDFS_URI)).trim(); | |||
| this.hdfsUser = String.valueOf(conf.get(ConfigConstants.HDFS_USER)).trim(); | |||
| this.hdfsSrc = String.valueOf(conf.get(ConfigConstants.HDFS_SRC)).trim(); | |||
| this.hdfsDest = String.valueOf(conf.get(ConfigConstants.HDFS_DEST)).trim(); | |||
| this.matchHostname = String.valueOf(conf.get(ConfigConstants.MATCH_HOSTNAME)).trim(); | |||
| this.startTime = String.valueOf(conf.get(ConfigConstants.START_TIME)); | |||
| this.endTime = String.valueOf(conf.get(ConfigConstants.END_TIME)); | |||
| this.startTimestamp = DateUtils.time2Timestamp(startTime); | |||
| this.endTimestamp = DateUtils.time2Timestamp(endTime); | |||
| this.name = MapUtils.getString(conf, ConfigConstants.NAME_KEY); | |||
| this.mobile = MapUtils.getString(conf, ConfigConstants.MOBILE_KEY); | |||
| this.phone = MapUtils.getString(conf, ConfigConstants.PHONE_KEY); | |||
| this.email = MapUtils.getString(conf, ConfigConstants.EMAIL_KEY); | |||
| this.id = MapUtils.getString(conf, ConfigConstants.ID_KEY); | |||
| this.bankCard = MapUtils.getString(conf, ConfigConstants.BANK_CARD_KEY); | |||
| this.address = MapUtils.getString(conf, ConfigConstants.ADDRESS_KEY); | |||
| this.ip = MapUtils.getString(conf, ConfigConstants.IP_KEY); | |||
| this.mac = MapUtils.getString(conf, ConfigConstants.MAC_KEY); | |||
| this.position = MapUtils.getString(conf, ConfigConstants.POSITION_KEY); | |||
| this.password = MapUtils.getString(conf, ConfigConstants.PASSWORD_KEY); | |||
| } | |||
| private String jobName; | |||
|
|||
| private int sourceParallelism; | |||
|
|||
| private int transformerParallelism; | |||
|
|||
| private int sinkParallelism; | |||
|
|||
| private String avroOutputSchema; | |||
|
|||
| private String hdfsUri; | |||
|
|||
| private String hdfsUser; | |||
|
|||
| private String hdfsSrc; | |||
|
|||
| private String hdfsDest; | |||
|
|||
| private String matchHostname; | |||
|
|||
| private String startTime; | |||
| private String endTime; | |||
| private long startTimestamp; | |||
|
|||
| private long endTimestamp; | |||
|
|||
| private String name; | |||
|
|||
| private String mobile; | |||
|
|||
| private String phone; | |||
|
|||
| private String email; | |||
|
|||
| /** | |||
| * ID card (national ID) number | |||
| */ | |||
| private String id; | |||
|
|||
| private String bankCard; | |||
|
|||
| private String address; | |||
|
|||
| private String ip; | |||
|
|||
| private String mac; | |||
|
|||
| /** | |||
| * position (holdings) information | |||
| */ | |||
| private String position; | |||
|
|||
| private String password; | |||
|
|||
| } | |||
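A minimal usage sketch of how the two config classes and the job class further below are presumably wired together; the main class, the properties path, and the way confMap is populated are illustrative assumptions (and assume the PropertiesUtils helpers shown later are accessible), not part of this commit:

import java.util.List;
import java.util.Map;

import com.zorkdata.desensitization.config.JobConfig;
import com.zorkdata.desensitization.config.JobInitConfig;
import com.zorkdata.desensitization.hadoop.HdfsLogDesensitization;
import com.zorkdata.desensitization.utils.PropertiesUtils;

public class DesensitizationJobMain {
    public static void main(String[] args) {
        // Hypothetical properties path; the repo's PropertiesUtils helpers are reused.
        List<String> lines = PropertiesUtils.getPropertiesContentList("/job.properties");
        Map<String, String> confMap = PropertiesUtils.getPropertiesMap(lines);
        JobInitConfig initConfig = new JobInitConfig(confMap);   // raw values parsed from the properties
        JobConfig jobConfig = new JobConfig(initConfig);         // derived values: absolute HDFS paths, key list
        new HdfsLogDesensitization()
                .initJobConfig(jobConfig)
                .desensitizationHdfsLog();                       // run the Flink batch job
    }
}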
| ... | @@ -21,10 +21,23 @@ public final class ConfigConstants { | ... | @@ -21,10 +21,23 @@ public final class ConfigConstants { |
| public static final String HDFS_USER = "hdfs_user"; | public static final String HDFS_USER = "hdfs_user"; | ||
| public static final String HDFS_SRC = "hdfs_src"; | public static final String HDFS_SRC = "hdfs_src"; | ||
| public static final String HDFS_DEST = "hdfs_dest"; | public static final String HDFS_DEST = "hdfs_dest"; | ||
| public static final String MATCH_HOSTNAME = "match.hostname"; | |||
| public static final String CORE = "core"; | public static final String CORE = "core"; | ||
| public static final String START_TIME = "start_time"; | public static final String START_TIME = "start_time"; | ||
| public static final String END_TIME = "end_time"; | public static final String END_TIME = "end_time"; | ||
| public static final String NAME_KEY = "name"; | |||
| public static final String MOBILE_KEY = "mobile"; | |||
| public static final String PHONE_KEY = "phone"; | |||
| public static final String EMAIL_KEY = "email"; | |||
| public static final String ID_KEY = "id"; | |||
| public static final String BANK_CARD_KEY = "bank_card"; | |||
| public static final String ADDRESS_KEY = "address"; | |||
| public static final String IP_KEY = "ip"; | |||
| public static final String MAC_KEY = "mac"; | |||
| public static final String POSITION_KEY = "position"; | |||
| public static final String PASSWORD_KEY = "password"; | |||
|
|||
| public static final String SERVERS = "servers"; | public static final String SERVERS = "servers"; | ||
| public static final String ZOOKEEPER = "zookeeper"; | public static final String ZOOKEEPER = "zookeeper"; | ||
| public static final String TOPIC = "topic"; | public static final String TOPIC = "topic"; | ||
| ... | ... | ||
DesensitizationFunction (updated version; the regex-based implementation that formed the other side of this diff is preserved below as DesensitizationFunctionOld):

package com.zorkdata.desensitization.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.desensitization.config.JobConfig;
import com.zorkdata.desensitization.schmea.LogData;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author: LiaoMingtao
 * @date: 2021/2/24
 */
public class DesensitizationFunction<T, R> extends RichFlatMapFunction<Tuple2<Object, Object>, LogData> {

    private JobConfig jobConfig;

    public DesensitizationFunction(JobConfig jobConfig) {
        this.jobConfig = jobConfig;
    }

    @Override
    public void flatMap(Tuple2<Object, Object> value, Collector<LogData> collector) throws Exception {
        LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>() {
        });
        collector.collect(logData);
    }
}
| package com.zorkdata.desensitization.function; | |||
| import com.zorkdata.desensitization.config.RegularExpressions; | |||
| import java.io.Serializable; | |||
| import java.util.*; | |||
| import java.util.regex.Matcher; | |||
| import java.util.regex.Pattern; | |||
| /** | |||
| * @author: LiaoMingtao | |||
| * @date: 2020/10/26 | |||
| */ | |||
| public class DesensitizationFunctionOld implements Serializable { | |||
| private static final long serialVersionUID = 1L; | |||
| private RegularExpressions regularExpressions; | |||
|
|||
| private List<Pattern> patterns = new ArrayList<>(); | |||
| @Deprecated | |||
| public DesensitizationFunctionOld(RegularExpressions regularExpressions) { | |||
|
|
|||
| this.regularExpressions = regularExpressions; | |||
| patterns.add(Pattern.compile(regularExpressions.getIdRegExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getBankCardRegExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getPhoneRegExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getMobileRegExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getAddressRegExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getPositionExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getNameRegExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getMacRegExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getEmailRegExp())); | |||
| patterns.add(Pattern.compile(regularExpressions.getIpRegExp())); | |||
| } | |||
| @Deprecated | |||
| public Map<String, String> desensitization(Map<String, String> map, | |||
|
|
|||
| List<String> whiteList, List<String> dataFormats) { | |||
| Iterator<Map.Entry<String, String>> entryIterator = map.entrySet().iterator(); | |||
| while (entryIterator.hasNext()) { | |||
| Map.Entry<String, String> next = entryIterator.next(); | |||
| String key = next.getKey(); | |||
| String value = next.getValue(); | |||
| if (!whiteList.contains(key)) { | |||
| // perform the masking (desensitization) step | |||
| for (Pattern pattern : patterns) { | |||
| Matcher matcher = pattern.matcher(value); | |||
| if (matcher.find()) { | |||
| String replaceStr = ""; | |||
| for (int i = 0; i < matcher.group().length(); i++) { | |||
| String s = String.valueOf(matcher.group().charAt(i)); | |||
| if(dataFormats.contains(s)){ | |||
| replaceStr = replaceStr.concat(s); | |||
| }else{ | |||
| replaceStr = replaceStr.concat("*"); | |||
| } | |||
| } | |||
| value = value.replace(matcher.group(), replaceStr); | |||
| map.put(key, value); | |||
| } | |||
| } | |||
| } | |||
| } | |||
| return map; | |||
| } | |||
| public Map desensitizationTemp(Map map, List<String> whiteList, List<String> dataFormats) { | |||
|
|||
| map.forEach((k, v) -> { | |||
| if (!whiteList.contains(k)) { | |||
|
|||
| String value = v.toString(); | |||
| for (Pattern pattern : patterns) { | |||
| Matcher matcher = pattern.matcher(value); | |||
| if (matcher.find()) { | |||
| String replaceStr = ""; | |||
| for (int i = 0; i < matcher.group().length(); i++) { | |||
| String s = String.valueOf(matcher.group().charAt(i)); | |||
| if(dataFormats.contains(s)){ | |||
| replaceStr = replaceStr.concat(s); | |||
| }else{ | |||
| replaceStr = replaceStr.concat("*"); | |||
| } | |||
| } | |||
| value = value.replace(matcher.group(), replaceStr); | |||
| System.out.println("\n"); | |||
|
|||
| } | |||
| } | |||
| map.put(k, value); | |||
| } else { | |||
| map.put(k, v); | |||
| } | |||
| }); | |||
| return map; | |||
| } | |||
| } | |||
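Both desensitization and desensitizationTemp above repeat the same character-masking loop: every character of the matched group is replaced by "*" unless it is one of the allowed format characters in dataFormats. A small extracted helper, sketched here with a hypothetical name and assuming it sits inside DesensitizationFunctionOld (reusing its java.util imports), would remove the duplication:

// Hypothetical helper, not part of the commit: masks a regex match, keeping only
// the characters listed in dataFormats (e.g. ",", ".", "@", "-", ":").
private static String maskMatch(String matchedGroup, List<String> dataFormats) {
    StringBuilder masked = new StringBuilder(matchedGroup.length());
    for (int i = 0; i < matchedGroup.length(); i++) {
        String ch = String.valueOf(matchedGroup.charAt(i));
        masked.append(dataFormats.contains(ch) ? ch : "*");
    }
    return masked.toString();
}

The callers would then reduce to value = value.replace(matcher.group(), maskMatch(matcher.group(), dataFormats));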
HdfsLogDesensitization (updated version; the previous Map-based implementation that formed the other side of this diff is kept, commented out, as HdfsLogDesensitizationOld below; the @@ rows mark lines the diff view elided):

package com.zorkdata.desensitization.hadoop;

import com.zorkdata.desensitization.config.JobConfig;
import com.zorkdata.desensitization.constans.GeneralConstants;
import com.zorkdata.desensitization.exception.ZorkException;
import com.zorkdata.desensitization.function.DesensitizationFunction;
import com.zorkdata.desensitization.schmea.LogData;
import com.zorkdata.desensitization.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
@@ -23,8 +18,6 @@ import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -38,168 +31,122 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author: LiaoMingtao
 * @date: 2021/2/24
 */
@Slf4j
public class HdfsLogDesensitization implements Serializable {

    private static final long serialVersionUID = -6253583122681202967L;
    private static final String AVRO_OUTPUT_SCHEMA = "avro.output.schema";

    private JobConfig jobConfig;

    public HdfsLogDesensitization initJobConfig(JobConfig jobConfig) {
        this.jobConfig = jobConfig;
        return this;
    }

    public void desensitizationHdfsLog() {
        desensitizationHdfsLog(this.jobConfig);
    }

    public void desensitizationHdfsLog(JobConfig jobConfig) {
        // initialize the Flink job environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        JobConf jobConf = new JobConf();
        jobConf.set(AVRO_OUTPUT_SCHEMA, jobConfig.getAvroOutputSchema());
        // source
        // 1. resolve the input folders from the configured time range
        List<String> logFiles = filterHdfsLogFiles(jobConfig.getHdfsSrc(), jobConfig.getHdfsUri(), jobConfig.getHdfsUser());
        String logFileListString = list2String(logFiles);
        HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<>
                (new AvroInputFormat(), Object.class, Object.class, jobConf);
        AvroInputFormat.addInputPaths(hadoopInputFormat.getJobConf(), logFileListString);
        // 2. create the DataSource
        DataSource<Tuple2<Object, Object>> hdfsLogInput = env
                .createInput(hadoopInputFormat).setParallelism(jobConfig.getSourceParallelism());
        // transformer
        FlatMapOperator<Tuple2<Object, Object>, LogData> flatMapOperator =
                hdfsLogInput.flatMap(new DesensitizationFunction<Tuple2<Object, Object>, LogData>(jobConfig));
        // sink
        // resolve the output directory on the target HDFS
        String filePath = jobConfig.getHdfsDest();
        HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
        FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
        // Avro serialization operator (.writeAsText("file:///lmt/output") writes locally)
        flatMapOperator.map(new MapFunction<LogData, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
            @Override
            public Tuple2<AvroWrapper<LogData>, NullWritable> map(LogData value) throws Exception {
                AvroKey<LogData> key = new AvroKey<>(value);
                return new Tuple2<>(key, NullWritable.get());
            }
        }).setParallelism(jobConfig.getTransformerParallelism())
                .writeAsText("file:///lmt/output", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE)
                // .output(hadoopOutputFormat)
                .setParallelism(jobConfig.getSinkParallelism());
        try {
            env.execute(jobConfig.getJobName());
        } catch (Exception e) {
            log.error(String.valueOf(e));
        }
    }

    /**
     * Collect all HDFS log file paths.
     *
     * @param hdfsSrc  HDFS source path, e.g. /tmp/
     * @param hdfsUri  HDFS URI, e.g. hdfs://cdh-2:8020/
     * @param hdfsUser HDFS user name, e.g. hdfs
     * @return all HDFS log file paths
     */
    private List<String> filterHdfsLogFiles(String hdfsSrc, String hdfsUri, String hdfsUser) {
        if (!hdfsSrc.endsWith(GeneralConstants.FILE_SEPARATOR)) {
            hdfsSrc += GeneralConstants.FILE_SEPARATOR;
        }
        String path = hdfsSrc;
        Configuration conf = new Configuration();
        List<String> logFiles = new ArrayList<>();
        FileSystem fileSystem = null;
        List<String> betweenDate = DateUtils.getBetweenDate(jobConfig.getStartTime(), jobConfig.getEndTime());
        List<String> dateList = DateUtils.date2date(betweenDate);
        if (!dateList.isEmpty()) {
            try {
                fileSystem = FileSystem.get(new URI(hdfsUri), conf, hdfsUser);
                for (String item : dateList) {
                    path = hdfsSrc + item;
                    List<String> hdfsLogFiles = null;
                    try {
                        hdfsLogFiles = getHdfsLogFilesByPath(fileSystem, path);
                        logFiles.addAll(hdfsLogFiles);
                    } catch (ZorkException e) {
                        e.printStackTrace();
                        log.error(String.valueOf(e));
                    }
                }
            } catch (IOException e) {
                log.error(String.valueOf(e));
            } catch (InterruptedException e) {
                log.error(String.valueOf(e));
            } catch (URISyntaxException e) {
                log.error(String.valueOf(e));
            } finally {
                if (null != fileSystem) {
                    try {
                        fileSystem.close();
                    } catch (IOException e) {
                        log.error(String.valueOf(e));
                    }
                }
            }
        } else {
            log.warn("{} -- {} 时间段内无数据,请注意时间范围", jobConfig.getStartTime(), jobConfig.getEndTime());
        }
        return logFiles;
    }

    /**
@@ -250,76 +197,25 @@ public class HdfsLogDesensitization implements Serializable {
     * @param path 目录路径 (directory path)
     * @return full paths of all files under the directory
     */
    private List<String> getHdfsLogFilesByPath(FileSystem fileSystem, String path) throws ZorkException {
        List<String> logFiles = new ArrayList<>();
        try {
            RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
                    fileSystem.listFiles(new Path(path), false);
            while (locatedFileStatusRemoteIterator.hasNext()) {
                LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
                long modificationTime = next.getModificationTime();
                // filter by modification time to keep only files in the user-specified range
                if (modificationTime > jobConfig.getStartTimestamp()) {
                    Path hdfsFilePath = next.getPath();
                    logFiles.add(hdfsFilePath.toString());
                }
            }
        } catch (IOException e) {
            log.error(String.valueOf(e));
            throw new ZorkException(String.format("IO流异常:%s", e.getMessage()));
        }
        return logFiles;
    }
}
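SonarQube's "Change this try to a try-with-resources" finding (see the report at the end) applies to the FileSystem handling in filterHdfsLogFiles. A sketch of the same method with try-with-resources, assuming it sits inside HdfsLogDesensitization so the fields and helpers above resolve; only the resource management changes:

// Sketch only: org.apache.hadoop.fs.FileSystem is Closeable, so the finally block
// and the nested close() try/catch can be replaced by try-with-resources.
private List<String> filterHdfsLogFilesWithTwr(String hdfsSrc, String hdfsUri, String hdfsUser) {
    if (!hdfsSrc.endsWith(GeneralConstants.FILE_SEPARATOR)) {
        hdfsSrc += GeneralConstants.FILE_SEPARATOR;
    }
    List<String> logFiles = new ArrayList<>();
    List<String> dateList = DateUtils.date2date(
            DateUtils.getBetweenDate(jobConfig.getStartTime(), jobConfig.getEndTime()));
    if (dateList.isEmpty()) {
        log.warn("{} -- {} 时间段内无数据,请注意时间范围", jobConfig.getStartTime(), jobConfig.getEndTime());
        return logFiles;
    }
    try (FileSystem fileSystem = FileSystem.get(new URI(hdfsUri), new Configuration(), hdfsUser)) {
        for (String item : dateList) {
            try {
                logFiles.addAll(getHdfsLogFilesByPath(fileSystem, hdfsSrc + item));
            } catch (ZorkException e) {
                log.error(String.valueOf(e));   // skip this day's folder, keep going
            }
        }
    } catch (IOException | InterruptedException | URISyntaxException e) {
        log.error(String.valueOf(e));
    }
    return logFiles;
}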
| // package com.zorkdata.desensitization.hadoop; | |||
| // | |||
| // import com.alibaba.fastjson.JSON; | |||
|
|||
| // import com.alibaba.fastjson.TypeReference; | |||
| // import com.zorkdata.desensitization.avro.AvroSchemaDef; | |||
| // import com.zorkdata.desensitization.config.RegularExpressions; | |||
| // import com.zorkdata.desensitization.constans.ConfigConstants; | |||
| // import com.zorkdata.desensitization.constans.GeneralConstants; | |||
| // import com.zorkdata.desensitization.function.DesensitizationFunctionOld; | |||
| // import com.zorkdata.desensitization.schmea.LogData; | |||
| // import com.zorkdata.desensitization.utils.DateUtils; | |||
| // import lombok.extern.slf4j.Slf4j; | |||
| // import org.apache.avro.Schema; | |||
| // import org.apache.avro.mapred.AvroInputFormat; | |||
| // import org.apache.avro.mapred.AvroKey; | |||
| // import org.apache.avro.mapred.AvroOutputFormat; | |||
| // import org.apache.avro.mapred.AvroWrapper; | |||
| // import org.apache.flink.api.common.functions.FlatMapFunction; | |||
| // import org.apache.flink.api.common.functions.MapFunction; | |||
| // import org.apache.flink.api.java.ExecutionEnvironment; | |||
| // import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; | |||
| // import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; | |||
| // import org.apache.flink.api.java.operators.DataSource; | |||
| // import org.apache.flink.api.java.operators.FlatMapOperator; | |||
| // import org.apache.flink.api.java.tuple.Tuple2; | |||
| // import org.apache.flink.api.java.utils.ParameterTool; | |||
| // import org.apache.flink.util.Collector; | |||
| // import org.apache.hadoop.conf.Configuration; | |||
| // import org.apache.hadoop.fs.FileSystem; | |||
| // import org.apache.hadoop.fs.LocatedFileStatus; | |||
| // import org.apache.hadoop.fs.Path; | |||
| // import org.apache.hadoop.fs.RemoteIterator; | |||
| // import org.apache.hadoop.io.NullWritable; | |||
| // import org.apache.hadoop.mapred.FileOutputFormat; | |||
| // import org.apache.hadoop.mapred.JobConf; | |||
| // | |||
| // import java.io.IOException; | |||
|
|||
| // import java.io.Serializable; | |||
| // import java.net.URI; | |||
| // import java.net.URISyntaxException; | |||
| // import java.util.*; | |||
| // | |||
| // /** | |||
| // * @author: LiaoMingtao | |||
| // * @date: 2020/10/26 | |||
| // */ | |||
| // @Slf4j | |||
| // public class HdfsLogDesensitizationOld implements Serializable { | |||
|
|||
| // | |||
| // private static final long serialVersionUID = 1L; | |||
|
|||
| // | |||
| // private static final String AVRO_OUTPUT_SCHEMA = "avro.output.schema"; | |||
|
|||
| // private static final String HOSTNAME = "hostname"; | |||
| // private static final List<String> dataFormats = new ArrayList<String>() {{ | |||
| // add(","); | |||
| // add("."); | |||
| // add("@"); | |||
| // add("-"); | |||
| // add(":"); | |||
| // }}; | |||
| // | |||
| // private String jobName; | |||
|
|||
| // private int sourceParallelism; | |||
| // private int transformerParallelism; | |||
| // private int sinkParallelism; | |||
| // private int maxFileNum; | |||
| // private String avroOutputSchema; | |||
| // private List<String> fieldsWhiteList; | |||
| // private String core; | |||
| // private String hdfsUri; | |||
| // private String hdfsUser; | |||
| // private String hdfsSrc; | |||
| // private String hdfsDest; | |||
| // private String startTime; | |||
| // private String endTime; | |||
| // /** | |||
| // * 是否脱敏维度信息 | |||
| // */ | |||
| // private boolean hasRegDimension; | |||
|
|||
| // private long startTimestamp; | |||
| // private long endTimestamp; | |||
| // private Map<String, String> confMap; | |||
| // private Map<String, String> regularMap; | |||
| // | |||
| // public HdfsLogDesensitizationOld initRegular(Map<String, String> regularMap) { | |||
|
|||
| // this.regularMap = regularMap; | |||
| // return this; | |||
| // } | |||
| // | |||
| // public HdfsLogDesensitizationOld initConf(Map<String, String> conf) { | |||
|
|||
| // this.jobName = String.valueOf(conf.get(ConfigConstants.JOB_NAME)); | |||
| // this.sourceParallelism = Integer.parseInt(conf.get(ConfigConstants.SOURCE_PARALLELISM)); | |||
| // this.transformerParallelism = Integer.parseInt(conf.get(ConfigConstants.TRANSFORMER_PARALLELISM)); | |||
| // this.sinkParallelism = Integer.parseInt(conf.get(ConfigConstants.SINK_PARALLELISM)); | |||
| // String[] fieldsWhiteListArray = String.valueOf(conf.get(ConfigConstants.FIELDS_WHITE_LIST)) | |||
| // .trim().split(GeneralConstants.COMMA); | |||
|
|||
| // this.fieldsWhiteList = new ArrayList<>(Arrays.asList(fieldsWhiteListArray)); | |||
| // this.avroOutputSchema = new Schema.Parser().parse(AvroSchemaDef.ZORK_LOG_SCHEMA).toString(true); | |||
| // this.hdfsUri = String.valueOf(conf.get(ConfigConstants.HDFS_URI)).trim(); | |||
| // this.hdfsUser = String.valueOf(conf.get(ConfigConstants.HDFS_USER)).trim(); | |||
| // this.hdfsSrc = hdfsUri + String.valueOf(conf.get(ConfigConstants.HDFS_SRC)).trim(); | |||
| // this.hdfsDest = hdfsUri + String.valueOf(conf.get(ConfigConstants.HDFS_DEST)).trim(); | |||
| // this.core = String.valueOf(conf.get(ConfigConstants.CORE)).trim(); | |||
| // this.startTime = String.valueOf(conf.get(ConfigConstants.START_TIME)); | |||
| // this.endTime = String.valueOf(conf.get(ConfigConstants.END_TIME)); | |||
| // this.startTimestamp = DateUtils.time2Timestamp(startTime); | |||
| // this.endTimestamp = DateUtils.time2Timestamp(endTime); | |||
| // this.hasRegDimension = Boolean.parseBoolean(conf.get(ConfigConstants.REG_DIMENSION)); | |||
| // this.confMap = conf; | |||
| // return this; | |||
| // } | |||
| // | |||
| // public void desensitizationHdfsLog() throws Exception { | |||
|
|||
| // desensitizationHdfsLog(this.confMap); | |||
| // } | |||
| // | |||
| // public void desensitizationHdfsLog(Map<String, String> conf) throws Exception { | |||
|
|||
| // // 初始化env | |||
| // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |||
|
|||
| // JobConf jobConf = new JobConf(); | |||
| // jobConf.set(AVRO_OUTPUT_SCHEMA, this.avroOutputSchema); | |||
| // ParameterTool parameterTool = ParameterTool.fromMap(conf); | |||
| // env.getConfig().setGlobalJobParameters(parameterTool); | |||
| // RegularExpressions regularExpressions = new RegularExpressions(this.regularMap); | |||
| // DesensitizationFunctionOld desensitizationFunctionOld = new DesensitizationFunctionOld(regularExpressions); | |||
| // | |||
| // // source | |||
| // List<String> logFiles = filterHdfsLogFiles(hdfsSrc, hdfsUri, hdfsUser); | |||
|
|||
| // String logFileListString = list2String(logFiles); | |||
| // HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<> | |||
| // (new AvroInputFormat(), Object.class, Object.class, jobConf); | |||
|
|||
| // AvroInputFormat.addInputPaths(hadoopInputFormat.getJobConf(), logFileListString); | |||
| // DataSource<Tuple2<Object, Object>> hdfsLogInput = env | |||
| // .createInput(hadoopInputFormat).setParallelism(sourceParallelism); | |||
|
|||
| // | |||
| // // transformer | |||
| // FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator = | |||
| // hdfsLogInput.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() { | |||
|
|||
| // @Override | |||
| // public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) { | |||
|
|||
| // LogData logData = JSON.parseObject(value.getField(0).toString(), | |||
| // new TypeReference<LogData>() { | |||
|
|||
| // }); | |||
| // //根据日志事件的核心信息做过滤 | |||
| // boolean hasCore = (null != core && logData.getDimensions().get(HOSTNAME).contains(core)) | |||
| // || "*".equals(core); | |||
|
|||
| // if (hasCore) { | |||
| // //根据日志事件的timestamp做过滤 | |||
| // Long timestamp = DateUtils.utc2timestamp(logData.getTimestamp()); | |||
|
|||
| // | |||
| // if (null != timestamp && timestamp.compareTo(startTimestamp) >= 0 && | |||
|
|||
| // timestamp.compareTo(endTimestamp) <= 0) { | |||
| // Map<String, String> normalFields = logData.getNormalFields(); | |||
| // Map desensitization = desensitizationFunctionOld. | |||
| // desensitization(normalFields, fieldsWhiteList, dataFormats); | |||
|
|||
| // logData.setNormalFields(desensitization); | |||
| // if (hasRegDimension) { | |||
| // Map<String, String> dimensions = logData.getDimensions(); | |||
| // Map desensitizationDimensions = desensitizationFunctionOld. | |||
| // desensitization(dimensions, fieldsWhiteList, dataFormats); | |||
|
|||
| // logData.setDimensions(desensitizationDimensions); | |||
| // } | |||
| // collector.collect(logData); | |||
| // } | |||
| // } | |||
| // } | |||
| // }).setParallelism(transformerParallelism); | |||
| // // 获取目标hdfs的输出目录 | |||
| // String filePath = hdfsDest; | |||
|
|||
| // HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf); | |||
| // FileOutputFormat.setOutputPath(jobConf, new Path(filePath)); | |||
| // | |||
| // // avro序列化算子 .writeAsText("file:///lmt/output"); | |||
|
|||
| // maskFlatMapOperator.map(new MapFunction<Object, Tuple2<AvroWrapper<LogData>, NullWritable>>() { | |||
| // @Override | |||
| // public Tuple2<AvroWrapper<LogData>, NullWritable> map(Object value) throws Exception { | |||
|
|||
| // AvroKey<LogData> key = new AvroKey<>((LogData) value); | |||
| // Tuple2<AvroWrapper<LogData>, NullWritable> tuple = new Tuple2<>(key, NullWritable.get()); | |||
| // return tuple; | |||
| // } | |||
| // }).setParallelism(transformerParallelism).output(hadoopOutputFormat).setParallelism(sinkParallelism); | |||
| // try { | |||
| // env.execute(jobName); | |||
| // } catch (Exception e) { | |||
| // log.error(String.valueOf(e)); | |||
| // } | |||
| // } | |||
| // | |||
| // /** | |||
| // * 分组list | |||
| // * | |||
| // * @param list 演示list | |||
| // * @return List<String> | |||
| // */ | |||
| // private List<String> changeList(List<String> list) { | |||
|
|||
| // List<String> resultList = new ArrayList<>(); | |||
| // List<List<String>> lists = subList(list, maxFileNum); | |||
| // lists.forEach(item -> { | |||
| // String tempString = list2String(item); | |||
| // resultList.add(tempString); | |||
| // }); | |||
| // return resultList; | |||
| // } | |||
| // | |||
| // /** | |||
| // * 将List按照每组n个元素进行分组 | |||
| // * | |||
| // * @param sourceList 原始list | |||
| // * @param n n个元素 | |||
| // * @param <T> 泛型 | |||
| // * @return List<List < T>> | |||
| // */ | |||
| // private <T> List<List<T>> subList(List<T> sourceList, int n) { | |||
|
|||
| // List<List<T>> rsList = new ArrayList<>(); | |||
| // if (n <= 0) { | |||
| // rsList.add(sourceList); | |||
| // return rsList; | |||
| // } | |||
| // int listSize = sourceList.size(); | |||
| // int groupNum = (sourceList.size() / n) + 1; | |||
| // for (int i = 0; i < groupNum; i++) { | |||
| // List<T> subList = new ArrayList<>(); | |||
| // for (int j = i * n; j < (i + 1) * n; j++) { | |||
| // if (j < listSize) { | |||
| // subList.add(sourceList.get(j)); | |||
| // } | |||
| // } | |||
| // rsList.add(subList); | |||
| // } | |||
| // if (rsList.get(rsList.size() - 1).isEmpty()) { | |||
| // rsList.remove(rsList.size() - 1); | |||
| // } | |||
| // return rsList; | |||
| // } | |||
| // | |||
| // /** | |||
| // * list<string>转逗号分割的string | |||
| // * | |||
| // * @param list list<string> | |||
| // * @return String | |||
| // */ | |||
| // private String list2String(List<String> list) { | |||
|
|||
| // return String.join(GeneralConstants.COMMA, list); | |||
| // } | |||
| // | |||
| // /** | |||
| // * 通过路径获取 | |||
| // * | |||
| // * @param fileSystem 文件系统 | |||
| // * @param path 目录路径 | |||
| // * @return 文件路径下所有文件全路径 | |||
| // */ | |||
| // private List<String> getHdfsLogFilesByPath(FileSystem fileSystem, String path) { | |||
|
|||
| // List<String> logFiles = new ArrayList<>(); | |||
| // try { | |||
| // RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path(path), | |||
| // false); | |||
|
|||
| // while (locatedFileStatusRemoteIterator.hasNext()) { | |||
| // LocatedFileStatus next = locatedFileStatusRemoteIterator.next(); | |||
| // long modificationTime = next.getModificationTime(); | |||
| // // 根据文件的修改时间做过滤,获取用户指定时间段内的文件 | |||
| // if (modificationTime > startTimestamp) { | |||
|
|||
| // Path hdfsFilePath = next.getPath(); | |||
| // logFiles.add(hdfsFilePath.toString()); | |||
| // } | |||
| // } | |||
| // } catch (IOException e) { | |||
| // log.error(String.valueOf(e)); | |||
| // } | |||
| // return logFiles; | |||
| // } | |||
| // | |||
| // /** | |||
| // * 获取hdfs日志文件的所有文件路径 | |||
| // * | |||
| // * @param hdfsSrc hdfs地址 | |||
| // * @param hdfsUri hdfs的URI | |||
| // * @param hdfsUser hfs用户名 | |||
| // * @return hdfs日志文件的所有文件路径 | |||
| // */ | |||
| // @Deprecated | |||
| // private List<String> filterHdfsLogFiles(String hdfsSrc, String hdfsUri, String hdfsUser) { | |||
|
|||
| // // hdfs://cdh-2:8020/ hdfs | |||
| // if (!hdfsSrc.endsWith(GeneralConstants.FILE_SEPARATOR)) { | |||
|
|||
| // hdfsSrc += GeneralConstants.FILE_SEPARATOR; | |||
| // } | |||
| // String path = hdfsSrc; | |||
| // Configuration conf = new Configuration(); | |||
| // List<String> logFiles = new ArrayList<>(); | |||
| // FileSystem fileSystem = null; | |||
| // List<String> betweenDate = DateUtils.getBetweenDate(startTime, endTime); | |||
| // List<String> dateList = DateUtils.date2date(betweenDate); | |||
| // if (dateList.size() > 0) { | |||
| // try { | |||
| // fileSystem = FileSystem.get(new URI(hdfsUri), conf, hdfsUser); | |||
| // for (String item : dateList) { | |||
| // path = hdfsSrc + item; | |||
| // List<String> hdfsLogFiles = getHdfsLogFilesByPath(fileSystem, path); | |||
| // logFiles.addAll(hdfsLogFiles); | |||
| // } | |||
| // } catch (IOException e) { | |||
| // log.error(String.valueOf(e)); | |||
| // } catch (InterruptedException e) { | |||
| // log.error(String.valueOf(e)); | |||
| // } catch (URISyntaxException e) { | |||
| // log.error(String.valueOf(e)); | |||
| // } finally { | |||
| // if (null != fileSystem) { | |||
| // try { | |||
| // fileSystem.close(); | |||
| // } catch (IOException e) { | |||
| // log.error(String.valueOf(e)); | |||
| // } | |||
| // } | |||
| // } | |||
| // } else { | |||
| // log.warn("{} -- {} 时间段内无数据,请注意时间范围", startTime, endTime); | |||
| // } | |||
| // | |||
| // return logFiles; | |||
|
|||
| // } | |||
| // | |||
| // public HdfsLogDesensitizationOld() { | |||
|
|||
| // | |||
| // } | |||
|
|||
| // } | |||
| ... | @@ -11,7 +11,7 @@ import java.util.*; | ... | @@ -11,7 +11,7 @@ import java.util.*; |
| * @date: 2020/10/22 | * @date: 2020/10/22 | ||
| */ | */ | ||
| @Slf4j | @Slf4j | ||
| public class DateUtil { | public class DateUtils { | ||
|
|||
| private static final String NULL = ""; | private static final String NULL = ""; | ||
| private static final String BAR_STRING = "-"; | private static final String BAR_STRING = "-"; | ||
| ... | ... | ||
| package com.zorkdata.desensitization.utils; | package com.zorkdata.desensitization.utils; | ||
| import com.alibaba.fastjson.JSON; | |||
| import com.zorkdata.desensitization.constans.GeneralConstants; | import com.zorkdata.desensitization.constans.GeneralConstants; | ||
| import com.zorkdata.desensitization.exception.ZorkException; | import com.zorkdata.desensitization.exception.ZorkException; | ||
| import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||
| ... | @@ -14,20 +13,12 @@ import java.util.*; | ... | @@ -14,20 +13,12 @@ import java.util.*; |
| * @date: 2020/8/7 | * @date: 2020/8/7 | ||
| */ | */ | ||
| @Slf4j | @Slf4j | ||
| public class PropertiesUtil { | public class PropertiesUtils { | ||
|
|||
| private static final int DEFAULT_PARAMS_MAP_LENGTH = 10; | private static final int DEFAULT_PARAMS_MAP_LENGTH = 10; | ||
| private static final String REGULAR = "regular"; | private static final String REGULAR = "regular"; | ||
| public static void main(String[] args) { | |||
| List<String> propertiesContentList = PropertiesUtil.getPropertiesContentList("/regular"); | |||
| System.out.println(JSON.toJSONString(propertiesContentList)); | |||
| Map<String, String> propertiesMap = getPropertiesMap(propertiesContentList); | |||
| System.out.println(JSON.toJSONString(propertiesMap)); | |||
| } | |||
| /** | /** | ||
| * Build the configuration map. | * Build the configuration map. | ||
| * | * | ||
| ... | @@ -39,8 +30,8 @@ public class PropertiesUtil { | ... | @@ -39,8 +30,8 @@ public class PropertiesUtil { |
| ParameterTool parameterTool = ParameterTool.fromArgs(args); | ParameterTool parameterTool = ParameterTool.fromArgs(args); | ||
| configPath = parameterTool.get(REGULAR); | configPath = parameterTool.get(REGULAR); | ||
| log.info("read config path is {}", configPath); | log.info("read config path is {}", configPath); | ||
| List<String> propertiesContentList = PropertiesUtil.getPropertiesContentList(configPath); | List<String> propertiesContentList = PropertiesUtils.getPropertiesContentList(configPath); | ||
| Map<String, String> confMap = PropertiesUtil.getPropertiesMap(propertiesContentList); | Map<String, String> confMap = PropertiesUtils.getPropertiesMap(propertiesContentList); | ||
| if (confMap.isEmpty()) { | if (confMap.isEmpty()) { | ||
| log.error("配置文件regular不存在,系统退出"); | log.error("配置文件regular不存在,系统退出"); | ||
| throw new ZorkException("配置文件regular不存在,系统退出"); | throw new ZorkException("配置文件regular不存在,系统退出"); | ||
| ... | @@ -114,7 +105,7 @@ public class PropertiesUtil { | ... | @@ -114,7 +105,7 @@ public class PropertiesUtil { |
| InputStream inputStream = null; | InputStream inputStream = null; | ||
| StringBuilder stringBuilder = new StringBuilder(); | StringBuilder stringBuilder = new StringBuilder(); | ||
| try { | try { | ||
| inputStream = PropertiesUtil.class.getResourceAsStream(propertiesFileName); | inputStream = PropertiesUtils.class.getResourceAsStream(propertiesFileName); | ||
| // holds each byte read, i.e. the read result | // holds each byte read, i.e. the read result | ||
| int result = -1; | int result = -1; | ||
| while ((result = inputStream.read()) != -1) { | while ((result = inputStream.read()) != -1) { | ||
| ... | @@ -144,7 +135,7 @@ public class PropertiesUtil { | ... | @@ -144,7 +135,7 @@ public class PropertiesUtil { |
| Properties properties = new Properties(); | Properties properties = new Properties(); | ||
| InputStream inputStream = null; | InputStream inputStream = null; | ||
| try { | try { | ||
| inputStream = PropertiesUtil.class.getResourceAsStream(propertiesFileName); | inputStream = PropertiesUtils.class.getResourceAsStream(propertiesFileName); | ||
| properties.load(inputStream); | properties.load(inputStream); | ||
| } catch (IOException e) { | } catch (IOException e) { | ||
| e.printStackTrace(); | e.printStackTrace(); | ||
| ... | ... | ||
SonarQube analysis reported 174 issues
- ⛔ 1 blocker
- 🚫 20 critical
- ⚠ 130 major
- 🔽 21 minor
- ℹ 2 info

Watch the comments in this conversation to review them.

Top 30 extra issues
Note: The following issues were found on lines that were not modified in the commit. Because these issues can't be reported as line comments, they are summarized here:

- 🚫 Define a constant instead of duplicating this literal " {\n" 11 times. 📘
- 🚫 Define a constant instead of duplicating this literal " "type": \n" 11 times. 📘
- 🚫 Define a constant instead of duplicating this literal " "string",\n" 6 times. 📘
- 🚫 Define a constant instead of duplicating this literal " "null"\n" 6 times. 📘
- 🚫 [Define a constant instead of duplicating this literal " ]\n" 11 times.](https://git.zorkdata.com/liaomingtao/transaction-log-desensitization/blob/7bdc64f5f5802beac0ab4f93b898b7cf98d58177/src/main/java/com/zorkdata/desensitization/avro/AvroSchemaDef.java#L23) 📘
- 🚫 Define a constant instead of duplicating this literal " },\n" 9 times. 📘
- 🚫 Define a constant instead of duplicating this literal " "null",\n" 5 times. 📘
- 🚫 Define a constant instead of duplicating this literal " {\n" 5 times. 📘
- 🚫 Define a constant instead of duplicating this literal " "type": "map",\n" 5 times. 📘
- 🚫 Define a constant instead of duplicating this literal " "values": "string"\n" 3 times. 📘
- 🚫 Define a constant instead of duplicating this literal " }\n" 5 times. 📘
- 🚫 Define a constant instead of duplicating this literal "序列化失败" 13 times. 📘 (see the sketch after this list)
- 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
- 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
- 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
- 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
- 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
- ⚠ Rename "jsonObject" which hides the field declared at line 39. 📘
- ⚠ Remove this expression which always evaluates to "true" 📘
- ⚠ Remove this expression which always evaluates to "true" 📘
- ⚠ This block of commented-out lines of code should be removed. 📘
- ⚠ Clean up unused code blocks and configuration in a timely manner. 📘
- ⚠ Remove this expression which always evaluates to "true" 📘
- ⚠ Iterate over the "entrySet" instead of the "keySet". 📘 (see the sketch after this list)
- ⚠ Remove this conditional structure or edit its code blocks so that they're not all the same. 📘
- ⚠ Iterate over the "entrySet" instead of the "keySet". 📘
- ⚠ Remove this conditional structure or edit its code blocks so that they're not all the same. 📘
- ⚠ Iterate over the "entrySet" instead of the "keySet". 📘
- ⚠ Remove this conditional structure or edit its code blocks so that they're not all the same. 📘
- ⚠ Remove this unused private "bigDecimal2Double" method. 📘
- ... 47 more
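Two of the recurring findings above are mechanical to address; a hedged sketch of both follows (the class and method names here are illustrative, not from the repository):

import java.util.Map;

public final class SonarFixSketch {
    // "Define a constant instead of duplicating this literal": one named constant,
    // reused wherever the "序列化失败" message is logged or thrown.
    public static final String SERIALIZATION_FAILED = "序列化失败";

    private SonarFixSketch() {
    }

    // "Iterate over the entrySet instead of the keySet": when both key and value are
    // needed, entrySet() avoids an extra map lookup per key.
    public static void logAll(Map<String, String> map) {
        for (Map.Entry<String, String> entry : map.entrySet()) {
            System.out.println(entry.getKey() + "=" + entry.getValue());
        }
    }
}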