Commit 5f09be42 authored by DeleMing

<dev>

1. Optimize the HDFS download script
parent 69f90fb7
Pipeline #15133 failed with stages in 40 seconds
@@ -49,5 +49,14 @@
</includes>
</fileSet>
<!-- Package the environment-variable config file -->
<fileSet>
<directory>package/env</directory>
<outputDirectory>${file.separator}</outputDirectory>
<includes>
<include>.env</include>
</includes>
</fileSet>
</fileSets>
</assembly>
#!/usr/bin/env bash
FLINK_TASK_CONF=application.yml
CDH_HOST_IP=192.168.70.2
CDH_HOST_USER=root
CDH_HOST_PASSWORD=NuqUtwbJUBRmUwgh
HDFS_DEST=/tmp/datawarehouse/jzjy/kcbp_biz_log/output1/
DOWNLOAD_PATH=/tmp/
JOB_NAME="国泰交易日志脱敏job"
# Local landing directory: DOWNLOAD_PATH plus the last path component of HDFS_DEST
SCP_PATH=$DOWNLOAD_PATH$(echo "$HDFS_DEST" | rev | cut -d '/' -f 2 | rev)/
# First non-loopback IPv4 address of this host, parsed from `ip a`
LOCAL_IP=$(ip a | grep inet | grep -v inet6 | grep -v 127 | cut -d '/' -f1 | cut -d ' ' -f6)
export BASE_PATH=$(cd "$(dirname "$0")" && pwd)
DEPLOY_PATH=${BASE_PATH%/*}
if [ ! -d "$DEPLOY_PATH/logs" ]; then
mkdir -p "$DEPLOY_PATH/logs"
fi
FLINK_TASK_CONF=application.yml
flink run -d -c com.zorkdata.desensitization.TransactionLogDesensitization $DEPLOY_PATH/lib/transaction-log-desensitization-0.1.jar --conf $DEPLOY_PATH/conf/$FLINK_TASK_CONF > $DEPLOY_PATH/logs/transaction-log-desensitization-0.1.log &
# Stage the HDFS output onto the CDH host, then scp it back to this machine.
download(){
expect <<EOF
set timeout 10
spawn ssh $CDH_HOST_USER@$CDH_HOST_IP
expect {
"yes/no" { send "yes\n";exp_continue }
"password:" { send "$CDH_HOST_PASSWORD\n" }
}
expect "]# " { send "sudo -u hdfs hadoop fs -copyToLocal $HDFS_DEST $DOWNLOAD_PATH\n" }
expect "]# " {
send "scp -r $SCP_PATH root@$LOCAL_IP:/tmp/\n"
expect {
"yes/no" { send "yes\n";exp_continue }
"password: " { send "$CDH_HOST_PASSWORD\n" }
}
expect "]# " { send "exit\n" }
}
expect "]# " { send "exit\n" }
EOF
}
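The expect block above drives an interactive ssh session by matching the root "]# " prompt, which is brittle if the remote prompt differs. Where the sshpass utility is available on the deploy host, the same flow can run non-interactively; a minimal sketch under that assumption (pulling with scp from the remote side also removes the need for LOCAL_IP):

# Sketch only: assumes sshpass is installed; reuses the variables defined above.
download_with_sshpass(){
# Stage the HDFS output onto the CDH host's local disk
sshpass -p "$CDH_HOST_PASSWORD" ssh -o StrictHostKeyChecking=no "$CDH_HOST_USER@$CDH_HOST_IP" \
    "sudo -u hdfs hadoop fs -copyToLocal $HDFS_DEST $DOWNLOAD_PATH"
# Pull the staged directory straight back to this machine
sshpass -p "$CDH_HOST_PASSWORD" scp -o StrictHostKeyChecking=no -r \
    "$CDH_HOST_USER@$CDH_HOST_IP:$SCP_PATH" /tmp/
}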
flink run $DEPLOY_PATH/lib/transaction-log-desensitization-0.1.jar --conf $DEPLOY_PATH/conf/$FLINK_TASK_CONF > $DEPLOY_PATH/logs/transaction-log-desensitization-0.1.log &
\ No newline at end of file
sleep 10
# Poll the Flink running-job list; once the job name is gone, fetch the output.
while :
do
FLINK_LIST_RUNNING=$(flink list -r)
FLAG=$(echo "$FLINK_LIST_RUNNING" | grep "$JOB_NAME")
if [[ "$FLAG" == "" ]]
then
download
break
fi
# Wait between polls instead of hammering the JobManager
sleep 10
done
\ No newline at end of file
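One caveat: the loop above exits only when the job name disappears from flink list -r, so a long-running or stuck job keeps the script alive indefinitely. A bounded variant is sketched below (the 60-poll cap is an assumption, not from the original script):

# Sketch only: give up after roughly ten minutes of polling.
wait_and_download(){
attempts=0
while flink list -r | grep -q "$JOB_NAME"
do
attempts=$((attempts + 1))
if [ "$attempts" -ge 60 ]; then
echo "timed out waiting for $JOB_NAME to finish" >&2
return 1
fi
sleep 10
done
download
}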
@@ -8,6 +8,8 @@ package com.zorkdata.desensitization.constans;
*/
public final class ConfigConstants {
public static final String JOB_NAME = "job_name";
public static final String SOURCE = "source";
public static final String PARALLELISM = "parallelism";
......
@@ -54,6 +54,7 @@ public class HdfsLogDesensitization implements Serializable {
private static final String AVRO_OUTPUT_SCHEMA = "avro.output.schema";
private static final String HOSTNAME = "hostname";
private String jobName;
private int parallelism;
private int maxFileNum;
private String avroOutputSchema;
@@ -70,6 +71,7 @@ public class HdfsLogDesensitization implements Serializable {
private Map<String, String> conf;
public HdfsLogDesensitization initConf(Map<String, String> conf) {
this.jobName = String.valueOf(conf.get(ConfigConstants.JOB_NAME));
if (!conf.containsKey(PARALLELISM_KEY)) {
this.parallelism = DEFAULT_PARALLELISM;
} else {
@@ -107,81 +109,82 @@ public class HdfsLogDesensitization implements Serializable {
RegularExpressions regularExpressions = new RegularExpressions(conf);
DesensitizationFunction desensitizationFunction = new DesensitizationFunction(regularExpressions);
List<String> logFiles = filterHdfsLogFiles(hdfsSrc, hdfsUri, hdfsUser);
// String logFileListString = list2String(logFiles);
List<String> fileStringList = changeList(logFiles);
for (String logFileListString : fileStringList) {
/**
* Read HDFS log files and deserialize them from Avro
*/
HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<>
(new AvroInputFormat(), Object.class, Object.class, jobConf);
AvroInputFormat.addInputPaths(hadoopInputFormat.getJobConf(), logFileListString);
DataSource<Tuple2<Object, Object>> hdfsLogInput = env.createInput(hadoopInputFormat);
/**
* Masking (desensitization) operator
*/
FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator =
hdfsLogInput.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) {
LogData logData = JSON.parseObject(value.getField(0).toString(),
new TypeReference<LogData>() {
});
// Filter by the log event's core info
if (null != core && logData.getDimensions().get(HOSTNAME).contains(core)) {
// Filter by the log event's timestamp
Long timestamp = DateUtil.utc2timestamp(logData.getTimestamp());
List<String> dataFormats = new ArrayList<String>() {{
add(",");
add(".");
add("@");
add("-");
}};
// Map desensitization = desensitizationFunction.
// desensitization(logData.getNormalFields(), fieldsWhiteList, dataFormats);
// logData.setNormalFields(desensitization);
// log.error("转换数据成功,转换后数据:{}", JSON.toJSONString(logData));
// collector.collect(logData);
if (null != timestamp && timestamp.compareTo(startTimestamp) >= 0 &&
timestamp.compareTo(endTimestamp) <= 0) {
Map desensitization = desensitizationFunction.
desensitization(logData.getNormalFields(), fieldsWhiteList, dataFormats);
logData.setNormalFields(desensitization);
log.error("转换数据成功,转换后数据:{}", JSON.toJSONString(logData));
collector.collect(logData);
} else {
// log.error("转换数据失败,原始数据:{}", JSON.toJSONString(logData));
}
String logFileListString = list2String(logFiles);
// List<String> fileStringList = changeList(logFiles);
  • This block of commented-out lines of code should be removed. 📘
// DataSource<Tuple2<Object, Object>> dataSource = null;
// for (String logFileListString : fileStringList) {
//
// }
  • This block of commented-out lines of code should be removed. 📘
/**
* Read HDFS log files and deserialize them from Avro
*/
HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<>
(new AvroInputFormat(), Object.class, Object.class, jobConf);
AvroInputFormat.addInputPaths(hadoopInputFormat.getJobConf(), logFileListString);
DataSource<Tuple2<Object, Object>> hdfsLogInput = env.createInput(hadoopInputFormat);
hdfsLogInput.union(hdfsLogInput);
/**
* Masking (desensitization) operator
*/
FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator =
hdfsLogInput.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) {
LogData logData = JSON.parseObject(value.getField(0).toString(),
new TypeReference<LogData>() {
});
// Filter by the log event's core info
if (null != core && logData.getDimensions().get(HOSTNAME).contains(core)) {
// Filter by the log event's timestamp
Long timestamp = DateUtil.utc2timestamp(logData.getTimestamp());
List<String> dataFormats = new ArrayList<String>() {{
  • Move the contents of this initializer to a standard constructor or to field initializers. 📘 🔽 Use another way to initialize this instance. 📘
add(",");
add(".");
add("@");
add("-");
}};
// Map desensitization = desensitizationFunction.
// desensitization(logData.getNormalFields(), fieldsWhiteList, dataFormats);
  • This block of commented-out lines of code should be removed. 📘
// logData.setNormalFields(desensitization);
// log.error("转换数据成功,转换后数据:{}", JSON.toJSONString(logData));
// collector.collect(logData);
  • Clean up code blocks or configuration that are no longer used. 📘
if (null != timestamp && timestamp.compareTo(startTimestamp) >= 0 &&
timestamp.compareTo(endTimestamp) <= 0) {
Map desensitization = desensitizationFunction.
desensitization(logData.getNormalFields(), fieldsWhiteList, dataFormats);
logData.setNormalFields(desensitization);
log.error("转换数据成功,转换后数据:{}", JSON.toJSONString(logData));
collector.collect(logData);
} else {
// log.error("转换数据失败,原始数据:{}", JSON.toJSONString(logData));
  • This block of commented-out lines of code should be removed. 📘
}
}
});
// Get the output directory on the target HDFS
String logFileName = "output.avro";
String filePath = hdfsDest + logFileName.replace(GeneralConstants.AVRO_SUFFIX,
GeneralConstants.EMPTY_STR);
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
}
});
// Get the output directory on the target HDFS
  • Clean up code blocks or configuration that are no longer used. 📘
String logFileName = "output.avro";
String filePath = hdfsDest + logFileName.replace(GeneralConstants.AVRO_SUFFIX,
GeneralConstants.EMPTY_STR);
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
/**
* Avro serialization operator
*/
maskFlatMapOperator.map(new MapFunction<Object, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
@Override
public Tuple2<AvroWrapper<LogData>, NullWritable> map(Object value) throws Exception {
AvroKey<LogData> key = new AvroKey<>((LogData) value);
Tuple2<AvroWrapper<LogData>, NullWritable> tuple = new Tuple2<>(key, NullWritable.get());
return tuple;
}
}).output(hadoopOutputFormat);
try {
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
System.out.println(e.getMessage());
log.error(String.valueOf(e));
/**
* Avro serialization operator
*/
maskFlatMapOperator.map(new MapFunction<Object, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
@Override
public Tuple2<AvroWrapper<LogData>, NullWritable> map(Object value) throws Exception {
AvroKey<LogData> key = new AvroKey<>((LogData) value);
Tuple2<AvroWrapper<LogData>, NullWritable> tuple = new Tuple2<>(key, NullWritable.get());
  • 🔽 Immediately return this expression instead of assigning it to the temporary variable "tuple". 📘
return tuple;
}
}).output(hadoopOutputFormat);
try {
env.execute(jobName);
} catch (Exception e) {
System.out.println(e.getMessage());
  • Replace this use of System.out or System.err by a logger. 📘
log.error(String.valueOf(e));
}
// for (String logFile : logFiles) {
// /**
// * 读取hdfs日志文件,avro反序列化处理
......
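The inline findings above (double-brace initializer, temporary variable before return, System.out instead of a logger) all have one-line fixes. A self-contained illustrative sketch, with the class and method names invented for the example rather than taken from the project, assuming slf4j on the classpath:

import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical holder class illustrating the SonarQube fixes flagged above.
public class DesensitizationExamples {
    private static final Logger LOG = LoggerFactory.getLogger(DesensitizationExamples.class);

    // Plain field initializer instead of an anonymous-subclass (double-brace) initializer
    private static final List<String> DATA_FORMATS = Arrays.asList(",", ".", "@", "-");

    // Return the expression directly instead of assigning it to a temporary variable
    static String joined() {
        return String.join("|", DATA_FORMATS);
    }

    public static void main(String[] args) {
        // Logger call instead of System.out.println
        LOG.info("data formats: {}", joined());
    }
}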
# Job configuration
job_name: "国泰交易日志脱敏job"
# Parallelism
parallelism: "1"
# Max number of files merged into one task, to keep the job from hanging on too many open files
......
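For context on the max-file-count setting above: the diff replaces changeList(logFiles) with list2String(logFiles), but the setting exists because the file list used to be split into batches so that one task never opens too many files. A sketch of such a batching helper follows; the signature is assumed, since only the name changeList and the field maxFileNum appear in the diff:

import java.util.ArrayList;
import java.util.List;

// Sketch only: comma-join HDFS paths in groups of at most maxFileNum,
// so a single input task never opens too many files at once.
static List<String> changeList(List<String> logFiles, int maxFileNum) {
    List<String> batches = new ArrayList<>();
    for (int i = 0; i < logFiles.size(); i += maxFileNum) {
        int end = Math.min(i + maxFileNum, logFiles.size());
        batches.add(String.join(",", logFiles.subList(i, end)));
    }
    return batches;
}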
  • SonarQube analysis reported 112 issues

    • 🚫 21 critical
    • 72 major
    • 🔽 18 minor
    • 1 info

    Watch the comments in this conversation to review them.

    Top 30 extra issues

    Note: The following issues were found on lines that were not modified in the commit. Because these issues can't be reported as line comments, they are summarized here:

    1. 🚫 Add a default case to this switch. 📘 (see the sketch after this list)
    2. 🚫 Each case in a switch must be terminated with break/return or similar. 📘
    3. 🚫 The switch block is missing a default statement. 📘
    4. 🚫 Define a constant instead of duplicating this literal " {\n" 11 times. 📘
    5. 🚫 Define a constant instead of duplicating this literal "    "type": \n" 11 times. 📘
    6. 🚫 Define a constant instead of duplicating this literal " "string",\n" 6 times. 📘
    7. 🚫 Define a constant instead of duplicating this literal " "null"\n" 6 times. 📘
    8. 🚫 [Define a constant instead of duplicating this literal " ]\n" 11 times.](https://git.zorkdata.com/liaomingtao/transaction_log_desensitization/blob/5f09be4271e1918927fe8feee24f14215090ac45/src/main/java/com/zorkdata/desensitization/avro/AvroSchemaDef.java#L23) 📘
    9. 🚫 Define a constant instead of duplicating this literal " },\n" 9 times. 📘
    10. 🚫 Define a constant instead of duplicating this literal " "null",\n" 5 times. 📘
    11. 🚫 Define a constant instead of duplicating this literal " {\n" 5 times. 📘
    12. 🚫 Define a constant instead of duplicating this literal " "type": "map",\n" 5 times. 📘
    13. 🚫 Define a constant instead of duplicating this literal " "values": "string"\n" 3 times. 📘
    14. 🚫 Define a constant instead of duplicating this literal " }\n" 5 times. 📘
    15. 🚫 Define a constant instead of duplicating this literal "序列化失败" 13 times. 📘
    16. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    17. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    18. 🚫 Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation. 📘
    19. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    20. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    21. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    22. This block of commented-out lines of code should be removed. 📘
    23. Clean up code blocks or configuration that are no longer used. 📘
    24. Replace this use of System.out or System.err by a logger. 📘
    25. Replace this use of System.out or System.err by a logger. 📘
    26. String contains no format specifiers. 📘
    27. Replace this use of System.out or System.err by a logger. 📘
    28. Rename "jsonObject" which hides the field declared at line 39. 📘
    29. Remove this expression which always evaluates to "true" 📘
    30. Remove this expression which always evaluates to "true" 📘
    • ... 72 more
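Items 1–3 and 19 recur throughout this codebase according to the report. For reference, a minimal sketch of the compliant patterns; the class name and the file-reading logic are invented for illustration, not taken from the project:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class SonarFixExamples {
    // Every case ends with a return (break would also do), and a default case is present.
    static String describe(int code) {
        switch (code) {
            case 0:
                return "ok";
            case 1:
                return "warning";
            default:
                return "unknown";
        }
    }

    // try-with-resources closes the reader even if readLine() throws.
    static String firstLine(String path) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            return reader.readLine();
        }
    }
}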