Commit 7c58b271 authored by 王海鹰

commit

parent 8e9f48d7
...@@ -6,7 +6,6 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroInputFormat;
-import org.apache.avro.mapred.AvroKey;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
...@@ -21,6 +20,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
...@@ -38,6 +38,8 @@ import java.util.Date;
import java.util.List;
import java.util.Properties;
+//import org.apache.flink.formats.avro.AvroInputFormat;
//import org.apache.flink.formats.avro.AvroInputFormat;
//import org.apache.hadoop.mapred.jobcontrol.Job;
...@@ -62,7 +64,7 @@ public class TransactionLogMask {
    /**
     * Masks (de-identifies) HDFS log files.
     *
     * @param params request parameters
     * @return void
     */
    public static void maskHdfsLog(ParameterTool params) throws Exception {
...@@ -94,17 +96,44 @@ public class TransactionLogMask {
//        ZorkAvroFormat logDataAvroFormat = new ZorkAvroFormat<String, String>();
        JobConf jobConf = new JobConf();
-        HadoopInputFormat<AvroKey, LogData> hadoopInputFormat = new HadoopInputFormat<AvroKey, LogData>
-                (new AvroInputFormat(), AvroKey.class, LogData.class, jobConf);
+//        Job jobInstance = Job.getInstance();
+        HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<Object, Object>
+                (new AvroInputFormat(), Object.class, Object.class, jobConf);
        AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
-        DataSource<Tuple2<AvroKey, LogData>> input = env.createInput(hadoopInputFormat);
+        DataSource<Tuple2<Object, Object>> input = env.createInput(hadoopInputFormat);
+        input.print();
+//        FlatMapOperator<Tuple2<AvroKey, AvroValue>, LogData> tuple2ObjectFlatMapOperator
+        DataSet<Tuple2<Object, Object>> textFileSource = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
+            /**
+             * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
+             * it into zero, one, or more elements.
+             *
+             * @param value The input value.
+             * @param out   The collector for returning result values.
+             * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+             *                   to fail and may trigger recovery.
+             */
+            @Override
+            public void flatMap(Tuple2<Object, Object> value, Collector<Object> out) throws Exception {
+                System.out.println("--------------------" + value);
+                System.out.println("--------------------" + value.getField(0));
+                System.out.println("--------------------" + value.getField(1));
+//                LogData transactionLog = avroDeserialize(value.getBytes());
+//                System.out.println("--------------------" + transactionLog);
+//                transactionLog.setNormalFields(MaskUtil.mask(transactionLog.getNormalFields()));
+                out.collect(null);
+            }
+        }).getInput();
-        DataSet<Tuple2<AvroKey, LogData>> textFileSource = input.flatMap((FlatMapFunction<Tuple2<AvroKey, LogData>, Object>) (value, out) ->
-                System.out.println("------------------" + value)).getInput();
+//        textFileSource.print();
+//        DataSet<Tuple2<AvroKey, AvroValue>> textFileSource = tuple2ObjectFlatMapOperator.getInput();
+//        DataSet<Tuple2<AvroKey, LogData>> textFileSource = input.flatMap((FlatMapFunction<Tuple2<AvroKey, LogData>, Object>) (value, out) ->
+//                System.out.println("------------------" + value)).getInput();
//        env.createInput(logDataInput).flatMap(new Avro2StrFlatMapFunction());
//        DataSet<String> textFileSource = env.readTextFile(logFile).name("hadoop-source");
//        DataSet<String> flatMap = textFileSource.map(new Avro2StrFlatMapFunction());
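Note on the change above: the commit falls back to raw Object/Object generics to get the Hadoop AvroInputFormat compiling. Below is a minimal sketch of a typed alternative, assuming the mapred AvroInputFormat (which emits AvroWrapper<T> keys and NullWritable values) wrapped in Flink's mapred HadoopInputFormat. LogData, MaskUtil.mask and the normalFields accessors come from the surrounding code; the class name AvroHdfsReadSketch and the readAndMask method are illustrative only, not the file's actual implementation.

import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;

public class AvroHdfsReadSketch {

    // Illustrative sketch only; everything not present in the surrounding file is an assumption.
    @SuppressWarnings("unchecked")
    public static DataSet<LogData> readAndMask(ExecutionEnvironment env, String logFile) throws Exception {
        JobConf jobConf = new JobConf();

        // The mapred AvroInputFormat<T> produces (AvroWrapper<T>, NullWritable) pairs.
        HadoopInputFormat<AvroWrapper<LogData>, NullWritable> inputFormat =
                new HadoopInputFormat<>(new AvroInputFormat<LogData>(),
                        (Class<AvroWrapper<LogData>>) (Class<?>) AvroWrapper.class,
                        NullWritable.class, jobConf);
        AvroInputFormat.addInputPath(inputFormat.getJobConf(), new Path(logFile));

        return env.createInput(inputFormat)
                .flatMap(new FlatMapFunction<Tuple2<AvroWrapper<LogData>, NullWritable>, LogData>() {
                    @Override
                    public void flatMap(Tuple2<AvroWrapper<LogData>, NullWritable> value,
                                        Collector<LogData> out) {
                        LogData record = value.f0.datum();  // unwrap the Avro record
                        record.setNormalFields(MaskUtil.mask(record.getNormalFields()));
                        out.collect(record);
                    }
                });
    }
}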
...@@ -125,7 +154,7 @@ public class TransactionLogMask {
    /**
     * Masks (de-identifies) Kafka message data.
     *
     * @param params request parameters
     * @return void
     */
    public static void maskKafkaLog(ParameterTool params) {
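The body of maskKafkaLog is collapsed in this diff. For orientation, a hedged sketch of the kind of pipeline the imports suggest (FlinkKafkaConsumer with SimpleStringSchema feeding a BucketingSink bucketed by DateTimeBucketer); the parameter keys, group id, and maskLine placeholder are assumptions, not the file's actual implementation.

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaMaskSketch {

    // Illustrative sketch only; configuration keys are assumptions.
    public static void run(ParameterTool params) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", params.get("bootstrap.servers"));
        props.setProperty("group.id", "log-mask");

        // Consume raw log messages as strings.
        DataStream<String> logs = env.addSource(
                new FlinkKafkaConsumer<>(params.get("topic"), new SimpleStringSchema(), props));

        // Bucket masked output on HDFS by time.
        BucketingSink<String> sink = new BucketingSink<>(params.get("output.path"));
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));

        logs.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) {
                return maskLine(line);
            }
        }).addSink(sink);

        env.execute("kafka-log-mask");
    }

    // Placeholder for the project's real masking logic (e.g. MaskUtil); identity here.
    private static String maskLine(String line) {
        return line;
    }
}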
...@@ -176,10 +205,10 @@ public class TransactionLogMask {
    /**
     * Filters HDFS log files.
     *
     * @param hdfs      HDFS address
     * @param date      date
     * @param startTime start time
     * @param endTime   end time
     * @return list of matching HDFS file paths
     */
    private static List<String> filterHdfsLogFiles(String hdfs, String date, String startTime, String endTime) {
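filterHdfsLogFiles is also collapsed here. A rough sketch of such a listing with the imported FileSystem/LocatedFileStatus APIs follows, under the assumption that log files sit below a per-date directory and can be range-checked by name; the real directory layout and naming convention are not visible in this diff.

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class HdfsLogFilterSketch {

    // Illustrative sketch only; the <hdfs>/<date> layout and name-based range check are assumptions.
    public static List<String> filterHdfsLogFiles(String hdfs, String date,
                                                  String startTime, String endTime) {
        List<String> files = new ArrayList<>();
        try {
            FileSystem fs = FileSystem.get(URI.create(hdfs), new Configuration());
            RemoteIterator<LocatedFileStatus> it =
                    fs.listFiles(new Path(hdfs + "/" + date), true);  // recursive listing
            while (it.hasNext()) {
                LocatedFileStatus status = it.next();
                String name = status.getPath().getName();
                // Keep files whose name falls inside [startTime, endTime] lexicographically.
                if (name.compareTo(startTime) >= 0 && name.compareTo(endTime) <= 0) {
                    files.add(status.getPath().toString());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to list " + hdfs, e);
        }
        return files;
    }
}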
...@@ -229,10 +258,10 @@ public class TransactionLogMask {
        return transactionLog;
    }
-    private static TransactionLog avroDeserialize(byte[] data) throws IOException {
-        DatumReader<TransactionLog> reader = new SpecificDatumReader<TransactionLog>(TransactionLog.class);
+    private static LogData avroDeserialize(byte[] data) throws IOException {
+        DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
-        TransactionLog transactionLog = reader.read(null, decoder);
+        LogData transactionLog = reader.read(null, decoder);
        System.out.println(transactionLog);
        return transactionLog;
    }
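avroDeserialize now reads one LogData record from raw Avro binary (binaryDecoder expects bare record bytes, not an Avro container file). For reference, a sketch of the matching write side that produces bytes this method can decode; whether the upstream producer actually serializes records this way is an assumption.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

public class AvroSerializeSketch {

    // Counterpart to avroDeserialize(byte[]): encodes one LogData record as raw Avro binary.
    public static byte[] avroSerialize(LogData record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<LogData> writer = new SpecificDatumWriter<>(LogData.class);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();  // push buffered bytes into the stream
        return out.toByteArray();
    }
}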
......