Commit c5a13c7c authored by 王海鹰

Merge branch 'master' into 'master'

Add txt sink

See merge request !1
parents 2e1fea55 70bee141
# maven ignore
target
*.jar
*.war
*.zip
*.tar
*.tar.gz
release
sources
lib
plugins
modules
# eclipse ignore
*.class
.settings/
.project
.classpath
# idea ignore
.idea/
*.ipr
*.iml
*.iws
# temp ignore
*.log
*.cache
*.diff
*.patch
*.tmp
logs
# system ignore
.DS_Store
Thumbs.db
**/*.jar
**/*.class
.sincedb
kafka0.10.x/src/docs/*
# Package Files #
*.war
*.ear
dependency-reduced-pom.xml
.vertx/
release
smartdata-streamx
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
@@ -9,15 +9,18 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroInputFormat;
+import org.apache.avro.mapred.AvroTextOutputFormat;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
@@ -29,7 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
import java.io.IOException;
import java.net.URI;
@@ -92,7 +97,13 @@ public class TransactionLogMask {
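        // The HadoopInputFormat wraps AvroInputFormat (see addInputPath below), so each
        // input record arrives as a Tuple2 whose f0 field holds the deserialized payload.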
        AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
        DataSource<Tuple2<Object, Object>> input = env.createInput(hadoopInputFormat);
//        input.print();
+        FlatMapOperator<Tuple2<Object, Object>, Object> map = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
+            @Override
+            public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
+                collector.collect(value.f0);
+            }
+        });
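        // Note: the added `map` operator above is never referenced again; the masking
        // operator below is chained from `input` directly, so it is effectively unused.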
        /**
         * Data-masking operator
@@ -100,34 +111,29 @@
        FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
            @Override
            public void flatMap(Tuple2<Object, Object> value, Collector<Object> out) throws Exception {
//                System.out.println("--------------------value:" + value);
//                System.out.println("--------------------getField0" + value.getField(0));
-                LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>(){});
+                LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>() {
+                });
                logData.setNormalFields(MaskUtil.mask(logData.getNormalFields()));
                out.collect(logData);
            }
        });
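        // Masking step: each record's payload is parsed into a LogData bean and its
        // normalFields are rewritten through MaskUtil.mask(...) before re-emission.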
//        tuple2ObjectFlatMapOperator.print();
        /**
         * Avro serialization operator
         */
-        FlatMapOperator<Object, Object> avroFlatMapOperator = maskFlatMapOperator.flatMap(new FlatMapFunction<Object, Object>() {
+        FlatMapOperator<Object, String> objectStringFlatMapOperator = maskFlatMapOperator.flatMap(new FlatMapFunction<Object, String>() {
            @Override
-            public void flatMap(Object value, Collector<Object> out) throws Exception {
-//                System.out.println("--------------------value:" + value);
-                LogData logData = (LogData) value;
-                byte[] bytes = AvroSerializerFactory.getLogAvroSerializer().serializingLog(logData.getLogTypeName(), logData.getTimestamp(),
-                        logData.getSource(), logData.getOffset(), logData.getDimensions(), logData.getMeasures(), logData.getNormalFields());
-                out.collect(bytes);
+            public void flatMap(Object value, Collector<String> collector) throws Exception {
+                LogData logData = (LogData) value;
+                collector.collect(JSON.toJSONString(logData));
            }
        });
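        // The rewritten operator emits the masked LogData as a JSON string instead of
        // Avro-serialized bytes, so the text sink below can write it directly.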
-        avroFlatMapOperator.print();
+//        objectStringFlatMapOperator.print();
        String logFileName = logFile.split("/")[logFile.split("/").length - 1];
        String filePath = hdfsDest + logFileName;
        System.out.println("---------------writepath-----------------:" + filePath);
-        avroFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
+        objectStringFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
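        // writeAsText(..., WriteMode.OVERWRITE) replaces any existing file at hdfsDest + <input file name>;
        // the Avro/Hadoop output-format imports added above are not used in the visible hunks.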
        try {
            env.execute("国泰交易日志脱敏job"); // job name: "Guotai transaction-log masking job"
......