Commit 8e9f48d7 authored by 王海鹰

LogData class implements the WritableComparable interface

parent d90249ad
......@@ -2,43 +2,31 @@ package com.zorkdata.datamask;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.avro.ZorkAvroFormat;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.io.InputSplit;
//import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
//import org.apache.hadoop.mapred.jobcontrol.Job;
import java.io.IOException;
import java.net.URI;
......@@ -50,6 +38,9 @@ import java.util.Date;
import java.util.List;
import java.util.Properties;
//import org.apache.flink.formats.avro.AvroInputFormat;
//import org.apache.hadoop.mapred.jobcontrol.Job;
/**
* Description : Guotai transaction-log masking job
*
......@@ -86,7 +77,6 @@ public class TransactionLogMask {
String startTime = params.get("startTime");
String endTime = params.get("endTime");
// List<String> logFiles = filterHdfsLogFiles(hdfsSrc, date, startTime, endTime);
List<String> logFiles = new ArrayList<String>() {
{
......@@ -104,25 +94,18 @@ public class TransactionLogMask {
// ZorkAvroFormat logDataAvroFormat = new ZorkAvroFormat<String, String>();
JobConf jobConf = new JobConf();
// Job jobInstance = Job.getInstance();
HadoopInputFormat<AvroKey, LogData> hadoopInputFormat = new HadoopInputFormat<AvroKey, LogData>
(new AvroInputFormat(),
AvroKey.class,
LogData.class,
jobConf);
(new AvroInputFormat(), AvroKey.class, LogData.class, jobConf);
TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
DataSource<Tuple2<AvroKey, LogData>> input = env.createInput(hadoopInputFormat);
FlatMapOperator<Tuple2<AvroKey, LogData>, Object> logDataFlatMapOperator = input.flatMap(new FlatMapFunction<Tuple2<AvroKey, LogData>, Object>() {
@Override
public void flatMap(Tuple2<AvroKey, LogData> value, Collector<Object> out) throws Exception {
System.out.println("------------------" + value);
}
});
// env.createInput(logDataInput).flatMap(new Avro2StrFlatMapFunction());
DataSet<Tuple2<AvroKey, LogData>> textFileSource = logDataFlatMapOperator.getInput();
DataSet<Tuple2<AvroKey, LogData>> textFileSource = input.flatMap((FlatMapFunction<Tuple2<AvroKey, LogData>, Object>) (value, out) ->
System.out.println("------------------" + value)).getInput();
// env.createInput(logDataInput).flatMap(new Avro2StrFlatMapFunction());
// DataSet<String> textFileSource = env.readTextFile(logFile).name("hadoop-source");
// DataSet<String> flatMap = textFileSource.map(new Avro2StrFlatMapFunction());
String logFileName = logFile.split("/")[logFile.split("/").length - 1];
......
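Note (not part of the commit): the job above wires org.apache.avro.mapred.AvroInputFormat through Flink's Hadoop mapred bridge (HadoopInputFormat). For comparison, a minimal sketch of reading the same file with Flink's own org.apache.flink.formats.avro.AvroInputFormat — the import that is commented out at the top of the file. It assumes the flink-avro dependency is on the classpath; the HDFS path below is a hypothetical example.

import com.zorkdata.datamask.domain.LogData;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;

public class AvroReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Reads the Avro container file and maps each record onto LogData directly,
        // without the AvroKey/JobConf plumbing of the Hadoop bridge.
        AvroInputFormat<LogData> avroFormat =
                new AvroInputFormat<>(new Path("hdfs:///tmp/logs/part-0-0.avro"), LogData.class);

        DataSet<LogData> logs = env.createInput(avroFormat);
        logs.print(); // triggers execution and prints each deserialized record
    }
}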
package com.zorkdata.datamask.domain;
import lombok.Data;
import org.joda.time.DateTime;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
......@@ -15,7 +17,7 @@ import java.util.Map;
*/
@Data
@SuppressWarnings("all")
public class LogData implements Serializable {
public class LogData implements Serializable, WritableComparable {
private static final long serialVersionUID = 1L;
/**
......@@ -30,6 +32,22 @@ public class LogData implements Serializable {
* source
*/
private String source;
@Override
public int compareTo(Object o) {
// Placeholder ordering: every LogData instance currently compares as equal.
return 0;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
// Stub: no fields are written to the Hadoop serialization stream yet.
}
@Override
public void readFields(DataInput dataInput) throws IOException {
// Stub: no fields are read from the Hadoop serialization stream yet.
}
/**
* offset
*/
......@@ -116,10 +134,10 @@ public class LogData implements Serializable {
// }
// }
@Override
public String toString() {
return new DateTime(timestamp).toDate().getTime() + " ZorkLogData{" + "logTypeName='" + logTypeName + '\'' + ", timestamp='" + timestamp + '\'' + ", source='"
+ source + '\'' + ", offset='" + offset + '\'' + ", dimensions=" + dimensions + ", measures=" + measures
+ ", normalFields=" + normalFields + '}';
}
// @Override
// public String toString() {
// return new DateTime(timestamp).toDate().getTime() + " ZorkLogData{" + "logTypeName='" + logTypeName + '\'' + ", timestamp='" + timestamp + '\'' + ", source='"
// + source + '\'' + ", offset='" + offset + '\'' + ", dimensions=" + dimensions + ", measures=" + measures
// + ", normalFields=" + normalFields + '}';
// }
}
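In the commit itself compareTo, write and readFields are stubs (compareTo returns 0, the two I/O methods are empty). For reference only, a minimal sketch of a field-level implementation, assuming ordering by the timestamp string and Hadoop-serializing just the scalar String fields; the map fields (dimensions, measures, normalFields) would need their own size/entry handling, and the String fields are assumed non-null.

@Override
public int compareTo(Object o) {
    LogData other = (LogData) o;
    // ISO-8601 timestamps with the same zone offset sort correctly as strings.
    return this.timestamp.compareTo(other.timestamp);
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeUTF(logTypeName);
    dataOutput.writeUTF(timestamp);
    dataOutput.writeUTF(source);
    dataOutput.writeUTF(offset);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
    logTypeName = dataInput.readUTF();
    timestamp = dataInput.readUTF();
    source = dataInput.readUTF();
    offset = dataInput.readUTF();
}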
......@@ -52,7 +52,7 @@
]
},
{
"name": "normalfields",
"name": "normalFields",
"type": [
"null",
{
......
......@@ -6,8 +6,11 @@ import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.avro.AvroDeserializer;
import com.zorkdata.datamask.util.avro.AvroDeserializerFactory;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
......@@ -27,59 +30,58 @@ import java.util.HashMap;
// java -jar avro-tools-1.10.0.jar compile schema log.avro .
//
public class AvroTest {
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
// Avro serialization: write an avro file
// TransactionLog transactionLog = new TransactionLog();
LogData transactionLog = new LogData();
transactionLog.setLogTypeName("kcbp_biz_log");
transactionLog.setTimestamp("2020-09-18T13:59:53.000+08:00");
transactionLog.setSource("d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log");
transactionLog.setOffset("165683111");
HashMap dimensions = new HashMap() {{
put("appsystem", "jzjy");
put("appprogramname", "jzc9-kcbp1_9600");
put("hostname", "jzc9-kcbp1");
put("func", "");
put("nodeid", "");
put("operway", "W");
}};
transactionLog.setDimensions(dimensions);
HashMap measures = new HashMap<String, Double>() {{
put("latence", 0.0);
put("latency", 1.0);
put("spendtime", 0.5);
}};
transactionLog.setMeasures(measures);
HashMap normalFields = new HashMap() {{
put("indexTime", "2020-09-18T13:59:54.524+08:00");
put("bsflag", "");
put("productcode", "");
put("developercode", "");
put("fmillsecond", "");
put("inputtype", "");
put("logchecktime", "");
put("message", "身份证号码:372925199008075158,地址:上海浦东新区张江高科碧波路690号,手机号:15000101879,邮箱:wanghaiying@zork.com.cn");
put("end_logtime", "");
put("smillsecond", "585606599");
put("featurecode", "");
put("orgid", "");
put("authcode", "");
put("collecttime", "2020-09-18T13:59:53.529+08:00");
put("fundid", "");
put("deserializerTime", "2020-09-18T13:59:53.671+08:00");
put("messid", "0000011404342B32233DDCDA");
put("custid", "");
put("netputr", "");
put("versioninfo", "");
put("beg_logtime", "20200918-135953");
put("authinfo", "");
}};
// transactionLog.setNormalfields(normalFields);
transactionLog.setNormalFields(normalFields);
// LogData transactionLog = new LogData();
// transactionLog.setLogTypeName("kcbp_biz_log");
// transactionLog.setTimestamp("2020-09-18T13:59:53.000+08:00");
// transactionLog.setSource("d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log");
// transactionLog.setOffset("165683111");
//
// HashMap dimensions = new HashMap() {{
// put("appsystem", "jzjy");
// put("appprogramname", "jzc9-kcbp1_9600");
// put("hostname", "jzc9-kcbp1");
// put("func", "");
// put("nodeid", "");
// put("operway", "W");
// }};
//// transactionLog.setDimensions(dimensions);
//
// HashMap measures = new HashMap<String, Double>() {{
// put("latence", 0.0);
// put("latency", 1.0);
// put("spendtime", 0.5);
// }};
//// transactionLog.setMeasures(measures);
//
// HashMap normalFields = new HashMap() {{
// put("indexTime", "2020-09-18T13:59:54.524+08:00");
// put("bsflag", "");
// put("productcode", "");
// put("developercode", "");
// put("fmillsecond", "");
// put("inputtype", "");
// put("logchecktime", "");
// put("message", "身份证号码:372925199008075158,地址:上海浦东新区张江高科碧波路690号,手机号:15000101879,邮箱:wanghaiying@zork.com.cn");
// put("end_logtime", "");
// put("smillsecond", "585606599");
// put("featurecode", "");
// put("orgid", "");
// put("authcode", "");
// put("collecttime", "2020-09-18T13:59:53.529+08:00");
// put("fundid", "");
// put("deserializerTime", "2020-09-18T13:59:53.671+08:00");
// put("messid", "0000011404342B32233DDCDA");
// put("custid", "");
// put("netputr", "");
// put("versioninfo", "");
// put("beg_logtime", "20200918-135953");
// put("authinfo", "");
// }};
// transactionLog.setNormalFields(normalFields);
// String path = "d:\\transactionlog-20200925.avro"; // directory for the avro output file
// DatumWriter<TransactionLog> logDatumWriter = new SpecificDatumWriter<>(TransactionLog.class);
......@@ -93,11 +95,11 @@ public class AvroTest {
/**
* Serialization
*/
byte[] kcbp_biz_logs = AvroSerializerFactory.getLogAvroSerializer().serializingLog("kcbp_biz_log", "2020-09-18T13:59:53.000+08:00",
"d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log", "165683111", dimensions, measures, normalFields);
// byte[] kcbp_biz_logs = AvroSerializerFactory.getLogAvroSerializer().serializingLog("kcbp_biz_log", "2020-09-18T13:59:53.000+08:00",
// "d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log", "165683111", dimensions, measures, normalFields);
// FileOutputStream fos = null;
// try {
// fos = new FileOutputStream("d:\\transactionlog-20200929.avro");
// fos = new FileOutputStream("d:\\transactionlog-20201009.avro");
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// }
......@@ -112,35 +114,47 @@ public class AvroTest {
/**
* Deserialization
*/
// File file = new File("d:\\zork\\part-0-0.avro");
File file = new File("c:\\part-0-0.avro");
// File file = new File("d:\\part-0-0.avro");
// File file = new File("d:\\hdfs-transactionlog-20200929.avro");
byte[] byteBuffer = new byte[(int) file.length()];
FileInputStream fileInputStream = null;
try {
// fileInputStream = new FileInputStream("d:\\zork\\part-0-0.avro");
fileInputStream = new FileInputStream("c:\\part-0-0.avro");
// fileInputStream = new FileInputStream("d:\\hdfs-transactionlog-20200929.avro");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
try {
fileInputStream.read(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
GenericRecord genericRecord = AvroDeserializerFactory.getLogsDeserializer().deserializing(byteBuffer);
System.out.println(genericRecord);
// File file = new File("d:\\transactionlog-20201009.avro");
// byte[] byteBuffer = new byte[(int) file.length()];
//
// FileInputStream fileInputStream = null;
// try {
//// fileInputStream = new FileInputStream("d:\\part-0-0.avro");
// fileInputStream = new FileInputStream("d:\\transactionlog-20201009.avro");
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// }
// try {
// fileInputStream.read(byteBuffer);
// } catch (IOException e) {
// e.printStackTrace();
// }
// GenericRecord genericRecord = AvroDeserializerFactory.getLogsDeserializer().deserializing(byteBuffer);
// System.out.println(genericRecord);
// Read the avro file and deserialize it
// DatumReader<TransactionLog> reader = new SpecificDatumReader<TransactionLog>(TransactionLog.class);
// DataFileReader<TransactionLog> dataFileReader = new DataFileReader<TransactionLog>(new File("d:\\transactionlog-20200925.avro"), reader);
//// DataFileReader<TransactionLog> dataFileReader = new DataFileReader<TransactionLog>(new File("D:\\test.avro"), reader);
// TransactionLog transactionLogRead = null;
// DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
//// DataFileReader<LogData> dataFileReader = new DataFileReader<LogData>(new File("d:\\part-0-0.avro"), reader);
// DataFileReader<LogData> dataFileReader = new DataFileReader<LogData>(new File("d:\\transactionlog-20201009.avro"), reader);
// LogData transactionLogRead = null;
// while (dataFileReader.hasNext()) {
// transactionLogRead = dataFileReader.next();
// System.out.println(transactionLogRead);
// }
// Generic read-back: parse the schema, then iterate the Avro container file record by record.
Schema schema = new Schema.Parser().parse(new File("d:\\log.avro"));
GenericRecord emp = new GenericData.Record(schema);
File file = new File("d:\\part-0-0.avro");
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
while (dataFileReader.hasNext()) {
emp = dataFileReader.next();
System.out.println(emp);
}
}
}
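For completeness, a sketch of the writing side with Avro's generic API, mirroring the read-back in main above. The schema file name, output path, and field values are assumptions, not taken from the commit; put() keys must match the schema field names (e.g. the renamed "normalFields"), and any non-nullable schema fields would have to be set as well.

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

public class AvroWriteSketch {
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(new File("d:\\log.avsc"));

        // Build one record against the schema; remaining fields are assumed nullable.
        GenericRecord record = new GenericData.Record(schema);
        record.put("logTypeName", "kcbp_biz_log");
        record.put("timestamp", "2020-09-18T13:59:53.000+08:00");

        // DataFileWriter produces an Avro container file that DataFileReader can read back.
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        try (DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
            fileWriter.create(schema, new File("d:\\sample-write.avro"));
            fileWriter.append(record);
        }
    }
}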