Commit ebf131df authored by 王海鹰

commit

parent 2e1fea55
@@ -3,17 +3,23 @@ package com.zorkdata.datamask;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.MaskUtil;
import org.apache.avro.mapreduce.AvroJob;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,7 +35,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import sun.rmi.runtime.Log;
import java.io.IOException;
import java.net.URI;
@@ -70,23 +80,27 @@ public class TransactionLogMask {
env.setParallelism(1);
JobConf jobConf = new JobConf();
Job job = Job.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String hdfsSrc = params.get("hdfs-src");
String hdfsDest = params.get("hdfs-dest");
String core = params.get("core", "c1");
String date = params.get("date", sdf.format(new Date()));
String startTime = params.get("startTime");
String endTime = params.get("endTime");
Long startTime = Long.parseLong(params.get("startTime"));
Long endTime = Long.parseLong(params.get("endTime"));
// List<String> logFiles = filterHdfsLogFiles(hdfsSrc, date, startTime, endTime);
List<String> logFiles = new ArrayList<String>() {
{
add(hdfsSrc);
}
};
List<String> logFiles = filterHdfsLogFiles(hdfsSrc, date, startTime, endTime);
System.out.println("-----------logFiles-------------:" + logFiles);
// List<String> logFiles = new ArrayList<String>() {
// {
// add(hdfsSrc);
// }
// };
for (String logFile : logFiles) {
System.out.println("-----------logFile-------------:" + logFile);
HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<Object, Object>
(new AvroInputFormat(), Object.class, Object.class, jobConf);
@@ -99,12 +113,12 @@ public class TransactionLogMask {
*/
FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> out) throws Exception {
public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
// System.out.println("--------------------value:" + value);
// System.out.println("--------------------getField0" + value.getField(0));
LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>(){});
logData.setNormalFields(MaskUtil.mask(logData.getNormalFields()));
out.collect(logData);
collector.collect(logData);
}
});
// tuple2ObjectFlatMapOperator.print();
@@ -114,21 +128,35 @@ public class TransactionLogMask {
*/
FlatMapOperator<Object, Object> avroFlatMapOperator = maskFlatMapOperator.flatMap(new FlatMapFunction<Object, Object>() {
@Override
public void flatMap(Object value, Collector<Object> out) throws Exception {
// System.out.println("--------------------value:" + value);
public void flatMap(Object value, Collector<Object> collector) throws Exception {
LogData logData = (LogData)value;
byte[] bytes = AvroSerializerFactory.getLogAvroSerializer().serializingLog(logData.getLogTypeName(), logData.getTimestamp(),
logData.getSource(), logData.getOffset(), logData.getDimensions(), logData.getMeasures(), logData.getNormalFields());
out.collect(bytes);
// byte[] bytes = AvroSerializerFactory.getLogAvroSerializer().serializingLog(logData.getLogTypeName(), logData.getTimestamp(),
// logData.getSource(), logData.getOffset(), logData.getDimensions(), logData.getMeasures(), logData.getNormalFields());
// out.collect(bytes);
collector.collect(JSON.toJSONString(logData));
}
});
avroFlatMapOperator.print();
String logFileName = logFile.split("/")[logFile.split("/").length - 1];
String filePath = hdfsDest + logFileName;
String filePath = hdfsDest + logFileName.replace(".avro", "out.avro");
System.out.println("---------------writepath-----------------:" + filePath);
avroFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
AvroOutputFormat<LogData> logDataAvroOutputFormat = new AvroOutputFormat<>();
HadoopOutputFormat hadoopOutputFormat2 = new HadoopOutputFormat<>(logDataAvroOutputFormat, jobConf);
// AvroOutputFormat.
jobConf.set("avro.output.schema", TransactionLog.SCHEMA$.toString());
AvroJob.setInputKeySchema(job, TransactionLog.getClassSchema());
FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
avroFlatMapOperator.output(hadoopOutputFormat);
// avroFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
try {
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
@@ -197,7 +225,7 @@ public class TransactionLogMask {
* @param endTime end time
* @return list of HDFS file paths
*/
private static List<String> filterHdfsLogFiles(String hdfs, String date, String startTime, String endTime) {
private static List<String> filterHdfsLogFiles(String hdfs, String date, Long startTime, Long endTime) {
if (!hdfs.endsWith("/")) {
hdfs += "/";
}
@@ -224,9 +252,12 @@ public class TransactionLogMask {
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path(path), false);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
Path path1 = next.getPath();
System.out.println("---------------path1-----------------:" + path1.toString());
logFiles.add(path1.toString());
long modificationTime = next.getModificationTime();
if(modificationTime > startTime && modificationTime < endTime){
Path hdfsFilePath = next.getPath();
System.out.println("---------------hdfsFilePath-----------------:" + hdfsFilePath.toString());
logFiles.add(hdfsFilePath.toString());
}
}
} catch (IOException e) {
e.printStackTrace();
@@ -235,12 +266,4 @@ public class TransactionLogMask {
}
return logFiles;
}
private static LogData avroDeserialize(byte[] data) throws IOException {
DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
LogData transactionLog = reader.read(null, decoder);
System.out.println(transactionLog);
return transactionLog;
}
}
\ No newline at end of file
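This commit parses startTime and endTime as Long values and re-enables filterHdfsLogFiles, which now keeps only files whose HDFS modification time falls inside that window. The standalone sketch below illustrates the same filter under the assumption that both bounds are epoch milliseconds; the class name, helper name and namenode URI are illustrative, not taken from the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class HdfsMtimeFilterSketch {

    // Collect files directly under `dir` whose modification time lies inside (startTime, endTime),
    // both given as epoch milliseconds, mirroring the Long parameters introduced by this commit.
    static List<String> filterByModificationTime(FileSystem fs, Path dir,
                                                 long startTime, long endTime) throws IOException {
        List<String> files = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, false);
        while (it.hasNext()) {
            LocatedFileStatus status = it.next();
            long mtime = status.getModificationTime(); // epoch milliseconds
            if (mtime > startTime && mtime < endTime) {
                files.add(status.getPath().toString());
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        // Illustrative namenode URI; in the job the base path comes from the hdfs-src parameter.
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020/"), new Configuration());
        System.out.println(filterByModificationTime(fs, new Path("/logs/c1"), 0L, Long.MAX_VALUE));
    }
}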
@@ -16,7 +16,7 @@ import java.util.Map;
@org.apache.avro.specific.AvroGenerated
public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = -4444562953482178409L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"logs\",\"namespace\":\"com.zork.logs\",\"fields\":[{\"name\":\"logTypeName\",\"type\":[\"string\",\"null\"]},{\"name\":\"timestamp\",\"type\":[\"string\",\"null\"]},{\"name\":\"source\",\"type\":[\"string\",\"null\"]},{\"name\":\"offset\",\"type\":[\"string\",\"null\"]},{\"name\":\"dimensions\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]},{\"name\":\"measures\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"double\"}]},{\"name\":\"normalfields\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]}");
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"logs\",\"namespace\":\"com.zork.logs\",\"fields\":[{\"name\":\"logTypeName\",\"type\":[\"string\",\"null\"]},{\"name\":\"timestamp\",\"type\":[\"string\",\"null\"]},{\"name\":\"source\",\"type\":[\"string\",\"null\"]},{\"name\":\"offset\",\"type\":[\"string\",\"null\"]},{\"name\":\"dimensions\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]},{\"name\":\"measures\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"double\"}]},{\"name\":\"normalFields\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]}");
public static org.apache.avro.Schema getClassSchema() {
return SCHEMA$;
@@ -69,7 +69,7 @@ public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
@Deprecated
public Map<CharSequence, Double> measures;
@Deprecated
public Map<CharSequence, CharSequence> normalfields;
public Map<CharSequence, CharSequence> normalFields;
/**
* Default constructor. Note that this does not initialize fields
@@ -98,7 +98,7 @@ public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
this.offset = offset;
this.dimensions = dimensions;
this.measures = measures;
this.normalfields = normalfields;
this.normalFields = normalfields;
}
public org.apache.avro.Schema getSchema() {
@@ -121,7 +121,7 @@ public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
case 5:
return measures;
case 6:
return normalfields;
return normalFields;
default:
throw new org.apache.avro.AvroRuntimeException("Bad index");
}
@@ -150,7 +150,7 @@ public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
measures = (Map<CharSequence, Double>) value$;
break;
case 6:
normalfields = (Map<CharSequence, CharSequence>) value$;
normalFields = (Map<CharSequence, CharSequence>) value$;
break;
default:
throw new org.apache.avro.AvroRuntimeException("Bad index");
@@ -258,7 +258,7 @@ public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
* @return The value of the 'normalfields' field.
*/
public Map<CharSequence, CharSequence> getNormalFields() {
return normalfields;
return normalFields;
}
/**
@@ -266,7 +266,7 @@ public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
* @param value the value to set.
*/
public void setNormalFields(Map<CharSequence, CharSequence> value) {
this.normalfields = value;
this.normalFields = value;
}
/**
@@ -380,8 +380,8 @@ public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
this.measures = data().deepCopy(fields()[5].schema(), other.measures);
fieldSetFlags()[5] = true;
}
if (isValidValue(fields()[6], other.normalfields)) {
this.normalfields = data().deepCopy(fields()[6].schema(), other.normalfields);
if (isValidValue(fields()[6], other.normalFields)) {
this.normalfields = data().deepCopy(fields()[6].schema(), other.normalFields);
fieldSetFlags()[6] = true;
}
}
@@ -670,7 +670,7 @@ public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
record.offset = fieldSetFlags()[3] ? this.offset : (CharSequence) defaultValue(fields()[3]);
record.dimensions = fieldSetFlags()[4] ? this.dimensions : (Map<CharSequence, CharSequence>) defaultValue(fields()[4]);
record.measures = fieldSetFlags()[5] ? this.measures : (Map<CharSequence, Double>) defaultValue(fields()[5]);
record.normalfields = fieldSetFlags()[6] ? this.normalfields : (Map<CharSequence, CharSequence>) defaultValue(fields()[6]);
record.normalFields = fieldSetFlags()[6] ? this.normalfields : (Map<CharSequence, CharSequence>) defaultValue(fields()[6]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
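The TransactionLog change renames the Avro record field normalfields to normalFields so the generated schema agrees with the Java field and the getNormalFields/setNormalFields accessors. Avro field lookup is case-sensitive, which the small standalone sketch below demonstrates; the schema string is cut down to the single renamed field and is not the full SCHEMA$.

import org.apache.avro.Schema;

public class SchemaFieldNameCheck {
    public static void main(String[] args) {
        // Reduced record schema containing only the renamed field, in the new camelCase form.
        String schemaJson = "{\"type\":\"record\",\"name\":\"logs\",\"namespace\":\"com.zork.logs\","
                + "\"fields\":[{\"name\":\"normalFields\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        // Schema.getField is case-sensitive: only the camelCase name resolves against this schema.
        System.out.println(schema.getField("normalFields") != null); // true
        System.out.println(schema.getField("normalfields") != null); // false
    }
}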
@@ -19,32 +19,32 @@ public class MaskUtil {
/**
* Name regex
*/
static Pattern namePattern = Pattern.compile("^([\\u4e00-\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20})$");
static Pattern namePattern = Pattern.compile("([\\u4e00-\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20})");
/**
* Mobile phone number regex
*/
static Pattern mobilePattern = Pattern.compile("^((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}$");
static Pattern mobilePattern = Pattern.compile("((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}");
/**
* Landline phone number regex
*/
static Pattern phonePattern = Pattern.compile("^(\\d{3,4}-)?\\d{6,8}$");
static Pattern phonePattern = Pattern.compile("(\\d{3,4}-)?\\d{6,8}");
/**
* Email regex
*/
static Pattern emailPattern = Pattern.compile("^\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*$");
static Pattern emailPattern = Pattern.compile("\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*");
/**
* ID card number (15-digit) regex
*/
// static Pattern idPattern15 = Pattern.compile("\\d{17}[0-9Xx]|\\d{15}");
static Pattern idPattern15 = Pattern.compile("^[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}$");
static Pattern idPattern15 = Pattern.compile("[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}");
/**
* ID card number (18-digit) regex
*/
static Pattern idPattern18 = Pattern.compile("^[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])$");
static Pattern idPattern18 = Pattern.compile("[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])");
/**
* Home address regex
*/
static Pattern addressPattern = Pattern.compile("^([\\u4E00-\\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号)){2,}$");
static Pattern addressPattern = Pattern.compile("([\\u4E00-\\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号)){2,}");
/**
* IP address regex
*/
@@ -55,11 +55,11 @@ public class MaskUtil {
static List<Pattern> patterns = new ArrayList<Pattern>(){{
add(namePattern);
add(idPattern18);
add(idPattern15);
add(mobilePattern);
add(phonePattern);
add(emailPattern);
add(idPattern15);
add(idPattern18);
add(addressPattern);
add(ipPattern);
add(macPattern);
@@ -69,8 +69,8 @@ public class MaskUtil {
map.forEach((k, v) -> {
String value = v.toString();
for(Pattern pattern:patterns){
Matcher matcher = pattern.matcher(v.toString());
if (matcher.matches()){
Matcher matcher = pattern.matcher(value);
if (matcher.find()){
String replaceStr = "";
for(int i=0; i < matcher.group().length(); i++){
replaceStr = replaceStr.concat("*");
@@ -86,14 +86,15 @@ public class MaskUtil {
public static void main(String[] args) {
Map map = new HashMap();
map.put("姓名", "王海鹰");
map.put("身份证号", "372925199008075158");
map.put("手机号", "15000101879");
map.put("电话", "021-61341606");
map.put("邮箱", "wanghaiying@zork.com.cn");
map.put("住址", "上海市浦东新区碧波路690号");
map.put("ip地址", "192.168.70.2");
map.put("mac地址", "3c-78-43-25-80-bd");
// map.put("姓名", "王海鹰");
// map.put("身份证号", "372925199008075158");
// map.put("手机号", "15000101879");
// map.put("电话", "021-61341606");
// map.put("邮箱", "wanghaiying@zork.com.cn");
// map.put("住址", "上海市浦东新区碧波路690号");
// map.put("ip地址", "192.168.70.2");
// map.put("mac地址", "3c-78-43-25-80-bd");
map.put("message", "王海鹰-372925199008075158-15000101879");
System.out.println(mask(map));
// String mobile = "15000101879";
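In MaskUtil the leading ^ and trailing $ anchors are removed from the patterns and matches() is replaced by find(), so a sensitive value embedded in a longer string (the new "message" test entry) is still detected. The sketch below shows the difference with the 18-digit ID pattern from this file; the class name is illustrative, and the replacement uses Matcher.appendReplacement instead of the util's concat loop.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FindVsMatchesSketch {
    // Unanchored 18-digit ID pattern, as changed by this commit.
    static final Pattern ID18 = Pattern.compile(
            "[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])");

    // Replace every occurrence with a run of '*' of the same length.
    static String mask(String text) {
        Matcher matcher = ID18.matcher(text);
        StringBuffer masked = new StringBuffer();
        while (matcher.find()) {
            StringBuilder stars = new StringBuilder();
            for (int i = 0; i < matcher.group().length(); i++) {
                stars.append('*');
            }
            matcher.appendReplacement(masked, stars.toString());
        }
        matcher.appendTail(masked);
        return masked.toString();
    }

    public static void main(String[] args) {
        String message = "王海鹰-372925199008075158-15000101879";
        // matches() is false because the whole string is not an ID number,
        // but find() still locates and masks the embedded ID.
        System.out.println(ID18.matcher(message).matches()); // false
        System.out.println(mask(message));
    }
}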