Commit 70bee141 authored by 谢森

<dev> Add txt sink

parents dd502ffa 2e1fea55
package com.zorkdata.datamask;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.MaskUtil;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroTextOutputFormat;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
@@ -27,7 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import java.io.IOException;
import java.net.URI;
@@ -39,11 +46,6 @@ import java.util.Date;
import java.util.List;
import java.util.Properties;
//import org.apache.flink.formats.avro.AvroInputFormat;
//import org.apache.hadoop.mapred.jobcontrol.Job;
/**
* Description : Guotai (国泰) transaction log masking job
*
@@ -72,6 +74,8 @@ public class TransactionLogMask {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
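// a single Hadoop JobConf is created up front and reused when wiring the mapred AvroInputFormat for each log file below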
JobConf jobConf = new JobConf();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String hdfsSrc = params.get("hdfs-src");
String hdfsDest = params.get("hdfs-dest");
@@ -86,53 +90,53 @@
add(hdfsSrc);
}
};
System.out.println("---------------logFiles-----------------:" + logFiles);
for (String logFile : logFiles) {
// DataSet<String> textFileSource = env.readTextFile(logFile).name("hadoop-source");
// Job job = Job.getInstance();
// HadoopInputFormat<Text, LogData> hadoopInputFormat = new HadoopInputFormat<Text, LogData>((InputFormat<Text, LogData>) logDataInput, Text.class, LogData.class, new JobConf());
// HadoopInputFormat<Text, LogData> hadoopInputFormat = new HadoopInputFormat<Text, LogData>(new TextInputFormat(), Text.class, LogData.class, new JobConf());
// AvroInputFormat<LogData> logDataAvroFormat = new AvroInputFormat<>(new org.apache.flink.core.fs.Path(logFile), LogData.class);
// ZorkAvroFormat logDataAvroFormat = new ZorkAvroFormat<String, String>();
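// read each Avro file through the Hadoop mapred AvroInputFormat wrapped in Flink's HadoopInputFormat; keys and values come back as raw Objects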
// the jobConf created above is reused here; re-declaring it in this scope would shadow the outer local and fail to compile
HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<Object, Object>
(new AvroInputFormat(), Object.class, Object.class, jobConf);
AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
DataSource<Tuple2<Object, Object>> input = env.createInput(hadoopInputFormat);
// input.print();
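// debug view: emit only the key field (f0) of each tuple and print it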
FlatMapOperator<Tuple2<Object, Object>, Object> map = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
collector.collect(value.f0);
}
});
map.print();
// DataSet<Tuple2<Object, Object>> textFileSource = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
//
// @Override
// public void flatMap(Tuple2<Object, Object> value, Collector<Object> out) throws Exception {
// System.out.println("--------------------" + value);
// System.out.println("--------------------" + value.getField(0));
// System.out.println("--------------------" + value.getField(1));
// out.collect(null);
// }
// });
// String logFileName = logFile.split("/")[logFile.split("/").length - 1];
// String filePath = hdfsDest + logFileName;
// System.out.println("---------------writepath-----------------:" + filePath);
// textFileSource.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
/**
* Masking operator
*/
FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> out) throws Exception {
LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>() {
});
logData.setNormalFields(MaskUtil.mask(logData.getNormalFields()));
out.collect(logData);
}
});
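// serialize the masked LogData records back to JSON strings so they can be written by the text sink below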
FlatMapOperator<Object, String> objectStringFlatMapOperator = maskFlatMapOperator.flatMap(new FlatMapFunction<Object, String>() {
@Override
public void flatMap(Object value, Collector<String> collector) throws Exception {
LogData logData = (LogData) value;
collector.collect(JSON.toJSONString(logData));
}
});
// objectStringFlatMapOperator.print();
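// build the destination path from the source file name and write the masked records as text, overwriting any existing output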
String logFileName = logFile.split("/")[logFile.split("/").length - 1];
String filePath = hdfsDest + logFileName;
System.out.println("---------------writepath-----------------:" + filePath);
objectStringFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
try {
// env.execute("国泰交易日志脱敏job");
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
e.printStackTrace();
}
@@ -238,14 +242,6 @@ public class TransactionLogMask {
return logFiles;
}
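/**
 * Despite its name, this helper performs Avro deserialization: it decodes the given bytes
 * into a TransactionLog using a SpecificDatumReader.
 */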
private static TransactionLog avroSerialize(byte[] data) throws IOException {
DatumReader<TransactionLog> reader = new SpecificDatumReader<TransactionLog>(TransactionLog.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
TransactionLog transactionLog = reader.read(null, decoder);
System.out.println(transactionLog);
return transactionLog;
}
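/**
 * Decodes Avro-encoded bytes into a LogData record using a SpecificDatumReader.
 */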
private static LogData avroDeserialize(byte[] data) throws IOException {
DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
......
@@ -75,7 +75,7 @@ public class MaskUtil {
for(int i=0; i < matcher.group().length(); i++){
replaceStr = replaceStr.concat("*");
}
System.out.println(replaceStr);
// System.out.println(replaceStr);
value = value.replace(matcher.group(), replaceStr);
}
}
......