Commit d81193c5 authored by zskycode's avatar zskycode

<dev> 源和目标 hdfs ,两个集群的hdfs配置

parent b46fc271
Pipeline #51186 passed with stages
in 1 minute and 18 seconds
......@@ -53,6 +53,8 @@ public class JobConfig implements Serializable {
}
keyList = keyList.stream().distinct().collect(Collectors.toList());
this.setDesensitizationKeyList(keyList);
this.hdfsSourcePath = jobInitConfig.getHdfsSourcePath();
this.hdfsSinkPath = jobInitConfig.getHdfsSinkPath();
}
private String jobName;
......@@ -72,4 +74,7 @@ public class JobConfig implements Serializable {
private long endTimestamp;
private List<String> desensitizationKeyList;
private String hdfsSourcePath;
  • Remove this unused "hdfsSourcePath" private field. 📘

Please register or sign in to reply
private String hdfsSinkPath;
  • Remove this unused "hdfsSinkPath" private field. 📘

Please register or sign in to reply
}
......@@ -45,6 +45,8 @@ public class JobInitConfig implements Serializable {
this.mac = MapUtils.getString(conf, ConfigConstants.MAC_KEY);
this.position = MapUtils.getString(conf, ConfigConstants.POSITION_KEY);
this.password = MapUtils.getString(conf, ConfigConstants.PASSWORD_KEY);
this.hdfsSourcePath = MapUtils.getString(conf, ConfigConstants.HDFS_SOURCE_PATH);
this.hdfsSinkPath = MapUtils.getString(conf, ConfigConstants.HDFS_SINK_PATH);
}
private String jobName;
......@@ -61,6 +63,8 @@ public class JobInitConfig implements Serializable {
private String endTime;
private long startTimestamp;
private long endTimestamp;
private String hdfsSourcePath;
  • Remove this unused "hdfsSourcePath" private field. 📘

Please register or sign in to reply
private String hdfsSinkPath;
  • Remove this unused "hdfsSinkPath" private field. 📘

Please register or sign in to reply
private String name;
......
......@@ -37,6 +37,8 @@ public final class ConfigConstants {
public static final String MAC_KEY = "mac";
public static final String POSITION_KEY = "position";
public static final String PASSWORD_KEY = "password";
public static final String HDFS_SOURCE_PATH = "hdfs_source_path";
public static final String HDFS_SINK_PATH = "hdfs_sink_path";
public static final String SERVERS = "servers";
public static final String ZOOKEEPER = "zookeeper";
......
......@@ -11,6 +11,7 @@ import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
......@@ -57,14 +58,27 @@ public class HdfsLogDesensitization implements Serializable {
public void desensitizationHdfsLog(JobConfig jobConfig) {
// 初始化flink job env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
JobConf jobConf = new JobConf();
jobConf.set(AVRO_OUTPUT_SCHEMA, jobConfig.getAvroOutputSchema());
JobConf jobConfInput = new JobConf();
if (!StringUtils.isEmpty(jobConfig.getHdfsSourcePath())) {
jobConfInput.addResource(new Path("file://" + jobConfig.getHdfsSourcePath() + "/core-site.xml"));
  • 🚫 Define a constant instead of duplicating this literal "file://" 4 times. 📘

Please register or sign in to reply
jobConfInput.addResource(new Path("file://" + jobConfig.getHdfsSourcePath() + "/hdfs-site.xml"));
/// jobConfInput.addResource(new Path("file://" + jobConfig.getHdfsSourcePath() + "/mapred-site.xml"));
  • This block of commented-out lines of code should be removed. 📘

Please register or sign in to reply
}
JobConf jobConfOutput = new JobConf();
if (!StringUtils.isEmpty(jobConfig.getHdfsSinkPath())) {
jobConfOutput.addResource(new Path("file://" + jobConfig.getHdfsSinkPath() + "/core-site.xml"));
jobConfOutput.addResource(new Path("file://" + jobConfig.getHdfsSinkPath() + "/hdfs-site.xml"));
/// jobConfInput.addResource(new Path("file://" + jobConfig.getHdfsSinkPath() + "/mapred-site.xml"));
  • This block of commented-out lines of code should be removed. 📘

Please register or sign in to reply
}
jobConfInput.set(AVRO_OUTPUT_SCHEMA, jobConfig.getAvroOutputSchema());
jobConfOutput.set(AVRO_OUTPUT_SCHEMA, jobConfig.getAvroOutputSchema());
// source部分
// 1、通过时间获取文件夹信息
List<String> logFiles = filterHdfsLogFiles(jobConfig.getHdfsSrc(), jobConfig.getHdfsUri(), jobConfig.getHdfsUser());
String logFileListString = list2String(logFiles);
HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<>
(new AvroInputFormat(), Object.class, Object.class, jobConf);
(new AvroInputFormat(), Object.class, Object.class, jobConfInput);
AvroInputFormat.addInputPaths(hadoopInputFormat.getJobConf(), logFileListString);
// 2、创建datasource
DataSource<Tuple2<Object, Object>> hdfsLogInput = env
......@@ -75,16 +89,16 @@ public class HdfsLogDesensitization implements Serializable {
// sink部分
// 获取目标hdfs的输出目录
String filePath = jobConfig.getHdfsDest();
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConfOutput);
FileOutputFormat.setOutputPath(jobConfOutput, new Path(filePath));
// avro序列化算子(.writeAsText("file:///lmt/output"); 本地写入)
flatMapOperator.map(new MapFunction<LogData, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
@Override
public Tuple2<AvroWrapper<LogData>, NullWritable> map(LogData value) throws Exception {
AvroKey<LogData> key = new AvroKey<>(value);
return new Tuple2<>(key, NullWritable.get());
}
}).setParallelism(jobConfig.getTransformerParallelism())
@Override
public Tuple2<AvroWrapper<LogData>, NullWritable> map(LogData value) throws Exception {
AvroKey<LogData> key = new AvroKey<>(value);
return new Tuple2<>(key, NullWritable.get());
}
}).setParallelism(jobConfig.getTransformerParallelism())
// .writeAsText("file:///lmt/output", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE)
.output(hadoopOutputFormat)
.setParallelism(jobConfig.getSinkParallelism());
......
......@@ -50,6 +50,10 @@ hdfs_src: "/tmp/datawarehouse/net23/"
# hdfs日志写入地址,非必传,默认写到hdfs-src目录下的output目录下,必须以斜杠结尾
hdfs_dest: "/tmp/datawarehouse/net23/output14/"
hdfs_source_path: "/root/hadoop/hadoop-conf"
hdfs_sink_path: "/root/hadoop/ambari"
# cdh下载配置
# cdh能执行hdfs命令的机器的ip
cdh_host_ip: "192.168.70.2"
......
  • SonarQube analysis reported 206 issues

    • 7 blocker
    • 🚫 38 critical
    • 138 major
    • 🔽 20 minor
    • 3 info

    Watch the comments in this conversation to review them.

    Top 30 extra issues

    Note: The following issues were found on lines that were not modified in the commit. Because these issues can't be reported as line comments, they are summarized here:

    1. Remove this hard-coded password. 📘
    2. Remove this hard-coded password. 📘
    3. Remove this hard-coded password. 📘
    4. Remove this hard-coded password. 📘
    5. Remove this hard-coded password. 📘
    6. Remove this hard-coded password. 📘
    7. Remove this hard-coded password. 📘
    8. 🚫 Define a constant instead of duplicating this literal " {\n" 11 times. 📘
    9. 🚫 Define a constant instead of duplicating this literal " "type": \n" 11 times. 📘
    10. 🚫 Define a constant instead of duplicating this literal " "string",\n" 6 times. 📘
    11. 🚫 Define a constant instead of duplicating this literal " "null"\n" 6 times. 📘
    12. 🚫 [Define a constant instead of duplicating this literal " ]\n" 11 times.](https://git.zorkdata.com/liaomingtao/transaction-log-desensitization/blob/d81193c5a45272f5ec214a092c59d4c40bfceb37/src/main/java/com/zorkdata/desensitization/avro/AvroSchemaDef.java#L23) 📘
    13. 🚫 Define a constant instead of duplicating this literal " },\n" 9 times. 📘
    14. 🚫 Define a constant instead of duplicating this literal " "null",\n" 5 times. 📘
    15. 🚫 Define a constant instead of duplicating this literal " {\n" 5 times. 📘
    16. 🚫 Define a constant instead of duplicating this literal " "type": "map",\n" 5 times. 📘
    17. 🚫 Define a constant instead of duplicating this literal " "values": "string"\n" 3 times. 📘
    18. 🚫 Define a constant instead of duplicating this literal " }\n" 5 times. 📘
    19. 🚫 Define a constant instead of duplicating this literal "序列化失败" 13 times. 📘
    20. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    21. 🚫 Refactor this method to reduce its Cognitive Complexity from 161 to the 15 allowed. 📘
    22. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    23. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    24. 🚫 Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed. 📘
    25. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    26. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    27. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    28. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    29. 🚫 Define a constant instead of duplicating this literal "jobName" 8 times. 📘
    30. 🚫 Define a constant instead of duplicating this literal "2020-11-07 21:22:20" 8 times. 📘
    • ... 169 more
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment