Commit 4e0789d8 in transactionLogMask
Authored Oct 21, 2020 by 谢森
<dev> Adjust the project structure to support automated packaging
parent 7582bbaa

Showing 20 changed files with 928 additions and 752 deletions (+928 −752)
.gitlab-ci.yml  +61 −0
assemblies/assembly.xml  +34 −0
pom.xml  +347 −263
src/main/java/com/zorkdata/datamask/TransactionLogMask.java  +16 −265
src/main/java/com/zorkdata/datamask/constant/ParamConstants.java  +11 −2
src/main/java/com/zorkdata/datamask/constant/StrConstants.java  +15 −0
src/main/java/com/zorkdata/datamask/domain/HadoopParam.java  +32 −0
src/main/java/com/zorkdata/datamask/domain/KafkaParam.java  +33 −0
src/main/java/com/zorkdata/datamask/hadoop/HadoopMask.java  +179 −0
src/main/java/com/zorkdata/datamask/kafka/KafkaMask.java  +75 −0
src/main/java/com/zorkdata/datamask/util/AvroTest.java  +0 −179
src/main/java/com/zorkdata/datamask/util/ConfigUtils.java  +4 −2
src/main/java/com/zorkdata/datamask/util/DateUtils.java  +39 −0
src/main/java/com/zorkdata/datamask/util/LoadConf.java  +7 −4
src/main/java/com/zorkdata/datamask/util/MaskUtil.java  +10 −6
src/main/java/com/zorkdata/datamask/util/ParamUtils.java  +44 −0
src/main/java/com/zorkdata/datamask/util/ZorkParameterUtil.java  +7 −7
src/main/java/com/zorkdata/datamask/util/avro/AvroSerializer.java  +1 −1
src/main/resources/application.properties  +0 −23
src/main/resources/bin/start.sh  +13 −0
.gitlab-ci.yml (new file, 0 → 100644)
stages:
  - sonar_push
  - sonar_scan
  - maven_clean
  - build_release

before_script:
  - 'echo "Build fold: $PWD"'
  - 'echo "PROJECT NAME: $CI_PROJECT_NAME"'
  - 'echo "PROJECT ID: $CI_PROJECT_ID"'
  - 'echo "PROJECT URL: $CI_PROJECT_URL"'
  - 'echo "ENVIRONMENT URL: $CI_ENVIRONMENT_URL"'
  - 'export COMMIT_ID_SHORT=${CI_COMMIT_SHA:0:8}'
  - 'export PATH=$PATH:/usr/bin'

sonar_push:
  stage: sonar_push
  only:
    - master
  when: manual
  image: hub.zorkdata.com/devops/sonar_runner:00be5a71
  allow_failure: true
  script:
    - FOLD_LIST=`bash /cicd-script/sonarqube/get_fold_list.sh "src"`
    - 'bash /cicd-script/sonar_push.sh "-Dsonar.sources=`echo $FOLD_LIST`"'

sonar_scan:
  stage: sonar_scan
  only:
    - merge_requests
    - branches
    - master
  when: always
  image: hub.zorkdata.com/devops/sonar_runner:00be5a71
  allow_failure: false
  script:
    - FOLD_LIST=`bash /cicd-script/sonarqube/get_fold_list.sh "src"`
    - 'bash /cicd-script/sonar_review.sh "-Dsonar.sources=`echo $FOLD_LIST`"'

maven_clean:
  stage: maven_clean
  when: manual
  image: hub.zorkdata.com/devops/maven3.6-jdk1.8:9b09951e
  allow_failure: true
  script:
    - 'bash /cicd-script/maven/maven_clean.sh'

build_release:
  stage: build_release
  only:
    - tags
  when: always
  image: hub.zorkdata.com/devops/maven3.6-jdk1.8:9b09951e
  allow_failure: false
  script:
    - 'bash /cicd-script/maven/maven_clean.sh'
    - 'bash /cicd-script/maven/maven_deploy.sh'
    - 'bash /cicd-script/maven/tag_release.sh transactionlogmask'
assemblies/assembly.xml (new file, 0 → 100644)
<!--
  ~ Copyright (c) 2020. log-merge
  Packaging descriptor file; do not modify the content below
-->
<assembly>
    <id>bin</id>
    <formats>
        <format>tar.gz</format>
    </formats>
    <fileSets>
        <!-- ============ Do not modify the content below ============ -->
        <fileSet>
            <directory>target/transaction-log-deploy/bin</directory>
            <includes>
                <include>*.sh</include>
            </includes>
            <outputDirectory>bin</outputDirectory>
            <fileMode>0755</fileMode>
        </fileSet>
        <fileSet>
            <directory>target/transaction-log-deploy/conf</directory>
            <outputDirectory>conf</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/transaction-log-deploy/lib</directory>
            <outputDirectory>lib</outputDirectory>
        </fileSet>
    </fileSets>
</assembly>
\ No newline at end of file
pom.xml
This diff is collapsed.
src/main/java/com/zorkdata/datamask/TransactionLogMask.java
This diff is collapsed.
src/main/java/com/zorkdata/datamask/constants/Constants.java → src/main/java/com/zorkdata/datamask/constant/ParamConstants.java
-package com.zorkdata.datamask.constants;
+package com.zorkdata.datamask.constant;

 import java.util.Date;

 /**
  * Description :
...
@@ -6,7 +8,8 @@ package com.zorkdata.datamask.constants;
  * @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
  * Date : Create in 2020/10/20 15:32
  */
-public interface Constants {
+public interface ParamConstants {
     String SOURCE = "source";
     String HDFS_SRC = "hdfs_src";
     String HDFS_DEST = "hdfs_dest";
...
@@ -15,6 +18,12 @@ public interface Constants {
     String START_TIME = "start_time";
     String END_TIME = "end_time";
+    String SERVERS = "servers";
+    String ZOOKEEPER = "zookeeper";
+    String TOPIC = "topic";
+    String HDFS = "hdfs";
+    String KAFKA = "kafka";
     String NAME_REG_EXP = "name_reg_exp";
     String MOBILE_REG_EXP = "mobile_reg_exp";
     String PHONE_REG_EXP = "phone_reg_exp";
...
src/main/java/com/zorkdata/datamask/constant/StrConstants.java (new file, 0 → 100644)
package com.zorkdata.datamask.constant;

/**
 * @author 谢森
 * @Description Constant definitions
 * @Email xiesen310@163.com
 * @Date 2020/10/21 15:50
 */
public interface StrConstants {
    String FILE_SEPARATOR = "/";
    String AVRO_SUFFIX = ".avro";
    String EMPTY_STR = "";
}
src/main/java/com/zorkdata/datamask/domain/HadoopParam.java (new file, 0 → 100644)
package com.zorkdata.datamask.domain;

import lombok.Data;

/**
 * @author 谢森
 * @Description Parameter entity class
 * @Email xiesen310@163.com
 * @Date 2020/10/21 14:33
 */
@Data
public class HadoopParam {
    private String source;
    private String hdfsSrc;
    private String hdfsDest;
    private String core;
    private String date;
    private Long startTime;
    private Long endTime;

    public HadoopParam(String source, String hdfsSrc, String hdfsDest, String core, String date, Long startTime, Long endTime) {
        this.source = source;
        this.hdfsSrc = hdfsSrc;
        this.hdfsDest = hdfsDest;
        this.core = core;
        this.date = date;
        this.startTime = startTime;
        this.endTime = endTime;
    }
}
src/main/java/com/zorkdata/datamask/domain/KafkaParam.java (new file, 0 → 100644)
package com.zorkdata.datamask.domain;

import lombok.Data;

/**
 * @author 谢森
 * @Description Kafka parameter entity class
 * @Email xiesen310@163.com
 * @Date 2020/10/21 15:07
 */
@Data
public class KafkaParam {
    private String servers;
    private String zookeeper;
    private String topic;
    private String hdfsDest;
    private String core;
    private String date;
    private Long startTime;
    private Long endTime;

    public KafkaParam(String servers, String zookeeper, String topic, String hdfsDest, String core, String date, Long startTime, Long endTime) {
        this.servers = servers;
        this.zookeeper = zookeeper;
        this.topic = topic;
        this.hdfsDest = hdfsDest;
        this.core = core;
        this.date = date;
        this.startTime = startTime;
        this.endTime = endTime;
    }
}
src/main/java/com/zorkdata/datamask/hadoop/HadoopMask.java (new file, 0 → 100644)
package com.zorkdata.datamask.hadoop;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.constant.StrConstants;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.HadoopParam;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.DateUtils;
import com.zorkdata.datamask.util.MaskUtil;
import com.zorkdata.datamask.util.ParamUtils;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author 谢森
 * @Description Hadoop file data masking
 * @Email xiesen310@163.com
 * @Date 2020/10/21 14:29
 */
public class HadoopMask {
    private static final Logger logger = LoggerFactory.getLogger(HadoopMask.class);

    /**
     * Mask HDFS log files
     *
     * @param conf request parameters
     * @return void
     */
    public static void maskHdfsLog(Map<String, String> conf) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        JobConf jobConf = new JobConf();
        jobConf.set("avro.output.schema", TransactionLog.SCHEMA$.toString(true));
        HadoopParam hadoopParam = ParamUtils.initHadoopConf(conf);
        ParameterTool parameterTool = ParameterTool.fromMap(conf);
        env.getConfig().setGlobalJobParameters(parameterTool);
        List<String> logFiles = filterHdfsLogFiles(hadoopParam.getHdfsSrc(), hadoopParam.getDate(),
                hadoopParam.getStartTime(), hadoopParam.getEndTime());
        for (String logFile : logFiles) {
            /**
             * Read the HDFS log file and deserialize it with Avro
             */
            HadoopInputFormat<Object, Object> hadoopInputFormat =
                    new HadoopInputFormat<Object, Object>(new AvroInputFormat(), Object.class, Object.class, jobConf);
            AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
            DataSource<Tuple2<Object, Object>> hdfsLogInput = env.createInput(hadoopInputFormat);
            /**
             * Masking operator
             */
            FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator =
                    hdfsLogInput.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
                        @Override
                        public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
                            LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>() {
                            });
                            // Filter on the core information of the log event
                            if (null != hadoopParam.getCore() && logData.getDimensions().get("hostname").indexOf("c9") > -1) {
                                // Filter on the log event timestamp
                                Long timestamp = DateUtils.utc2timestamp(logData.getTimestamp());
                                boolean flag = null != timestamp && timestamp > hadoopParam.getStartTime()
                                        && timestamp < hadoopParam.getEndTime() || Boolean.TRUE;
                                if (flag) {
                                    Map maskResult = MaskUtil.mask(logData.getNormalFields());
                                    logData.setNormalFields(maskResult);
                                    collector.collect(logData);
                                }
                            }
                        }
                    });
            // Get the output directory on the destination HDFS
            String logFileName = logFile.split(StrConstants.FILE_SEPARATOR)[logFile.split(StrConstants.FILE_SEPARATOR).length - 1];
            String filePath = hadoopParam.getHdfsSrc() + logFileName.replace(StrConstants.AVRO_SUFFIX, StrConstants.EMPTY_STR);
            HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
            FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
            /**
             * Avro serialization operator
             */
            maskFlatMapOperator.map(new MapFunction<Object, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
                @Override
                public Tuple2<AvroWrapper<LogData>, NullWritable> map(Object value) throws Exception {
                    AvroKey<LogData> key = new AvroKey<LogData>((LogData) value);
                    Tuple2<AvroWrapper<LogData>, NullWritable> tupple =
                            new Tuple2<AvroWrapper<LogData>, NullWritable>(key, NullWritable.get());
                    return tupple;
                }
            }).output(hadoopOutputFormat);
            try {
                env.execute("国泰交易日志脱敏job");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Filter HDFS log files
     *
     * @param hdfs      HDFS address
     * @param date      date
     * @param startTime start time
     * @param endTime   end time
     * @return list of HDFS files
     */
    private static List<String> filterHdfsLogFiles(String hdfs, String date, Long startTime, Long endTime) {
        if (!hdfs.endsWith(StrConstants.FILE_SEPARATOR)) {
            hdfs += StrConstants.FILE_SEPARATOR;
        }
        String path = hdfs;
        if (null != date) {
            path = hdfs + date;
        }
        Configuration conf = new Configuration();
        List<String> logFiles = new ArrayList<>();
        try {
            FileSystem fileSystem = null;
            try {
                fileSystem = FileSystem.get(new URI("hdfs://cdh-2:8020/"), conf, "hdfs");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path(path), false);
            while (locatedFileStatusRemoteIterator.hasNext()) {
                LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
                long modificationTime = next.getModificationTime();
                // Filter by file modification time to pick files within the user-specified time range
                if (modificationTime > startTime && modificationTime < endTime) {
                    Path hdfsFilePath = next.getPath();
                    logFiles.add(hdfsFilePath.toString());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return logFiles;
    }
}
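For orientation only, the sketch below shows one way a caller might drive HadoopMask.maskHdfsLog; it is not part of this commit. It assumes the conf map is keyed by the ParamConstants introduced above, that start_time/end_time are epoch milliseconds (they are compared against HDFS file modification times and event timestamps), and that every path and value shown is a placeholder.

// Hypothetical driver class, not part of the repository.
import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.hadoop.HadoopMask;

import java.util.HashMap;
import java.util.Map;

public class HadoopMaskExample {
    public static void main(String[] args) throws Exception {
        Map<String, String> conf = new HashMap<>();
        conf.put(ParamConstants.SOURCE, "hdfs");                            // illustrative value
        conf.put(ParamConstants.HDFS_SRC, "hdfs://cdh-2:8020/logs/src/");   // placeholder source path
        conf.put(ParamConstants.HDFS_DEST, "hdfs://cdh-2:8020/logs/dest/"); // placeholder destination path
        conf.put(ParamConstants.CORE, "c9");                                // illustrative core filter
        conf.put(ParamConstants.DATE, "2020-10-21");                        // illustrative date partition
        conf.put(ParamConstants.START_TIME, "1603238400000");               // epoch millis, placeholder
        conf.put(ParamConstants.END_TIME, "1603324800000");                 // epoch millis, placeholder
        HadoopMask.maskHdfsLog(conf);                                       // runs the batch masking job
    }
}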
src/main/java/com/zorkdata/datamask/kafka/KafkaMask.java (new file, 0 → 100644)
package com.zorkdata.datamask.kafka;

import com.zorkdata.datamask.domain.HadoopParam;
import com.zorkdata.datamask.domain.KafkaParam;
import com.zorkdata.datamask.util.ParamUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.Date;
import java.util.Map;
import java.util.Properties;

/**
 * @author 谢森
 * @Description Kafka data masking
 * @Email xiesen310@163.com
 * @Date 2020/10/21 14:51
 */
public class KafkaMask {
    /**
     * Mask Kafka message data
     *
     * @param conf request parameters
     * @return void
     */
    public static void maskKafkaMsg(Map<String, String> conf) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        KafkaParam kafkaParam = ParamUtils.initKafkaConf(conf);
        ParameterTool parameterTool = ParameterTool.fromMap(conf);
        env.getConfig().setGlobalJobParameters(parameterTool);
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaParam.getServers());
        props.put("zookeeper.connect", kafkaParam.getZookeeper());
        props.put("group.id", "group1");
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 1000);
        SingleOutputStreamOperator<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer<>(kafkaParam.getTopic(), new SimpleStringSchema(), props)).setParallelism(1);
        // TODO filter by date, startTime and endTime
        BucketingSink<String> hdfsSink = new BucketingSink<>(kafkaParam.getHdfsDest());
        // Create a bucketer that creates directories by time. The default is yyyy-MM-dd--HH in the US time zone;
        // changed here to one directory per day, Shanghai time.
        hdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")));
        // Maximum size of each file; the default is 384M (1024 * 1024 * 384)
        hdfsSink.setBatchSize(1024 * 1024 * 384);
        // Roll over to a new file after this time interval
        hdfsSink.setBatchRolloverInterval(1000 * 60 * 60);
        hdfsSink.setPendingSuffix("ccc");
        hdfsSink.setInactiveBucketThreshold(60 * 1000L);
        hdfsSink.setInactiveBucketCheckInterval(60 * 1000L);
        hdfsSink.setAsyncTimeout(60 * 1000);
        dataStreamSource.addSink(hdfsSink);
        try {
            env.execute("国泰交易日志脱敏job");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
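Likewise, a minimal sketch (not part of the commit) of driving KafkaMask.maskKafkaMsg. The key names come from ParamConstants in this commit; the broker, ZooKeeper, topic and sink-path values are placeholders only.

// Hypothetical driver class, not part of the repository.
import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.kafka.KafkaMask;

import java.util.HashMap;
import java.util.Map;

public class KafkaMaskExample {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(ParamConstants.SERVERS, "kafka-1:9092,kafka-2:9092");    // placeholder brokers
        conf.put(ParamConstants.ZOOKEEPER, "zk-1:2181");                  // placeholder ZooKeeper
        conf.put(ParamConstants.TOPIC, "transaction-log");                // placeholder topic
        conf.put(ParamConstants.HDFS_DEST, "hdfs://cdh-2:8020/masked/");  // placeholder sink path
        conf.put(ParamConstants.CORE, "c9");
        conf.put(ParamConstants.DATE, "2020-10-21");
        conf.put(ParamConstants.START_TIME, "1603238400000");             // epoch millis, placeholder
        conf.put(ParamConstants.END_TIME, "1603324800000");               // epoch millis, placeholder
        KafkaMask.maskKafkaMsg(conf);                                     // runs the streaming masking job
    }
}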
src/main/java/com/zorkdata/datamask/util/AvroTest.java (deleted, 100644 → 0)
package com.zorkdata.datamask.util;

import com.alibaba.fastjson.JSON;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.avro.AvroDeserializer;
import com.zorkdata.datamask.util.avro.AvroDeserializerFactory;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.TimeZone;

/**
 * Description :
 * https://www.cnblogs.com/fillPv/p/5009737.html
 * @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
 * Date : Create in 2020/9/23 4:43
 */
// java -jar avro-tools-1.10.0.jar compile schema log.avro .
//
public class AvroTest {
    public static void main(String[] args) throws IOException {
// Avro serialization: write an avro file
// TransactionLog transactionLog = new TransactionLog();
// LogData transactionLog = new LogData();
// transactionLog.setLogTypeName("kcbp_biz_log");
// transactionLog.setTimestamp("2020-09-18T13:59:53.000+08:00");
// transactionLog.setSource("d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log");
// transactionLog.setOffset("165683111");
//
// HashMap dimensions = new HashMap() {{
// put("appsystem", "jzjy");
// put("appprogramname", "jzc9-kcbp1_9600");
// put("hostname", "jzc9-kcbp1");
// put("func", "");
// put("nodeid", "");
// put("operway", "W");
// }};
//// transactionLog.setDimensions(dimensions);
//
// HashMap measures = new HashMap<String, Double>() {{
// put("latence", 0.0);
// put("latency", 1.0);
// put("spendtime", 0.5);
// }};
//// transactionLog.setMeasures(measures);
//
// HashMap normalFields = new HashMap() {{
// put("indexTime", "2020-09-18T13:59:54.524+08:00");
// put("bsflag", "");
// put("productcode", "");
// put("developercode", "");
// put("fmillsecond", "");
// put("inputtype", "");
// put("logchecktime", "");
// put("message", "身份证号码:372925199008075158,地址:上海浦东新区张江高科碧波路690号,手机号:15000101879,邮箱:wanghaiying@zork.com.cn");
// put("end_logtime", "");
// put("smillsecond", "585606599");
// put("featurecode", "");
// put("orgid", "");
// put("authcode", "");
// put("collecttime", "2020-09-18T13:59:53.529+08:00");
// put("fundid", "");
// put("deserializerTime", "2020-09-18T13:59:53.671+08:00");
// put("messid", "0000011404342B32233DDCDA");
// put("custid", "");
// put("netputr", "");
// put("versioninfo", "");
// put("beg_logtime", "20200918-135953");
// put("authinfo", "");
// }};
// transactionLog.setNormalFields(normalFields);
// String path = "d:\\transactionlog-20200925.avro"; // directory where the avro file is stored
// DatumWriter<TransactionLog> logDatumWriter = new SpecificDatumWriter<>(TransactionLog.class);
// DataFileWriter<TransactionLog> dataFileWriter = new DataFileWriter<>(logDatumWriter);
// dataFileWriter.create(transactionLog.getSchema(), new File(path));
// // write the generated objects into the avro file
// dataFileWriter.append(transactionLog);
// dataFileWriter.append(transactionLog);
// dataFileWriter.append(transactionLog);
// dataFileWriter.close();
/**
 * Serialization
 */
// byte[] kcbp_biz_logs = AvroSerializerFactory.getLogAvroSerializer().serializingLog("kcbp_biz_log", "2020-09-18T13:59:53.000+08:00",
// "d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log", "165683111", dimensions, measures, normalFields);
// FileOutputStream fos = null;
// try {
// fos = new FileOutputStream("d:\\transactionlog-20201009.avro");
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// }
// try {
// fos.write(kcbp_biz_logs,0,kcbp_biz_logs.length);
// fos.flush();
// fos.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
/**
 * Deserialization
 */
// File file = new File("d:\\part-0-0.avro");
// File file = new File("d:\\hdfs-transactionlog-20200929.avro");
// File file = new File("d:\\transactionlog-20201009.avro");
// byte[] byteBuffer = new byte[(int) file.length()];
//
// FileInputStream fileInputStream = null;
// try {
//// fileInputStream = new FileInputStream("d:\\part-0-0.avro");
// fileInputStream = new FileInputStream("d:\\transactionlog-20201009.avro");
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// }
// try {
// fileInputStream.read(byteBuffer);
// } catch (IOException e) {
// e.printStackTrace();
// }
// GenericRecord genericRecord = AvroDeserializerFactory.getLogsDeserializer().deserializing(byteBuffer);
// System.out.println(genericRecord);
// read the avro file and deserialize it
// DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
//// DataFileReader<LogData> dataFileReader = new DataFileReader<LogData>(new File("d:\\part-0-0.avro"), reader);
// DataFileReader<LogData> dataFileReader = new DataFileReader<LogData>(new File("d:\\transactionlog-20201009.avro"), reader);
// LogData transactionLogRead = null;
// while (dataFileReader.hasNext()) {
// transactionLogRead = dataFileReader.next();
// System.out.println(transactionLogRead);
// }
        Schema schema = new Schema.Parser().parse(new File("d:\\log.avro"));
        GenericRecord emp = new GenericData.Record(schema);
        File file = new File("d:\\1 (1).avro");
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
        while (dataFileReader.hasNext()) {
            emp = dataFileReader.next();
            System.out.println(emp);
        }
        // Long aLong = utc2Local("2020-09-29T09:36:20.626+08:00");
        // System.out.println(aLong);
    }

    public static Long utc2Local(String utcTime) {
        SimpleDateFormat utcFormater = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
        utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));
        // define the time zone and parse the time
        Date gpsUTCDate = null;
        try {
            gpsUTCDate = utcFormater.parse(utcTime);
        } catch (ParseException e) {
            System.out.println("时间戳格式转换异常:" + utcTime + e.getMessage());
        }
        return gpsUTCDate.getTime();
    }
}
src/main/java/com/zorkdata/datamask/util/ConfigUtils.java
...
@@ -6,9 +6,11 @@ package com.zorkdata.datamask.util;
  * @Email xiesen@zork.com.cn
  */
 public class ConfigUtils {
+    public static final String EMPTY_STR = "";
+    public static final String NULL_STR = "null";

     public static String getString(String value, String defaultValue) {
-        String result = value == null || value.equals("") || value.equals("null") ? defaultValue : value;
+        String result = value == null || EMPTY_STR.equals(value) || NULL_STR.equals(value) ? defaultValue : value;
         return result;
     }
...
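As a quick illustration of the refactored getString (a sketch, not part of the diff): null, the empty string and the literal "null" all fall back to the supplied default, while any other value is returned unchanged.

// Hypothetical usage sketch, not part of the repository.
import com.zorkdata.datamask.util.ConfigUtils;

public class ConfigUtilsExample {
    public static void main(String[] args) {
        System.out.println(ConfigUtils.getString((String) null, "default")); // default
        System.out.println(ConfigUtils.getString("", "default"));            // default
        System.out.println(ConfigUtils.getString("null", "default"));        // default
        System.out.println(ConfigUtils.getString("value", "default"));       // value
    }
}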
src/main/java/com/zorkdata/datamask/util/DateUtils.java (new file, 0 → 100644)
package com.zorkdata.datamask.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

/**
 * @author 谢森
 * @Description Date/time utility class
 * @Email xiesen310@163.com
 * @Date 2020/10/21 14:39
 */
public class DateUtils {
    public static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
    private static SimpleDateFormat utcFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");

    /**
     * Convert a UTC time string
     *
     * @param utcTime UTC time
     * @return unix timestamp
     */
    public static Long utc2timestamp(String utcTime) {
        // define the time zone and parse the time
        utcFormatter.setTimeZone(TimeZone.getTimeZone("asia/shanghai"));
        Date gpsUtcDate = null;
        try {
            gpsUtcDate = utcFormatter.parse(utcTime);
        } catch (ParseException e) {
            logger.error("时间戳格式转换异常:{} 原因: {}", utcTime, e.getMessage());
            return null;
        }
        return gpsUtcDate.getTime();
    }
}
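A small usage sketch for DateUtils.utc2timestamp (illustrative, not part of the commit). One caveat worth noting: java.util.TimeZone IDs are case-sensitive, so the lowercase "asia/shanghai" above is not a recognised ID and getTimeZone silently falls back to GMT, which shifts the returned epoch milliseconds by eight hours relative to Asia/Shanghai wall-clock time.

// Hypothetical usage sketch, not part of the repository.
import com.zorkdata.datamask.util.DateUtils;

public class DateUtilsExample {
    public static void main(String[] args) {
        // Returns epoch milliseconds, or null if the string does not match the hard-coded pattern.
        Long millis = DateUtils.utc2timestamp("2020-09-18T13:59:53.000+08:00");
        System.out.println(millis);
    }
}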
src/main/java/com/zorkdata/datamask/util/LoadConf.java
...
@@ -19,6 +19,7 @@ import java.util.*;
 public class LoadConf {
     private static final Logger LOG = LoggerFactory.getLogger(com.zorkdata.datamask.util.LoadConf.class);
+    public static final int DEFAULT_MAP_CAPACITY = 16;

     public LoadConf() {
     }
...
@@ -62,7 +63,7 @@ public class LoadConf {
                 throw new RuntimeException("Could not find config file on classpath " + name);
             }
         } else {
-            HashMap var19 = new HashMap();
+            HashMap var19 = new HashMap(DEFAULT_MAP_CAPACITY);
             return var19;
         }
     } catch (IOException var17) {
...
@@ -100,11 +101,11 @@ public class LoadConf {
         return getConfigFileInputStream(configFilePath, true);
     }

-    public static Map LoadYaml(String confPath) {
+    public static Map loadYaml(String confPath) {
         return findAndReadYaml(confPath, true, true);
     }

-    public static Map LoadProperty(String prop) {
+    public static Map loadProperty(String prop) {
         InputStream in = null;
         Properties properties = new Properties();
...
@@ -125,8 +126,10 @@ public class LoadConf {
             }
         }
-        Map ret = new HashMap();
+        Map ret = new HashMap(DEFAULT_MAP_CAPACITY);
         ret.putAll(properties);
         return ret;
     }
 }
src/main/java/com/zorkdata/datamask/util/MaskUtil.java
...
@@ -15,7 +15,7 @@ import java.util.regex.Pattern;
  * RegularExpression
  */
 public class MaskUtil {
+    public static final int DEFAULT_MAP_CAPACITY = 16;
     private MaskRegexConfig maskRegexConfig;
     /**
      * Name regex
...
@@ -25,7 +25,8 @@ public class MaskUtil {
     /**
      * Mobile number regex
      */
-    static Pattern mobilePattern = Pattern.compile("((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}");
+    static Pattern mobilePattern = Pattern.compile("((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))"
+            + "\\d{8}");
     // Pattern mobilePattern = Pattern.compile(maskRegexConfig.getMobileRegExp());
     /**
      * Phone number regex
...
@@ -45,7 +46,8 @@ public class MaskUtil {
     /**
      * ID card number (18-digit) regex
      */
-    static Pattern idPattern18 = Pattern.compile("[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])");
+    static Pattern idPattern18 = Pattern.compile("[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}"
+            + "([0-9Xx])");
     // Pattern idPattern18 = Pattern.compile(maskRegexConfig.getIdRegExp18());
     /**
      * Home address regex
...
@@ -54,9 +56,11 @@ public class MaskUtil {
     // Pattern addressPattern = Pattern.compile(maskRegexConfig.getAddressRegExp());
     /**
      * IP address regex
      * // static Pattern ipPattern = Pattern.compile("^((\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])\\.){3}
      * // (\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])$");
      */
     // static Pattern ipPattern = Pattern.compile("^((\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])\\.){3}(\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])$");
-    static Pattern ipPattern = Pattern.compile("((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)");
+    static Pattern ipPattern = Pattern.compile("((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}"
+            + "(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)");
     // Pattern ipPattern = Pattern.compile(maskRegexConfig.getIpRegExp());
     /**
...
@@ -99,7 +103,7 @@ public class MaskUtil {
     public static void main(String[] args) {
         MaskUtil maskUtil = new MaskUtil();
-        Map map = new HashMap();
+        Map map = new HashMap(DEFAULT_MAP_CAPACITY);
         map.put("姓名", "王海鹰");
         map.put("身份证号", "372925199008075158");
         map.put("手机号", "15000101879");
...
src/main/java/com/zorkdata/datamask/util/ParamUtils.java (new file, 0 → 100644)
package com.zorkdata.datamask.util;

import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.domain.HadoopParam;
import com.zorkdata.datamask.domain.KafkaParam;

import java.util.Map;

/**
 * @author 谢森
 * @Description Parameter utility class
 * @Email xiesen310@163.com
 * @Date 2020/10/21 14:42
 */
public class ParamUtils {
    /**
     * Initialize the configuration
     *
     * @param conf
     */
    public static HadoopParam initHadoopConf(Map conf) {
        String source = String.valueOf(conf.get(ParamConstants.SOURCE)).trim();
        String hdfsSrc = String.valueOf(conf.get(ParamConstants.HDFS_SRC)).trim();
        String hdfsDest = String.valueOf(conf.get(ParamConstants.HDFS_DEST)).trim();
        String core = String.valueOf(conf.get(ParamConstants.CORE)).trim();
        String date = String.valueOf(conf.get(ParamConstants.DATE)).trim();
        Long startTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.START_TIME)).trim());
        Long endTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.END_TIME)).trim());
        return new HadoopParam(source, hdfsSrc, hdfsDest, core, date, startTime, endTime);
    }

    public static KafkaParam initKafkaConf(Map conf) {
        String servers = String.valueOf(conf.get(ParamConstants.SERVERS)).trim();
        String zookeeper = String.valueOf(conf.get(ParamConstants.ZOOKEEPER)).trim();
        String topic = String.valueOf(conf.get(ParamConstants.TOPIC)).trim();
        String hdfsDest = String.valueOf(conf.get(ParamConstants.HDFS_DEST)).trim();
        String core = String.valueOf(conf.get(ParamConstants.CORE)).trim();
        String date = String.valueOf(conf.get(ParamConstants.DATE)).trim();
        Long startTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.START_TIME)).trim());
        Long endTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.END_TIME)).trim());
        return new KafkaParam(servers, zookeeper, topic, hdfsDest, core, date, startTime, endTime);
    }
}
src/main/java/com/zorkdata/datamask/util/ZorkParameterUtil.java
...
@@ -13,6 +13,7 @@ import java.util.Map;
  */
 public class ZorkParameterUtil {
     private static final Logger logger = LoggerFactory.getLogger(com.zorkdata.datamask.util.ZorkParameterUtil.class);
+    public static final String YML_SUFFIX = "yml";

     /**
      * Read parameters
...
@@ -26,22 +27,21 @@ public class ZorkParameterUtil {
         String configPath;
         try {
             ParameterTool parameterTool = ParameterTool.fromArgs(args);
-            configPath = parameterTool.get("configPath");
+            configPath = parameterTool.get("conf");
         } catch (Exception e) {
             // configPath = "/etc/flinkConfig.yaml";
-            configPath = "D:\\zork\\transactionLogMask\\src\\main\\resources\\application.yml";
+            throw new RuntimeException("读取配置文件失败,请检查配置路径.");
         }
         logger.info("read config path is " + configPath);
-        if (!configPath.endsWith("yml")) {
-            System.err.println("Please input correct configuration file and flink run mode!");
-            System.exit(-1);
+        if (!configPath.endsWith(YML_SUFFIX)) {
+            throw new RuntimeException("Please input correct configuration file and flink run mode!");
         } else {
-            conf = LoadConf.LoadYaml(configPath);
+            conf = LoadConf.loadYaml(configPath);
             if (conf == null) {
-                logger.error("配置文件" + args[0] + "不存在,系统退出");
-                System.exit(-1);
+                throw new RuntimeException("配置文件" + args[0] + "不存在,系统退出");
             }
         }
         return conf;
...
src/main/java/com/zorkdata/datamask/util/avro/AvroSerializer.java
...
@@ -100,7 +100,7 @@ public class AvroSerializer {
      */
     public synchronized byte[] serializing(String json) {
         byte[] byteArray = null;
-        JSONObject jsonObject = (JSONObject) JSONObject.parse(json); // new TypeReference<Object>() {}
+        JSONObject jsonObject = (JSONObject) JSONObject.parse(json);
         GenericRecord genericRecord = new GenericData.Record(this.schema);
         // add the data into the genericRecord
         for (int i = 0; i < filedsArrayList.size(); i++) {
...
src/main/resources/application.properties (deleted, 100644 → 0)
# Whitelist of fields that are not masked
fieldsWhiteList=fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,\
  orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc
# Regular expressions used for masking
# Name regex
nameRegExp="[\一-龥]{1,20}|[a-zA-Z\\.\\s]{1,20}"
# Mobile number regex
mobileRegExp="(13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}"
# Phone number regex
phoneRegExp="(\\d{3,4}-)?\\d{6,8}"
# Email regex
emailRegExp="\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*"
# ID card number (15-digit) regex
idRegExp15="[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}"
# ID card number (18-digit) regex
idRegExp18="[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])"
# Home address regex
addressRegExp="([\一-\龥A-Za-z0-9_]+(省|市|区|县|道|路|街|号|弄|条|室)){2,}"
# IP address regex
ipRegExp="((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)"
# MAC address regex
macRegExp="([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}"
\ No newline at end of file
src/main/resources/bin/start.sh (new file, 0 → 100644)
#!/usr/bin/env bash
export base_path=$(cd `dirname $0`; pwd)
deploy_path=${base_path%/*}

if [ ! -d "$deploy_path/logs" ]; then
  mkdir -p $deploy_path/logs
fi

FLINK_TASK_CONF=gmas-config.yaml
parallelism=1

flink run -p $parallelism $deploy_path/lib/transactionLogMask-0.1.jar $deploy_path/conf/$FLINK_TASK_CONF > $deploy_path/logs/transactionLogMask-0.1.log &
\ No newline at end of file