Commit dd502ffa authored by 谢森

<dev> Stash work-in-progress code

parent 7c58b271
# maven ignore
target
*.jar
*.war
*.zip
*.tar
*.tar.gz
release
sources
lib
plugins
modules
# eclipse ignore
*.class
.settings/
.project
.classpath
# idea ignore
.idea/
*.ipr
*.iml
*.iws
# temp ignore
*.log
*.cache
*.diff
*.patch
*.tmp
logs
# system ignore
.DS_Store
Thumbs.db
**/*.jar
**/*.class
.sincedb
kafka0.10.x/src/docs/*
# Package Files #
*.war
*.ear
dependency-reduced-pom.xml
.vertx/
release
smartdata-streamx
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
@@ -13,6 +13,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -104,47 +105,34 @@ public class TransactionLogMask {
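        // Wrap the Hadoop input format so the Flink batch (DataSet) API can read the
        // Avro log files as (key, value) Tuple2 records.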
DataSource<Tuple2<Object, Object>> input = env.createInput(hadoopInputFormat);
input.print();
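        // NOTE: DataSet#print() is an eager sink: it triggers execution of the plan
        // built so far, which is why the explicit env.execute() call below is
        // commented out in this revision.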
// FlatMapOperator<Tuple2<AvroKey, AvroValue>, LogData> tuple2ObjectFlatMapOperator
// input.print();
        FlatMapOperator<Tuple2<Object, Object>, Object> map = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
            /**
             * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
             * it into zero, one, or more elements.
             *
             * @param value     The input value.
             * @param collector The collector for returning result values.
             * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
             *                   to fail and may trigger recovery.
             */
            @Override
            public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
                // Masking step kept from the earlier draft, still to be wired in:
                // LogData transactionLog = avroDeserialize(value.getBytes());
                // transactionLog.setNormalFields(MaskUtil.mask(transactionLog.getNormalFields()));
                // For now just forward the key half of the (key, value) record.
                collector.collect(value.f0);
            }
        });
        map.print();
// DataSet<Tuple2<Object, Object>> textFileSource = input.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
//
// @Override
// public void flatMap(Tuple2<Object, Object> value, Collector<Object> out) throws Exception {
// System.out.println("--------------------" + value);
// System.out.println("--------------------" + value.getField(0));
// System.out.println("--------------------" + value.getField(1));
// out.collect(null);
// }
// });
// textFileSource.print();
// DataSet<Tuple2<AvroKey, AvroValue>> textFileSource = tuple2ObjectFlatMapOperator.getInput();
// DataSet<Tuple2<AvroKey, LogData>> textFileSource = input.flatMap((FlatMapFunction<Tuple2<AvroKey, LogData>, Object>) (value, out) ->
// System.out.println("------------------" + value)).getInput();
// env.createInput(logDataInput).flatMap(new Avro2StrFlatMapFunction());
// DataSet<String> textFileSource = env.readTextFile(logFile).name("hadoop-source");
// DataSet<String> flatMap = textFileSource.map(new Avro2StrFlatMapFunction());
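        // The commented-out blocks above record earlier attempts: reading the input
        // as plain text via readTextFile vs. going through the Avro
        // HadoopInputFormat wrapper that is used now.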
// String logFileName = logFile.split("/")[logFile.split("/").length - 1];
// String filePath = hdfsDest + logFileName;
// System.out.println("---------------writepath-----------------:" + filePath);
// textFileSource.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
try {
env.execute("国泰交易日志脱敏job");
// env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
e.printStackTrace();
}
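For reference, a minimal sketch of the masking step this commit appears to be working toward, pieced together from the commented-out lines above. LogData, MaskUtil.mask, and avroDeserialize are names taken from those comments; their exact signatures are assumptions, and the placeholder body of avroDeserialize stands in for project-specific Avro decoding, not the project's actual code:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Sketch only: LogData and MaskUtil come from the commented-out draft above and
    // are assumed to exist in this project with roughly these shapes.
    public class MaskingFlatMapFunction implements FlatMapFunction<Tuple2<Object, Object>, String> {

        @Override
        public void flatMap(Tuple2<Object, Object> value, Collector<String> out) throws Exception {
            // Assumed helper: turn the raw Avro payload into the project's LogData record.
            LogData transactionLog = avroDeserialize(value.f1.toString().getBytes("UTF-8"));
            // Mask the sensitive fields before the record is emitted downstream.
            transactionLog.setNormalFields(MaskUtil.mask(transactionLog.getNormalFields()));
            out.collect(transactionLog.toString());
        }

        // Placeholder for the avroDeserialize helper referenced in the commit; the
        // real implementation would use the project's Avro schema and datum reader.
        private LogData avroDeserialize(byte[] bytes) throws java.io.IOException {
            throw new UnsupportedOperationException("project-specific Avro decoding goes here");
        }
    }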