Commit 1cb26b74 authored by DeleMing's avatar DeleMing

<dev>

1. Clean up code
parent ec3e44b8
Pipeline #15189 failed with stages in 1 minute and 21 seconds
......@@ -2,14 +2,6 @@
FLINK_TASK_CONF=application.yml
REGULAR_TASK_NAME=regular
CDH_HOST_IP=192.168.70.2
CDH_HOST_USER=root
CDH_HOST_PASSWORD=NuqUtwbJUBRmUwgh
HDFS_DEST=/tmp/datawarehouse/jzjy/kcbp_biz_log/output1/
DOWNLOAD_PATH=/tmp/
JOB_NAME="国泰交易日志脱敏job"
SCP_PATH=$DOWNLOAD_PATH$(echo $HDFS_DEST|rev |cut -d '/' -f 2 | rev)/
LOCAL_IP=$(ip a |grep inet| grep -v inet6 | grep -v 127 | cut -d '/' -f1 | cut -d ' ' -f6)
export BASE_PATH=$(cd `dirname $0`; pwd)
DEPLOY_PATH=${BASE_PATH%/*}
......@@ -18,44 +10,3 @@ if [ ! -d "$DEPLOY_PATH/logs" ]; then
fi
flink run -d -c com.zorkdata.desensitization.TransactionLogDesensitization $DEPLOY_PATH/lib/transaction-log-desensitization-0.1.jar --conf $DEPLOY_PATH/conf/$FLINK_TASK_CONF --regular $DEPLOY_PATH/conf/$REGULAR_TASK_NAME > $DEPLOY_PATH/logs/submit.log &
sleep 10
TASK_STATUS=$(cat $DEPLOY_PATH/logs/submit.log |grep "Job has been submitted with JobID")
if [[ "$TASK_STATUS" == "" ]]
then
echo "提交任务失败"
exit 8
fi
download(){
expect <<EOF
set timeout 10
spawn ssh $CDH_HOST_USER@$CDH_HOST_IP
expect {
"yes/no" { send "yes\n";exp_continue }
"password:" { send "$CDH_HOST_PASSWORD\n" }
}
expect "]# " { send "sudo -u hdfs hadoop fs -copyToLocal $HDFS_DEST $DOWNLOAD_PATH\n" }
expect "]# " {
send "scp -r $SCP_PATH root@$LOCAL_IP:/tmp/\n"
expect {
"yes/no" { send "yes\n";exp_continue }
"password: " { send "$CDH_HOST_PASSWORD\n" }
}
expect "]# " { send "exit\n" }
}
expect "]# " { send "exit\n" }
EOF
}
while :
do
FLINK_LIST_RUNNING=$(flink list -r )
FLAG=$(echo $FLINK_LIST_RUNNING | grep "$JOB_NAME")
if [[ "$FLAG" == "" ]]
then
download
break
fi
done
\ No newline at end of file
#!/usr/bin/env bash
FLINK_TASK_CONF=application.yml
REGULAR_TASK_NAME=regular
CDH_HOST_IP=192.168.70.2
CDH_HOST_USER=root
CDH_HOST_PASSWORD=NuqUtwbJUBRmUwgh
HDFS_DEST=/tmp/datawarehouse/jzjy/kcbp_biz_log/output1/
DOWNLOAD_PATH=/tmp/
JOB_NAME="国泰交易日志脱敏job"
SCP_PATH=$DOWNLOAD_PATH$(echo $HDFS_DEST|rev |cut -d '/' -f 2 | rev)/
LOCAL_IP=$(ip a |grep inet| grep -v inet6 | grep -v 127 | cut -d '/' -f1 | cut -d ' ' -f6)
export BASE_PATH=$(cd `dirname $0`; pwd)
DEPLOY_PATH=${BASE_PATH%/*}
if [ ! -d "$DEPLOY_PATH/logs" ]; then
mkdir -p $DEPLOY_PATH/logs
fi
flink run -d -c com.zorkdata.desensitization.TransactionLogDesensitization $DEPLOY_PATH/lib/transaction-log-desensitization-0.1.jar --conf $DEPLOY_PATH/conf/$FLINK_TASK_CONF --regular $DEPLOY_PATH/conf/$REGULAR_TASK_NAME > $DEPLOY_PATH/logs/submit.log &
sleep 10
TASK_STATUS=$(cat $DEPLOY_PATH/logs/submit.log |grep "Job has been submitted with JobID")
if [[ "$TASK_STATUS" == "" ]]
then
echo "提交任务失败"
exit 8
fi
download(){
expect <<EOF
set timeout 10
spawn ssh $CDH_HOST_USER@$CDH_HOST_IP
expect {
"yes/no" { send "yes\n";exp_continue }
"password:" { send "$CDH_HOST_PASSWORD\n" }
}
expect "]# " { send "sudo -u hdfs hadoop fs -copyToLocal $HDFS_DEST $DOWNLOAD_PATH\n" }
expect "]# " {
send "scp -r $SCP_PATH root@$LOCAL_IP:/tmp/\n"
expect {
"yes/no" { send "yes\n";exp_continue }
"password: " { send "$CDH_HOST_PASSWORD\n" }
}
expect "]# " { send "exit\n" }
}
expect "]# " { send "exit\n" }
EOF
}
while :
do
FLINK_LIST_RUNNING=$(flink list -r )
FLAG=$(echo $FLINK_LIST_RUNNING | grep "$JOB_NAME")
if [[ "$FLAG" == "" ]]
then
download
break
fi
done
\ No newline at end of file
......@@ -18,9 +18,12 @@ public class TransactionLogDesensitization {
private static final int PARAM_LENGTH = 4;
public static void main(String[] args) {
public static void main(String[] args) throws Exception{
long start = System.currentTimeMillis();
if (args.length != PARAM_LENGTH) {
String error = "参数缺失,请输入配置文件,例如: --conf E:\\Codes\\fork\\transaction_log_desensitization\\src\\main\\resources\\application.yml --regular E:\\Codes\\fork\\transaction_log_desensitization\\src\\main\\resources\\regular ";
String error = "参数缺失,请输入配置文件,例如: " +
"--conf E:\\Codes\\fork\\transaction_log_desensitization\\src\\main\\resources\\application.yml " +
"--regular E:\\Codes\\fork\\transaction_log_desensitization\\src\\main\\resources\\regular ";
log.error(error);
}
try {
......@@ -36,6 +39,7 @@ public class TransactionLogDesensitization {
} catch (ZorkException e) {
log.info(String.valueOf(e));
}
long stop = System.currentTimeMillis();
System.out.println("耗时统计:" + (stop - start) + "ms");
  • Replace this use of System.out or System.err by a logger. 📘
}
}
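The SonarQube line comment above flags the System.out timing output. A minimal hedged sketch of routing it through a logger follows; it assumes an SLF4J logger named log (the surrounding diff already calls log.error and log.info), and the class name and message here are placeholders rather than the project's actual code.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimingLogSketch {
    private static final Logger log = LoggerFactory.getLogger(TimingLogSketch.class);

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // ... job setup and submission would happen here ...
        long stop = System.currentTimeMillis();
        // Parameterized logging instead of System.out; no string concatenation
        // is performed when the INFO level is disabled.
        log.info("Elapsed time: {} ms", stop - start);
    }
}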
......@@ -12,8 +12,9 @@ public final class ConfigConstants {
public static final String SOURCE = "source";
public static final String PARALLELISM = "parallelism";
public static final String MAX_FILE_NUM = "max_file_num";
public static final String SOURCE_PARALLELISM = "source.parallelism";
public static final String TRANSFORMER_PARALLELISM = "transformer.parallelism";
public static final String SINK_PARALLELISM = "sink.parallelism";
public static final String HDFS_URI = "hdfs_uri";
public static final String HDFS_USER = "hdfs_user";
......
# 任务配置
job_name: "国泰交易日志脱敏job"
# 并行度
parallelism: "1"
# 文件个数、此为最大文件个数合并为一个任务,防止任务由于打开文件个数导致任务挂
max_file_num: "300"
source.parallelism: "4"
transformer.parallelism: "4"
sink.parallelism: "4"
# 数据来源,支持hdfs和kafka,必传
source: "hdfs"
......@@ -12,9 +12,9 @@ source: "hdfs"
core: "c9"
# 查询日志起始
start_time: "2020-09-29 11:07:29"
start_time: "2020-11-07 16:36:20"
# 查询日志结束
end_time: "2020-09-29 11:07:30"
end_time: "2020-11-07 17:40:30"
# hadoop 相关配置
# hdfs 地址
......@@ -24,7 +24,7 @@ hdfs_user: "hdfs"
# hdfs日志源文件地址,若source为hdfs,则该地址必传
hdfs_src: "hdfs://cdh-2:8020/tmp/datawarehouse4/jzjy/kcbp_biz_log/"
# hdfs日志写入地址,非必传,默认写到hdfs-src目录下的output目录下
hdfs_dest: "hdfs://cdh-2:8020/tmp/datawarehouse/jzjy/kcbp_biz_log/output3/"
hdfs_dest: "hdfs://cdh-2:8020/tmp/datawarehouse/jzjy/kcbp_biz_log/output1/"
# 不做脱敏的字段白名单
fields_white_list: "messid,fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc,indexTime,logchecktime,end_logtime,collecttime,deserializerTime,versioninfo,fmillsecond,smillsecond"
......
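The new source.parallelism / transformer.parallelism / sink.parallelism keys replace the single global parallelism value with per-stage settings. A hedged sketch of how such values could be applied in a Flink DataStream job is shown below; the operators, values, and class names are placeholders, not the project's actual wiring.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismWiringSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical values; in the real job they would be read from application.yml.
        int transformerParallelism = 4;
        int sinkParallelism = 4;

        // fromElements is a non-parallel stand-in; source.parallelism would instead be set
        // on a parallel source such as the job's HDFS or Kafka source.
        DataStream<String> lines = env.fromElements("raw log line 1", "raw log line 2");

        lines.map(String::toUpperCase)            // stand-in for the desensitization transform
                .setParallelism(transformerParallelism)
                .print()                          // stand-in for the real sink
                .setParallelism(sinkParallelism);

        env.execute("parallelism wiring sketch");
    }
}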
  • SonarQube analysis reported 105 issues

    • 🚫 22 critical
    • 58 major
    • 🔽 24 minor
    • 1 info

    Watch the comments in this conversation to review them.

    Top 30 extra issues

    Note: The following issues were found on lines that were not modified in the commit. Because these issues can't be reported as line comments, they are summarized here:

    1. 🚫 Add a default case to this switch. 📘 (generic hedged fixes for this and several of the findings below are sketched after this list)
    2. 🚫 Every case in a switch must be terminated with break/return, etc. 📘
    3. 🚫 The switch block is missing a default statement. 📘
    4. 🚫 Define a constant instead of duplicating this literal " {\n" 11 times. 📘
    5. 🚫 Define a constant instead of duplicating this literal " "type": \n" 11 times. 📘
    6. 🚫 Define a constant instead of duplicating this literal " "string",\n" 6 times. 📘
    7. 🚫 Define a constant instead of duplicating this literal " "null"\n" 6 times. 📘
    8. 🚫 [Define a constant instead of duplicating this literal " ]\n" 11 times.](https://git.zorkdata.com/liaomingtao/transaction_log_desensitization/blob/1cb26b747dbf966f4d0a2dce559bbb93bde9f340/src/main/java/com/zorkdata/desensitization/avro/AvroSchemaDef.java#L23) 📘
    9. 🚫 Define a constant instead of duplicating this literal " },\n" 9 times. 📘
    10. 🚫 Define a constant instead of duplicating this literal " "null",\n" 5 times. 📘
    11. 🚫 Define a constant instead of duplicating this literal " {\n" 5 times. 📘
    12. 🚫 Define a constant instead of duplicating this literal " "type": "map",\n" 5 times. 📘
    13. 🚫 Define a constant instead of duplicating this literal " "values": "string"\n" 3 times. 📘
    14. 🚫 Define a constant instead of duplicating this literal " }\n" 5 times. 📘
    15. 🚫 Define a constant instead of duplicating this literal "序列化失败" 13 times. 📘
    16. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    17. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    18. 🚫 Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation. 📘
    19. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    20. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    21. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    22. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    23. This block of commented-out lines of code should be removed. 📘
    24. Clean up code segments or configuration that are no longer used in a timely manner. 📘
    25. Replace this use of System.out or System.err by a logger. 📘
    26. Replace this use of System.out or System.err by a logger. 📘
    27. String contains no format specifiers. 📘
    28. Replace this use of System.out or System.err by a logger. 📘
    29. Rename "jsonObject" which hides the field declared at line 39. 📘
    30. Remove this expression which always evaluates to "true" 📘
    • ... 73 more
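Several of the critical findings above recur across the codebase: switch statements without break or default, manual resource handling instead of try-with-resources, and duplicated string literals. A hedged, generic sketch of those fixes follows; the names and paths are placeholders, not the project's own classes.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class SonarFixSketches {

    // Findings 4-15: a literal repeated many times (e.g. "序列化失败") becomes a named constant.
    private static final String SERIALIZATION_FAILED = "序列化失败";

    // Findings 1-3: every case ends with return (or break) and a default case is present.
    static String describeSource(String source) {
        switch (source) {
            case "hdfs":
                return "read from HDFS";
            case "kafka":
                return "read from Kafka";
            default:
                return "unknown source: " + source;
        }
    }

    // Findings 19 and 22: try-with-resources closes the reader even when an exception
    // is thrown, so no close() call is needed in a finally block.
    static String readFirstLine(String path) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            return reader.readLine();
        }
    }

    static String serializationError(Exception e) {
        return SERIALIZATION_FAILED + ": " + e.getMessage();
    }
}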