Commit 0f0dba43 authored by 王海鹰

commit

parent 7c58b271
package com.zorkdata.datamask;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.MaskUtil;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -103,37 +110,40 @@ public class TransactionLogMask {
AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
DataSource<Tuple2<Object, Object>> input = env.createInput(hadoopInputFormat);
// input.print();
input.print();
// FlatMapOperator<Tuple2<AvroKey, AvroValue>, LogData> tuple2ObjectFlatMapOperator
DataSet<Tuple2<Object, Object>> textFileSource = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
/**
* The core method of the FlatMapFunction. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
* @param out The collector for returning result values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
/**
* Masking operator
*/
FlatMapOperator<Tuple2<Object, Object>, Object> tuple2ObjectFlatMapOperator = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> out) throws Exception {
System.out.println("--------------------" + value);
System.out.println("--------------------" + value.getField(0));
System.out.println("--------------------" + value.getField(1));
// LogData transactionLog = avroDeserialize(value.getBytes());
// System.out.println("--------------------" + transactionLog);
// transactionLog.setNormalFields(MaskUtil.mask(transactionLog.getNormalFields()));
out.collect(null);
// System.out.println("--------------------value:" + value);
// System.out.println("--------------------getField0" + value.getField(0));
LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>(){});
logData.setNormalFields(MaskUtil.mask(logData.getNormalFields()));
out.collect(logData);
}
}).getInput();
});
// tuple2ObjectFlatMapOperator.print();
// textFileSource.print();
// DataSet<Tuple2<AvroKey, AvroValue>> textFileSource = tuple2ObjectFlatMapOperator.getInput();
/**
* Avro serialization operator
*/
FlatMapOperator<Object, Object> objectObjectFlatMapOperator = tuple2ObjectFlatMapOperator.flatMap(new FlatMapFunction<Object, Object>() {
@Override
public void flatMap(Object value, Collector<Object> out) throws Exception {
// System.out.println("--------------------value:" + value);
LogData logData = (LogData)value;
byte[] bytes = AvroSerializerFactory.getLogAvroSerializer().serializingLog(logData.getLogTypeName(), logData.getTimestamp(),
logData.getSource(), logData.getOffset(), logData.getDimensions(), logData.getMeasures(), logData.getNormalFields());
out.collect(bytes);
}
});
objectObjectFlatMapOperator.print();
// DataSet<Tuple2<AvroKey, LogData>> textFileSource = input.flatMap((FlatMapFunction<Tuple2<AvroKey, LogData>, Object>) (value, out) ->
// System.out.println("------------------" + value)).getInput();
// DataSet<Object> textFileSource = objectObjectFlatMapOperator.getInput();
// textFileSource.print();
// env.createInput(logDataInput).flatMap(new Avro2StrFlatMapFunction());
// DataSet<String> textFileSource = env.readTextFile(logFile).name("hadoop-source");
// DataSet<String> flatMap = textFileSource.map(new Avro2StrFlatMapFunction());
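For reference, the two operators added above form a parse → mask → serialize chain. Below is a minimal sketch of the same chain with explicit types instead of Object, which removes the (LogData) cast in the second operator. LogData, MaskUtil, and AvroSerializerFactory are project classes not shown in full here; their signatures are assumed from this diff.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.util.MaskUtil;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MaskChainSketch {
    /** Parse the JSON payload out of the Hadoop tuple, mask it, then Avro-serialize it. */
    static DataSet<byte[]> maskAndSerialize(DataSet<Tuple2<Object, Object>> input) {
        return input
                .flatMap(new FlatMapFunction<Tuple2<Object, Object>, LogData>() {
                    @Override
                    public void flatMap(Tuple2<Object, Object> value, Collector<LogData> out) {
                        // field 0 of the key/value tuple carries the JSON record
                        LogData logData = JSON.parseObject(value.getField(0).toString(),
                                new TypeReference<LogData>() {});
                        // mask only the normal fields, as in the commit
                        logData.setNormalFields(MaskUtil.mask(logData.getNormalFields()));
                        out.collect(logData);
                    }
                })
                .flatMap(new FlatMapFunction<LogData, byte[]>() {
                    @Override
                    public void flatMap(LogData logData, Collector<byte[]> out) throws Exception {
                        out.collect(AvroSerializerFactory.getLogAvroSerializer().serializingLog(
                                logData.getLogTypeName(), logData.getTimestamp(),
                                logData.getSource(), logData.getOffset(),
                                logData.getDimensions(), logData.getMeasures(),
                                logData.getNormalFields()));
                    }
                });
    }
}

Since every LogData yields exactly one output record, the second operator could equally be a MapFunction; flatMap only earns its keep if malformed records are to be dropped by not collecting them.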
@@ -141,7 +151,7 @@ public class TransactionLogMask {
// flatMap.print();
String filePath = hdfsDest + logFileName;
System.out.println("---------------writepath-----------------:" + filePath);
textFileSource.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
objectObjectFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
try {
env.execute("Guotai transaction log masking job");
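One caveat on the sink: writeAsText stringifies each record via toString(), so for the byte[] records produced by the serialization operator the output file will contain Java's default array representation ("[B@...") rather than the Avro payload. A possible alternative, sketched below, writes the masked LogData records through Flink's Avro output format instead; this assumes the flink-avro dependency and that LogData is an Avro-generated class (the AvroOutputFormat package name varies across Flink versions).

// Sketch only: masked is the DataSet<LogData> produced by the masking
// operator, filePath the HDFS target built above.
AvroOutputFormat<LogData> avroFormat = new AvroOutputFormat<>(LogData.class);
masked.write(avroFormat, filePath, FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");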
@@ -250,10 +260,10 @@ public class TransactionLogMask {
return logFiles;
}
private static TransactionLog avroSerialize(byte[] data) throws IOException {
DatumReader<TransactionLog> reader = new SpecificDatumReader<TransactionLog>(TransactionLog.class);
private static LogData avroSerialize(byte[] data) throws IOException {
DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
TransactionLog transactionLog = reader.read(null, decoder);
LogData transactionLog = reader.read(null, decoder);
System.out.println(transactionLog);
return transactionLog;
}
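Despite its name, the avroSerialize helper above deserializes: it decodes Avro binary back into a LogData. For reference, the encode direction with the standard Avro API (DatumWriter, SpecificDatumWriter, EncoderFactory) looks roughly like this; avroEncode is a hypothetical name, and LogData stands in for the generated specific-record class.

private static byte[] avroEncode(LogData record) throws IOException {
    // SpecificDatumWriter is the mirror image of the SpecificDatumReader used above
    DatumWriter<LogData> writer = new SpecificDatumWriter<>(LogData.class);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush(); // the binary encoder buffers; flush before reading the bytes
    return out.toByteArray();
}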
@@ -75,7 +75,7 @@ public class MaskUtil {
for(int i=0; i < matcher.group().length(); i++){
replaceStr = replaceStr.concat("*");
}
System.out.println(replaceStr);
// System.out.println(replaceStr);
value = value.replace(matcher.group(), replaceStr);
}
}
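For context, the masking idiom in MaskUtil builds a run of '*' the same length as each regex match and substitutes it into the value. A self-contained sketch of that idiom follows; the 11-digit phone pattern is a hypothetical example, since MaskUtil's real patterns are outside this diff.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MaskSketch {
    private static final Pattern PHONE = Pattern.compile("\\d{11}");

    static String mask(String value) {
        Matcher matcher = PHONE.matcher(value);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            // same-length run of '*' as the loop above builds with concat
            String stars = new String(new char[matcher.group().length()]).replace('\0', '*');
            matcher.appendReplacement(sb, stars);
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: tel=***********, user=zhang
        System.out.println(mask("tel=13812345678, user=zhang"));
    }
}

appendReplacement rewrites each match in place, which sidesteps a subtlety of the value.replace(matcher.group(), replaceStr) form above: String.replace swaps every occurrence of the matched literal, not just the one the matcher found.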