Commit 02f2dae8 authored by 王海鹰

commit

parent 0f0dba43
package com.zorkdata.datamask;
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.domain.LogData;
-import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.MaskUtil;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.avro.io.DatumReader;
@@ -13,9 +11,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.operators.DataSource;
@@ -45,11 +41,6 @@ import java.util.Date;
import java.util.List;
import java.util.Properties;
-//import org.apache.flink.formats.avro.AvroInputFormat;
-//import org.apache.flink.formats.avro.AvroInputFormat;
-//import org.apache.hadoop.mapred.jobcontrol.Job;
/**
 * Description: Guotai transaction log masking job
 *
@@ -78,6 +69,8 @@ public class TransactionLogMask {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
+       JobConf jobConf = new JobConf();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String hdfsSrc = params.get("hdfs-src");
        String hdfsDest = params.get("hdfs-dest");
@@ -92,30 +85,19 @@ public class TransactionLogMask {
                add(hdfsSrc);
            }
        };
-       System.out.println("---------------logFiles-----------------:" + logFiles);
        for (String logFile : logFiles) {
-           // DataSet<String> textFileSource = env.readTextFile(logFile).name("hadoop-source");
-           // Job job = Job.getInstance();
-           // HadoopInputFormat<Text, LogData> hadoopInputFormat = new HadoopInputFormat<Text, LogData>((InputFormat<Text, LogData>) logDataInput, Text.class, LogData.class, new JobConf());
-           // HadoopInputFormat<Text, LogData> hadoopInputFormat = new HadoopInputFormat<Text, LogData>(new TextInputFormat(), Text.class, LogData.class, new JobConf());
-           // AvroInputFormat<LogData> logDataAvroFormat = new AvroInputFormat<>(new org.apache.flink.core.fs.Path(logFile), LogData.class);
-           // ZorkAvroFormat logDataAvroFormat = new ZorkAvroFormat<String, String>();
-           JobConf jobConf = new JobConf();
            HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<Object, Object>
                    (new AvroInputFormat(), Object.class, Object.class, jobConf);
            AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
            DataSource<Tuple2<Object, Object>> input = env.createInput(hadoopInputFormat);
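            // Reviewer note, not part of this commit: org.apache.avro.mapred.AvroInputFormat
            // produces Tuple2<AvroWrapper<LogData>, NullWritable> records, so the raw
            // Object/Object generics above compile but hide that the key must be unwrapped
            // with AvroWrapper.datum() to reach the LogData payload.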
            // input.print();
            /**
             * Masking operator
             */
-           FlatMapOperator<Tuple2<Object, Object>, Object> tuple2ObjectFlatMapOperator = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
+           FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
                @Override
                public void flatMap(Tuple2<Object, Object> value, Collector<Object> out) throws Exception {
-                   // System.out.println("--------------------value:" + value);
@@ -130,7 +112,7 @@ public class TransactionLogMask {
            /**
             * Avro serialization operator
             */
-           FlatMapOperator<Object, Object> objectObjectFlatMapOperator = tuple2ObjectFlatMapOperator.flatMap(new FlatMapFunction<Object, Object>() {
+           FlatMapOperator<Object, Object> avroFlatMapOperator = maskFlatMapOperator.flatMap(new FlatMapFunction<Object, Object>() {
                @Override
                public void flatMap(Object value, Collector<Object> out) throws Exception {
                    // System.out.println("--------------------value:" + value);
@@ -140,7 +122,7 @@ public class TransactionLogMask {
                    out.collect(bytes);
                }
            });
-           objectObjectFlatMapOperator.print();
+           avroFlatMapOperator.print();
            // DataSet<Object> textFileSource = objectObjectFlatMapOperator.getInput();
            // textFileSource.print();
@@ -151,7 +133,7 @@ public class TransactionLogMask {
            // flatMap.print();
            String filePath = hdfsDest + logFileName;
            System.out.println("---------------writepath-----------------:" + filePath);
-           objectObjectFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
+           avroFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
            try {
                env.execute("Guotai transaction log masking job");
@@ -260,14 +242,6 @@ public class TransactionLogMask {
        return logFiles;
    }
-   private static LogData avroSerialize(byte[] data) throws IOException {
-       DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
-       Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
-       LogData transactionLog = reader.read(null, decoder);
-       System.out.println(transactionLog);
-       return transactionLog;
-   }
    private static LogData avroDeserialize(byte[] data) throws IOException {
        DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
...
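Side note on the deleted avroSerialize helper: despite its name it decoded bytes into a LogData record, making it a duplicate of the avroDeserialize method kept above, so removing it loses nothing. If a true write-side counterpart is ever needed, a minimal sketch using the stock Avro SpecificDatumWriter/EncoderFactory API could look like the following (the class and method names here are hypothetical, and the repo's AvroSerializerFactory API is not visible in this diff):

package com.zorkdata.datamask.util;

import com.zorkdata.datamask.domain.LogData;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical helper, not part of this commit: the binary write-side
// counterpart of the avroDeserialize method kept above.
public class LogDataAvroCodec {
    public static byte[] avroSerialize(LogData record) throws IOException {
        DatumWriter<LogData> writer = new SpecificDatumWriter<LogData>(LogData.class);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // EncoderFactory.binaryEncoder pairs with the DecoderFactory.binaryDecoder
        // call used in avroDeserialize, yielding bytes that method can read back.
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}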