transactionLogMask · Commits · 1e097d43

Commit 1e097d43, authored Oct 20, 2020 by 王海鹰
Parent: a8f1c55e

Showing 11 changed files with 452 additions and 242 deletions (+452 -242)
pom.xml (+9 -5)
src/main/java/com/zorkdata/datamask/TransactionLogMask.java (+101 -87)
src/main/java/com/zorkdata/datamask/constants/Constants.java (+17 -0)
src/main/java/com/zorkdata/datamask/domain/LogData.java (+16 -87)
src/main/java/com/zorkdata/datamask/function/Avro2StrFlatMapFunction.java (+0 -43)
src/main/java/com/zorkdata/datamask/util/ConfigUtils.java (+39 -0)
src/main/java/com/zorkdata/datamask/util/LoadConf.java (+132 -0)
src/main/java/com/zorkdata/datamask/util/MaskRegexConfig.java (+30 -0)
src/main/java/com/zorkdata/datamask/util/MaskUtil.java (+36 -20)
src/main/java/com/zorkdata/datamask/util/ZorkParameterUtil.java (+49 -0)
src/main/resources/application.yml (+23 -0)
pom.xml

```diff
@@ -26,7 +26,6 @@ under the License.
     <packaging>jar</packaging>
     <name>Guotai Transaction Log Mask Job</name>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <flink.version>1.8.1</flink.version>
@@ -60,20 +59,20 @@ under the License.
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
+            <!-- <scope>provided</scope>-->
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop-2</artifactId>
             <version>2.6.5-10.0</version>
-            <scope>provided</scope>
+            <!-- <scope>provided</scope>-->
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
+            <!-- <scope>provided</scope>-->
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
@@ -103,7 +102,6 @@ under the License.
             <version>${flink.version}</version>
         </dependency>
         -->
         <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
         <dependency>
             <groupId>org.apache.avro</groupId>
@@ -159,6 +157,12 @@ under the License.
             <version>1.18.12</version>
             <!-- <scope>provided</scope>-->
         </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.16</version>
+        </dependency>
     </dependencies>
     <build>
```
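The new snakeyaml dependency backs the YAML-based configuration introduced in this commit (see LoadConf and application.yml below). A minimal sketch of the kind of load it enables, assuming a reader over the config file is already open:

```java
import java.io.InputStreamReader;
import java.util.Map;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;

class YamlLoadSketch {
    // SafeConstructor restricts parsing to standard YAML types (maps, lists, scalars).
    static Map loadYaml(InputStreamReader reader) {
        Yaml yaml = new Yaml(new SafeConstructor());
        return (Map) yaml.load(reader);
    }
}
```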
src/main/java/com/zorkdata/datamask/TransactionLogMask.java

```diff
@@ -2,24 +2,21 @@ package com.zorkdata.datamask;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.TypeReference;
+import com.zorkdata.datamask.constants.Constants;
 import com.zorkdata.datamask.domain.LogData;
 import com.zorkdata.datamask.domain.TransactionLog;
 import com.zorkdata.datamask.util.MaskUtil;
+import org.apache.avro.mapreduce.AvroJob;
-import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
+import com.zorkdata.datamask.util.ZorkParameterUtil;
 import org.apache.avro.mapred.AvroInputFormat;
+import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,18 +35,16 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
-import sun.rmi.runtime.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.Properties;
+import java.util.*;

 /**
  * Description : Guotai transaction log masking job
@@ -58,105 +53,110 @@ import java.util.Properties;
  * Date : Create in 2020/9/18 17:35
  */
 public class TransactionLogMask {
+    private static Logger LOG = LoggerFactory.getLogger(TransactionLogMask.class);
+    private static String source = "hdfs";
+    private static String hdfsSrc;
+    private static String hdfsDest;
+    private static String core;
+    private static String date;
+    private static Long startTime;
+    private static Long endTime;
+
     public static void main(String[] args) throws Exception {
-        ParameterTool params = ParameterTool.fromArgs(args);
-        String source = params.get("source", "hdfs");
         if ("hdfs".equals(source)) {
-            maskHdfsLog(params);
+            maskHdfsLog(args);
         } else if ("kafka".equals(source)) {
-            maskKafkaLog(params);
+            maskKafkaMsg(args);
         }
     }

+    /**
+     * Initialize the configuration
+     *
+     * @param conf configuration map
+     */
+    private static void initConf(Map conf) {
+        source = String.valueOf(conf.get(Constants.SOURCE)).trim();
+        hdfsSrc = String.valueOf(conf.get(Constants.HDFS_SRC)).trim();
+        hdfsDest = String.valueOf(conf.get(Constants.HDFS_DEST)).trim();
+        core = String.valueOf(conf.get(Constants.CORE)).trim();
+        date = String.valueOf(conf.get(Constants.DATE)).trim();
+        startTime = Long.parseLong(String.valueOf(conf.get(Constants.START_TIME)).trim());
+        endTime = Long.parseLong(String.valueOf(conf.get(Constants.END_TIME)).trim());
+    }
```
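Note that `String.valueOf(conf.get(...))` turns a missing key into the literal string "null" rather than failing, after which `Long.parseLong` would throw an unrelated `NumberFormatException` for startTime/endTime. A hedged guard (hypothetical helper, not part of the commit) could make a missing key explicit:

```java
import java.util.Map;

class ConfGuard {
    // Hypothetical helper: fail fast on a missing key instead of masking it as "null".
    static String requireConf(Map conf, String key) {
        Object v = conf.get(key);
        if (v == null) {
            throw new IllegalArgumentException("missing config key: " + key);
        }
        return String.valueOf(v).trim();
    }
}
```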
```diff
     /**
      * Mask HDFS log files
      *
-     * @param params request parameters
+     * @param args request parameters
      * @return void
      */
-    public static void maskHdfsLog(ParameterTool params) throws Exception {
+    public static void maskHdfsLog(String[] args) throws Exception {
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         JobConf jobConf = new JobConf();
+        Job job = Job.getInstance();
+        jobConf.set("avro.output.schema", TransactionLog.SCHEMA$.toString(true));
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        Map<String, String> conf = ZorkParameterUtil.readParameter(args);
+        LOG.info("配置文件: " + conf);
+        initConf(conf);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-        String hdfsSrc = params.get("hdfs-src");
-        String hdfsDest = params.get("hdfs-dest");
-        String core = params.get("core", "c1");
-        String date = params.get("date", sdf.format(new Date()));
-        Long startTime = Long.parseLong(params.get("startTime"));
-        Long endTime = Long.parseLong(params.get("endTime"));
+        ParameterTool parameterTool = ParameterTool.fromMap(conf);
+        env.getConfig().setGlobalJobParameters(parameterTool);
         List<String> logFiles = filterHdfsLogFiles(hdfsSrc, date, startTime, endTime);
         System.out.println("-----------logFiles-------------:" + logFiles);
         // List<String> logFiles = new ArrayList<String>() {
         //     {
         //         add(hdfsSrc);
         //     }
         // };
         for (String logFile : logFiles) {
             System.out.println("-----------logFile-------------:" + logFile);
             /**
              * Read the HDFS log file and deserialize it from Avro
              */
             HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<Object, Object>(
                     new AvroInputFormat(), Object.class, Object.class, jobConf);
             AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
-            DataSource<Tuple2<Object, Object>> input = env.createInput(hadoopInputFormat);
-            // input.print();
+            DataSource<Tuple2<Object, Object>> hdfsLogInput = env.createInput(hadoopInputFormat);
+            // hdfsLogInput.print();
             /**
              * Masking operator
              */
-            FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator = input.flatMap(
-                    new FlatMapFunction<Tuple2<Object, Object>, Object>() {
+            FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator = hdfsLogInput.flatMap(
+                    new FlatMapFunction<Tuple2<Object, Object>, Object>() {
                 @Override
                 public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
                     // System.out.println("--------------------value:" + value);
                     // System.out.println("--------------------getField0" + value.getField(0));
-                    LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>(){});
-                    logData.setNormalFields(MaskUtil.mask(logData.getNormalFields()));
-                    collector.collect(logData);
+                    LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>() {
+                    });
+                    // Filter on the core field of the log event
+                    if (null != core && logData.getDimensions().get("hostname").indexOf("c9") == -1) {
+                        return;
+                    }
+                    // Filter on the log event's timestamp
+                    Long timestamp = utc2timestamp(logData.getTimestamp());
+                    if (null != timestamp && timestamp > startTime && timestamp < endTime || Boolean.TRUE) {
+                        Map maskResult = MaskUtil.mask(logData.getNormalFields());
+                        logData.setNormalFields(maskResult);
+                        collector.collect(logData);
+                    }
                 }
             });
```
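Two details of the filter above are worth flagging. The hostname check hardcodes "c9" even though core is configurable, and because `&&` binds tighter than `||`, the time-window condition parses as `(... && ... && ...) || Boolean.TRUE`, which is always true, so every record is masked and collected regardless of the window. A sketch of what was presumably intended, with explicit precedence:

```java
class FilterSketch {
    // Sketch: the window test with explicit parentheses, using the configurable core value.
    static boolean shouldCollect(Long timestamp, String hostname, String core, long startTime, long endTime) {
        boolean onCore = core == null || (hostname != null && hostname.contains(core));
        boolean inWindow = timestamp != null && timestamp > startTime && timestamp < endTime;
        return onCore && inWindow;
    }
}
```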
```diff
             // tuple2ObjectFlatMapOperator.print();
             // maskFlatMapOperator.print();
-            // Get the output directory on the destination HDFS
-            String logFileName = logFile.split("/")[logFile.split("/").length - 1];
-            String filePath = hdfsDest + logFileName.replace(".avro", "");
-            HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
-            FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
             /**
              * Avro serialization operator
              */
-            FlatMapOperator<Object, Object> avroFlatMapOperator = maskFlatMapOperator.flatMap(
-                    new FlatMapFunction<Object, Object>() {
-                @Override
-                public void flatMap(Object value, Collector<Object> collector) throws Exception {
-                    LogData logData = (LogData) value;
-                    // byte[] bytes = AvroSerializerFactory.getLogAvroSerializer().serializingLog(logData.getLogTypeName(), logData.getTimestamp(),
-                    //         logData.getSource(), logData.getOffset(), logData.getDimensions(), logData.getMeasures(), logData.getNormalFields());
-                    // out.collect(bytes);
-                    collector.collect(JSON.toJSONString(logData));
-                }
-            });
-            avroFlatMapOperator.print();
-            avroFlatMapOperator.output(hadoopOutputFormat);
+            // Get the output directory on the destination HDFS
+            String logFileName = logFile.split("/")[logFile.split("/").length - 1];
+            String filePath = hdfsDest + logFileName.replace(".avro", "out.avro");
+            System.out.println("---------------writepath-----------------:" + filePath);
+            HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
+            AvroOutputFormat<LogData> logDataAvroOutputFormat = new AvroOutputFormat<>();
+            HadoopOutputFormat hadoopOutputFormat2 = new HadoopOutputFormat<>(logDataAvroOutputFormat, jobConf);
+            // AvroOutputFormat.
+            jobConf.set("avro.output.schema", TransactionLog.SCHEMA$.toString());
+            AvroJob.setInputKeySchema(job, TransactionLog.getClassSchema());
+            FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
+            maskFlatMapOperator.map(new MapFunction<Object, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
+                @Override
+                public Tuple2<AvroWrapper<LogData>, NullWritable> map(Object value) throws Exception {
+                    AvroKey<LogData> key = new AvroKey<LogData>((LogData) value);
+                    Tuple2<AvroWrapper<LogData>, NullWritable> tupple =
+                            new Tuple2<AvroWrapper<LogData>, NullWritable>(key, NullWritable.get());
+                    return tupple;
+                }
+            }).output(hadoopOutputFormat);
             // avroFlatMapOperator.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
             try {
                 env.execute("国泰交易日志脱敏job");
             } catch (Exception e) {
@@ -168,14 +168,16 @@ public class TransactionLogMask {
     /**
      * Mask Kafka message data
      *
-     * @param params request parameters
+     * @param args request parameters
      * @return void
      */
-    public static void maskKafkaLog(ParameterTool params) {
+    public static void maskKafkaMsg(String[] args) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        ParameterTool params = ParameterTool.fromArgs(args);
         String servers = params.get("servers");
         String zookeeper = params.get("zookeeper");
         String topic = params.get("topic");
@@ -234,15 +236,8 @@ public class TransactionLogMask {
             path = hdfs + date;
         }
         Configuration conf = new Configuration();
-        // conf.set("dfs.replication", "3");
-        // conf.set("fs.defaultFS", "hdfs://cdh-2:8020");
-        // conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        // conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
         System.out.println("---------------conf-----------------:" + conf.toString());
         List<String> logFiles = new ArrayList<>();
         try {
-            // FileSystem fileSystem = FileSystem.get(conf);
             FileSystem fileSystem = null;
             try {
                 fileSystem = FileSystem.get(new URI("hdfs://cdh-2:8020/"), conf, "hdfs");
@@ -253,9 +248,9 @@ public class TransactionLogMask {
             while (locatedFileStatusRemoteIterator.hasNext()) {
                 LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
                 long modificationTime = next.getModificationTime();
-                if (modificationTime > startTime && modificationTime < endTime){
+                // Filter on file modification time to keep files within the user-specified window
+                if (modificationTime > startTime && modificationTime < endTime) {
                     Path hdfsFilePath = next.getPath();
                     System.out.println("---------------hdfsFilePath-----------------:" + hdfsFilePath.toString());
                     logFiles.add(hdfsFilePath.toString());
                 }
             }
@@ -266,4 +261,23 @@ public class TransactionLogMask {
         }
         return logFiles;
     }
-}
-\ No newline at end of file
+
+    /**
+     * Convert a UTC time string to a unix timestamp
+     *
+     * @param utcTime UTC time
+     * @return unix timestamp
+     */
+    public static Long utc2timestamp(String utcTime) {
+        SimpleDateFormat utcFormater = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+        // Define the time zone used for parsing
+        utcFormater.setTimeZone(TimeZone.getTimeZone("asia/shanghai"));
+        Date gpsUTCDate = null;
+        try {
+            gpsUTCDate = utcFormater.parse(utcTime);
+        } catch (ParseException e) {
+            System.out.println("时间戳格式转换异常:" + utcTime + e.getMessage());
+            return null;
+        }
+        return gpsUTCDate.getTime();
+    }
+}
```
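utc2timestamp has a time-zone pitfall: TimeZone.getTimeZone is case-sensitive, and for an unrecognized ID such as "asia/shanghai" it silently returns GMT, while the SimpleDateFormat pattern treats "+08:00" as literal text rather than an offset, so parsed instants can drift by eight hours. A java.time sketch (not in the commit) that parses the ISO-8601 offset directly:

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;

class Utc2TimestampSketch {
    // Sketch: let java.time interpret the trailing +08:00 as a real UTC offset.
    static Long utc2timestamp(String utcTime) {
        try {
            return OffsetDateTime.parse(utcTime).toInstant().toEpochMilli();
        } catch (DateTimeParseException e) {
            return null;
        }
    }
}
```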
src/main/java/com/zorkdata/datamask/constants/Constants.java (new file)

```java
package com.zorkdata.datamask.constants;

/**
 * Description :
 *
 * @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
 * Date : Create in 2020/10/20 15:32
 */
public interface Constants {
    String SOURCE = "source";
    String HDFS_SRC = "hdfs_src";
    String HDFS_DEST = "hdfs_dest";
    String CORE = "core";
    String DATE = "date";
    String START_TIME = "startTime";
    String END_TIME = "endTime";
}
```
src/main/java/com/zorkdata/datamask/domain/LogData.java

```diff
 package com.zorkdata.datamask.domain;

+import lombok.Data;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.hadoop.io.WritableComparable;
 import java.io.DataInput;
@@ -29,110 +30,38 @@ public class LogData implements Serializable, WritableComparable {
      */
     private String timestamp;
     /**
-     * source
+     * Event source
      */
     private String source;
-    @Override
-    public int compareTo(Object o) {
-        return 0;
-    }
-    @Override
-    public void write(DataOutput dataOutput) throws IOException {
-    }
-    @Override
-    public void readFields(DataInput dataInput) throws IOException {
-    }
     /**
-     * offset
+     * Offset
      */
     private String offset;
     /**
-     * dimensions
+     * Dimensions
      */
     private Map<String, String> dimensions;
     /**
-     * measures
+     * Measures
      */
     private Map<String, Double> measures;
    /**
-     * normalFields
+     * Normal (plain) fields
     */
     private Map<String, String> normalFields;

+    @Override
+    public int compareTo(Object o) {
+        return 0;
+    }
     // public LogData() {
     // }
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+    }
-    // public String getLogTypeName() {
-    //     return logTypeName;
-    // }
-    //
-    // public void setLogTypeName(String logTypeName) {
-    //     this.logTypeName = logTypeName;
-    // }
-    //
-    // public String getTimestamp() {
-    //     return timestamp;
-    // }
-    //
-    // public void setTimestamp(String timestamp) {
-    //     this.timestamp = timestamp;
-    // }
-    //
-    // public String getSource() {
-    //     return source;
-    // }
-    //
-    // public void setSource(String source) {
-    //     this.source = source;
-    // }
-    //
-    // public String getOffset() {
-    //     return offset;
-    // }
-    //
-    // public void setOffset(String offset) {
-    //     this.offset = offset;
-    // }
-    //
-    // public Map<String, String> getDimensions() {
-    //     return dimensions;
-    // }
-    //
-    // public void setDimensions(Map<String, String> dimensions) {
-    //     this.dimensions = new HashMap<>(50);
-    //     for (Map.Entry entry : dimensions.entrySet()) {
-    //         this.dimensions.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
-    //     }
-    // }
-    //
-    // public Map<String, Double> getMeasures() {
-    //     return measures;
-    // }
-    //
-    // public void setMeasures(Map<String, Double> measures) {
-    //     this.measures = new HashMap<>(50);
-    //     for (Map.Entry entry : measures.entrySet()) {
-    //         this.measures.put(String.valueOf(entry.getKey()), Double.valueOf(String.valueOf(entry.getValue())));
-    //     }
-    // }
-    //
-    // public Map<String, String> getNormalFields() {
-    //     return normalFields;
-    // }
-    //
-    // public void setNormalFields(Map<String, String> normalFields) {
-    //     this.normalFields = new HashMap<>(50);
-    //     for (Map.Entry entry : normalFields.entrySet()) {
-    //         this.normalFields.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
-    //     }
-    // }
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+    }
     // @Override
     // public String toString() {
```
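The deleted accessors line up with the lombok.Data import added at the top of the file; presumably the class is (or is meant to be) annotated so the getters and setters are generated at compile time. A minimal sketch of that pattern, under that assumption:

```java
import lombok.Data;
import java.io.Serializable;
import java.util.Map;

// Sketch (assumption): @Data generates the accessors this commit deletes.
@Data
class LogDataAccessorsSketch implements Serializable {
    private String timestamp;
    private Map<String, String> normalFields;
}
```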
src/main/java/com/zorkdata/datamask/function/Avro2StrFlatMapFunction.java (deleted)

```java
package com.zorkdata.datamask.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.util.avro.AvroDeserializer;
import com.zorkdata.datamask.util.avro.AvroDeserializerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * @author xiese
 * @Description Avro2StrFlatMapFunction
 * @Email xiesen310@163.com
 * @Date 2020/9/26 23:14
 */
@Slf4j
public class Avro2StrFlatMapFunction implements FlatMapFunction<String, LogData> {
    @Override
    public void flatMap(String value, Collector<LogData> out) throws Exception {
        try {
            if (null != value) {
                AvroDeserializer logsDeserializer = AvroDeserializerFactory.getLogsDeserializer();
                GenericRecord record = logsDeserializer.deserializing(value.getBytes());
                // System.out.println("----------record---------" + record);
                if (null != record) {
                    LogData logData = JSON.parseObject(record.toString(), new TypeReference<LogData>() {
                    });
                    // System.out.println("----------logData---------" + logData);
                    // out.collect(JSON.toJSONString(logData));
                    out.collect(logData);
                }
            }
        } catch (Exception e) {
            log.error("avro 反序列化失败,错误信息: {}", e.getMessage(), e);
        }
    }
}
```
src/main/java/com/zorkdata/datamask/util/ConfigUtils.java (new file)

```java
package com.zorkdata.datamask.util;

/**
 * @author 谢森
 * @Description Configuration file utility class
 * @Email xiesen@zork.com.cn
 */
public class ConfigUtils {
    public static String getString(String value, String defaultValue) {
        String result = value == null || value.equals("") || value.equals("null") ? defaultValue : value;
        return result;
    }

    public static Integer getInteger(Integer value, Integer defaultValue) {
        Integer result = value < 0 ? defaultValue : value;
        return result;
    }

    public static Double getDouble(Double value, Double defaultValue) {
        Double result = value == null ? defaultValue : value;
        return result;
    }

    public static Float getFloat(Float value, Float defaultValue) {
        Float result = value == null ? defaultValue : value;
        return result;
    }

    public static Long getLong(Long value, Long defaultValue) {
        Long result = value == null ? defaultValue : value;
        return result;
    }

    public static Boolean getBoolean(Boolean value, Boolean defaultValue) {
        Boolean result = value == null ? defaultValue : value;
        return result;
    }
}
```
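getInteger auto-unboxes value in `value < 0`, so passing null throws a NullPointerException instead of falling back to the default the way the other overloads do. A null-safe version would be:

```java
class ConfigUtilsFix {
    // Sketch: check for null before unboxing, matching the other getXxx overloads.
    public static Integer getInteger(Integer value, Integer defaultValue) {
        return (value == null || value < 0) ? defaultValue : value;
    }
}
```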
src/main/java/com/zorkdata/datamask/util/LoadConf.java (new file)

```java
package com.zorkdata.datamask.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
import java.io.*;
import java.net.URL;
import java.util.*;

/**
 * ClassName: LoadConf
 * Email: zhuzhigang@zork.com.cn
 * Date: 2019\6\27 0027
 *
 * @author: zhuzhigang
 **/
public class LoadConf {
    private static final Logger LOG = LoggerFactory.getLogger(com.zorkdata.datamask.util.LoadConf.class);

    public LoadConf() {
    }

    public static List<URL> findResources(String name) {
        try {
            Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
            ArrayList ret = new ArrayList();
            while (resources.hasMoreElements()) {
                ret.add(resources.nextElement());
            }
            return ret;
        } catch (IOException var3) {
            throw new RuntimeException(var3);
        }
    }

    public static Map findAndReadYaml(String name, boolean mustExist, boolean canMultiple) {
        InputStream in = null;
        boolean confFileEmpty = false;
        try {
            in = getConfigFileInputStream(name, canMultiple);
            if (null != in) {
                Yaml yaml = new Yaml(new SafeConstructor());
                Map ret = (Map) yaml.load(new InputStreamReader(in));
                if (null != ret) {
                    HashMap var7 = new HashMap(ret);
                    return var7;
                }
                confFileEmpty = true;
            }
            if (mustExist) {
                if (confFileEmpty) {
                    throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs");
                } else {
                    throw new RuntimeException("Could not find config file on classpath " + name);
                }
            } else {
                HashMap var19 = new HashMap();
                return var19;
            }
        } catch (IOException var17) {
            throw new RuntimeException(var17);
        } finally {
            if (null != in) {
                try {
                    in.close();
                } catch (IOException var16) {
                    throw new RuntimeException(var16);
                }
            }
        }
    }

    public static InputStream getConfigFileInputStream(String configFilePath, boolean canMultiple) throws IOException {
        if (null == configFilePath) {
            throw new IOException("Could not find config file, name not specified");
        } else {
            HashSet<URL> resources = new HashSet(findResources(configFilePath));
            if (resources.isEmpty()) {
                File configFile = new File(configFilePath);
                return configFile.exists() ? new FileInputStream(configFile) : null;
            } else if (resources.size() > 1 && !canMultiple) {
                throw new IOException("Found multiple " + configFilePath
                        + " resources. You're probably bundling the Storm jars with your topology jar. " + resources);
            } else {
                LOG.info("Using " + configFilePath + " from resources");
                URL resource = (URL) resources.iterator().next();
                return resource.openStream();
            }
        }
    }

    public static InputStream getConfigFileInputStream(String configFilePath) throws IOException {
        return getConfigFileInputStream(configFilePath, true);
    }

    public static Map LoadYaml(String confPath) {
        return findAndReadYaml(confPath, true, true);
    }

    public static Map LoadProperty(String prop) {
        InputStream in = null;
        Properties properties = new Properties();
        try {
            in = getConfigFileInputStream(prop);
            properties.load(in);
        } catch (FileNotFoundException var12) {
            throw new RuntimeException("No such file " + prop);
        } catch (Exception var13) {
            throw new RuntimeException("Failed to read config file");
        } finally {
            if (null != in) {
                try {
                    in.close();
                } catch (IOException var11) {
                    throw new RuntimeException(var11);
                }
            }
        }
        Map ret = new HashMap();
        ret.putAll(properties);
        return ret;
    }
}
```
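Usage sketch: LoadYaml resolves the path against the classpath first and the filesystem second, and findAndReadYaml throws if the file is missing or empty (this assumes the file is a YAML mapping; see the note under application.yml below):

```java
import java.util.Map;
import com.zorkdata.datamask.util.LoadConf;

class LoadConfUsage {
    public static void main(String[] args) {
        // application.yml ships in src/main/resources, so it is found on the classpath.
        Map conf = LoadConf.LoadYaml("application.yml");
        System.out.println(conf);
    }
}
```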
src/main/java/com/zorkdata/datamask/util/MaskRegexConfig.java (new file)

```java
package com.zorkdata.datamask.util;

/**
 * Description :
 *
 * @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
 * Date : Create in 2020/10/19 16:43
 */
public class MaskRegexConfig {
    private String fieldsWhiteList;
    private String nameRegExp;
    private String mobileRegExp;
    private String phoneRegExp;
    private String emailRegExp;
    private String idRegExp15;
    private String idRegExp18;
    private String addressRegExp;
    private String ipRegExp;
    private String macRegExp;
}
```
src/main/java/com/zorkdata/datamask/util/MaskUtil.java

```diff
@@ -16,63 +16,75 @@ import java.util.regex.Pattern;
  */
 public class MaskUtil {
+    private MaskRegexConfig maskRegexConfig;
     /**
      * Name regex
      */
     static Pattern namePattern = Pattern.compile("([\\u4e00-\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20})");
+    // Pattern namePattern = Pattern.compile(maskRegexConfig.getNameRegExp());
     /**
      * Mobile number regex
      */
     static Pattern mobilePattern = Pattern.compile("((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}");
+    // Pattern mobilePattern = Pattern.compile(maskRegexConfig.getMobileRegExp());
     /**
      * Landline number regex
      */
     static Pattern phonePattern = Pattern.compile("(\\d{3,4}-)?\\d{6,8}");
+    // Pattern phonePattern = Pattern.compile(maskRegexConfig.getPhoneRegExp());
     /**
      * Email regex
      */
     static Pattern emailPattern = Pattern.compile("\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*");
+    // Pattern emailPattern = Pattern.compile(maskRegexConfig.getEmailRegExp());
     /**
      * ID card number (15-digit) regex
      */
     // static Pattern idPattern15 = Pattern.compile("\\d{17}[0-9Xx]|\\d{15}");
     static Pattern idPattern15 = Pattern.compile("[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}");
+    // Pattern idPattern15 = Pattern.compile(maskRegexConfig.getIdRegExp15());
     /**
      * ID card number (18-digit) regex
      */
     static Pattern idPattern18 = Pattern.compile("[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])");
+    // Pattern idPattern18 = Pattern.compile(maskRegexConfig.getIdRegExp18());
     /**
      * Home address regex
      */
-    static Pattern addressPattern = Pattern.compile("([\\u4E00-\\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号)){2,}");
+    static Pattern addressPattern = Pattern.compile("([\\u4E00-\\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号|弄|条|室)){2,}");
+    // Pattern addressPattern = Pattern.compile(maskRegexConfig.getAddressRegExp());
     /**
      * IP address regex
      */
     // static Pattern ipPattern = Pattern.compile("^((\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])\\.){3}(\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])$");
     static Pattern ipPattern = Pattern.compile("((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)");
+    // Pattern ipPattern = Pattern.compile(maskRegexConfig.getIpRegExp());
     /**
      * MAC address regex
      */
     static Pattern macPattern = Pattern.compile("([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}");
+    // Pattern macPattern = Pattern.compile(maskRegexConfig.getMacRegExp());

-    static List<Pattern> patterns = new ArrayList<Pattern>(){{
-        add(macPattern);
-        add(emailPattern);
-        add(ipPattern);
+    static List<Pattern> patterns = new ArrayList<Pattern>() {{
+        add(namePattern);
+        add(idPattern18);
+        add(idPattern15);
+        add(mobilePattern);
+        add(phonePattern);
+        add(emailPattern);
+        add(addressPattern);
+        add(ipPattern);
+        add(macPattern);
     }};

-    public static Map mask(Map map){
+    public static Map mask(Map map) {
         map.forEach((k, v) -> {
             String value = v.toString();
-            for (Pattern pattern: patterns) {
+            for (Pattern pattern : patterns) {
                 Matcher matcher = pattern.matcher(value);
-                if (matcher.find()){
+                if (matcher.find()) {
                     String replaceStr = "";
                     for (int i = 0; i < matcher.group().length(); i++) {
                         replaceStr = replaceStr.concat("*");
                     }
                     // System.out.println(replaceStr);
@@ -85,17 +97,21 @@ public class MaskUtil {
     }

     public static void main(String[] args) {
+        MaskUtil maskUtil = new MaskUtil();
         Map map = new HashMap();
-        // map.put("姓名", "王海鹰");
-        // map.put("身份证号", "372925199008075158");
-        // map.put("手机号", "15000101879");
-        // map.put("电话", "021-61341606");
-        // map.put("邮箱", "wanghaiying@zork.com.cn");
-        // map.put("住址", "上海市浦东新区碧波路690号");
-        // map.put("ip地址", "192.168.70.2");
-        // map.put("mac地址", "3c-78-43-25-80-bd");
-        map.put("message", "王海鹰-372925199008075158-15000101879");
-        System.out.println(mask(map));
+        map.put("姓名", "王海鹰");
+        map.put("身份证号", "372925199008075158");
+        map.put("手机号", "15000101879");
+        map.put("电话", "021-61341606");
+        map.put("邮箱", "wanghaiying@zork.com");
+        map.put("住址", "上海市浦东新区碧波路690号1弄");
+        map.put("住址2", "上海市浦东新区张江微电子港304-2室");
+        map.put("ip地址", "192.168.70.2");
+        map.put("mac地址", "3c-78-43-25-80-bd");
+        map.put("message", "王海鹰,372925199008075158#15000101879");
+        map.put("messid", "0000011404342B32233DDCDA");
+        System.out.println(maskUtil.mask(map));
         // String mobile = "15000101879";
         //
```
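One consequence of the reordered pattern list: namePattern now runs first, and `[\u4e00-\u9fa5]{1,20}` matches any short run of CJK characters, so nearly every Chinese-valued field (addresses included) is hit by the name rule before the more specific patterns are tried; this is presumably why fieldsWhiteList exists in the new config. A quick check:

```java
import java.util.regex.Pattern;

class NamePatternCheck {
    public static void main(String[] args) {
        Pattern namePattern = Pattern.compile("([\\u4e00-\\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20})");
        // Prints true: an address string is matched by the *name* rule first.
        System.out.println(namePattern.matcher("上海市浦东新区碧波路690号").find());
    }
}
```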
src/main/java/com/zorkdata/datamask/util/ZorkParameterUtil.java (new file)

```java
package com.zorkdata.datamask.util;

import org.apache.flink.api.java.utils.ParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;

/**
 * @author 谢森
 * @Description Parameter reading utility class
 * @Email xiesen@zork.com.cn
 */
public class ZorkParameterUtil {
    private static final Logger logger = LoggerFactory.getLogger(com.zorkdata.datamask.util.ZorkParameterUtil.class);

    /**
     * Read parameters
     *
     * @param args parameter names;
     *             by convention the configPath argument identifies the path of the configuration file
     * @return
     */
    public static Map<String, String> readParameter(String[] args) {
        Map<String, String> conf = null;
        String configPath;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            configPath = parameterTool.get("configPath");
        } catch (Exception e) {
            // configPath = "/etc/flinkConfig.yaml";
            configPath = "D:\\zork\\transactionLogMask\\src\\main\\resources\\application.yml";
        }
        logger.info("read config path is " + configPath);
        if (!configPath.endsWith("yaml")) {
            System.err.println("Please input correct configuration file and flink run mode!");
            System.exit(-1);
        } else {
            conf = LoadConf.LoadYaml(configPath);
            if (conf == null) {
                logger.error("配置文件" + args[0] + "不存在,系统退出");
                System.exit(-1);
            }
        }
        return conf;
    }
}
```
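Note the extension check: `configPath.endsWith("yaml")` rejects .yml files, including the application.yml fallback path hardcoded in the catch block above, so the default path exits with an error. A tolerant check would accept both spellings:

```java
class ConfigPathCheck {
    // Sketch: accept both common YAML extensions.
    static boolean looksLikeYaml(String path) {
        return path.endsWith(".yaml") || path.endsWith(".yml");
    }
}
```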
src/main/resources/application.yml (new file)

```yaml
# Whitelist of fields that are not masked
fieldsWhiteList=fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,\
orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc
# Regular expressions used for masking
# Name regex
nameRegExp = "[\一-龥]{1,20}|[a-zA-Z\\.\\s]{1,20}"
# Mobile number regex
mobileRegExp = "(13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}"
# Landline number regex
phoneRegExp = "(\\d{3,4}-)?\\d{6,8}"
# Email regex
emailRegExp = "\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*"
# ID card number (15-digit) regex
idRegExp15 = "[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}"
# ID card number (18-digit) regex
idRegExp18 = "[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])"
# Home address regex
addressRegExp = "([\一-\龥A-Za-z0-9_]+(省|市|区|县|道|路|街|号|弄|条|室)){2,}"
# IP address regex
ipRegExp = "((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)"
# MAC address regex
macRegExp = "([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}"
```
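As committed, the file uses properties-style `key = value` lines even though LoadConf parses it with SnakeYAML, which expects `key: value` mappings; SnakeYAML reads the whole block as one plain scalar, so the `(Map)` cast in findAndReadYaml would fail. (The mobileRegExp line is also missing the leading parenthesis present in the compiled pattern in MaskUtil.) A small demonstration of the parse behavior:

```java
import org.yaml.snakeyaml.Yaml;

class YamlSyntaxCheck {
    public static void main(String[] args) {
        // Properties-style input parses as a single String, not a Map...
        Object flat = new Yaml().load("nameRegExp = \"abc\"");
        System.out.println(flat.getClass()); // class java.lang.String
        // ...while YAML mapping syntax yields the Map that LoadConf expects.
        Object mapped = new Yaml().load("nameRegExp: \"abc\"");
        System.out.println(mapped.getClass()); // a java.util.Map implementation
    }
}
```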