谢森 / transactionLogMask / Commits

Commit 01a8a8e5 authored Oct 09, 2020 by 王海鹰
init
Showing 15 changed files with 2277 additions and 0 deletions (+2277, -0)
.idea/.gitignore  +8  -0
pom.xml  +297  -0
src/main/java/com/zorkdata/datamask/TransactionLogMask.java  +257  -0
src/main/java/com/zorkdata/datamask/domain/LogData.java  +125  -0
src/main/java/com/zorkdata/datamask/domain/TransactionLog.java  +701  -0
src/main/java/com/zorkdata/datamask/domain/log.avro  +65  -0
src/main/java/com/zorkdata/datamask/function/Avro2StrFlatMapFunction.java  +43  -0
src/main/java/com/zorkdata/datamask/util/AvroTest.java  +146  -0
src/main/java/com/zorkdata/datamask/util/MaskUtil.java  +117  -0
src/main/java/com/zorkdata/datamask/util/avro/AvroDeserializer.java  +68  -0
src/main/java/com/zorkdata/datamask/util/avro/AvroDeserializerFactory.java  +42  -0
src/main/java/com/zorkdata/datamask/util/avro/AvroSerializer.java  +283  -0
src/main/java/com/zorkdata/datamask/util/avro/AvroSerializerFactory.java  +27  -0
src/main/java/com/zorkdata/datamask/util/avro/LogAvroMacroDef.java  +75  -0
src/main/resources/log4j.properties  +23  -0
.idea/.gitignore
0 → 100644
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
pom.xml
0 → 100644
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zorkdata.datamask</groupId>
    <artifactId>transactionLogMask</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Guotai Transaction Log Mask Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.8.1</flink.version>
        <!-- <flink.version>1.11.2</flink.version> -->
        <java.version>1.8</java.version>
        <kafka.version>0.8</kafka.version>
        <hadoop.version>2.6.5</hadoop.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2</artifactId>
            <version>2.6.5-10.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-filesystem -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.12</artifactId>
            <version>1.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-tools -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.8.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <!-- <scope>runtime</scope> -->
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <!-- <scope>runtime</scope> -->
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <!-- <scope>provided</scope> -->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.zorkdata.datamask.TransactionLogMask</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
<!-- <!– This improves the out-of-the-box experience in Eclipse by resolving some warnings. –>-->
<!-- <plugin>-->
<!-- <groupId>org.eclipse.m2e</groupId>-->
<!-- <artifactId>lifecycle-mapping</artifactId>-->
<!-- <version>1.0.0</version>-->
<!-- <configuration>-->
<!-- <lifecycleMappingMetadata>-->
<!-- <pluginExecutions>-->
<!-- <pluginExecution>-->
<!-- <pluginExecutionFilter>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-shade-plugin</artifactId>-->
<!-- <versionRange>[3.0.0,)</versionRange>-->
<!-- <goals>-->
<!-- <goal>shade</goal>-->
<!-- </goals>-->
<!-- </pluginExecutionFilter>-->
<!-- <action>-->
<!-- <ignore/>-->
<!-- </action>-->
<!-- </pluginExecution>-->
<!-- <pluginExecution>-->
<!-- <pluginExecutionFilter>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
<!-- <versionRange>[3.1,)</versionRange>-->
<!-- <goals>-->
<!-- <goal>testCompile</goal>-->
<!-- <goal>compile</goal>-->
<!-- </goals>-->
<!-- </pluginExecutionFilter>-->
<!-- <action>-->
<!-- <ignore/>-->
<!-- </action>-->
<!-- </pluginExecution>-->
<!-- </pluginExecutions>-->
<!-- </lifecycleMappingMetadata>-->
<!-- </configuration>-->
<!-- </plugin>-->
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>
\ No newline at end of file
src/main/java/com/zorkdata/datamask/TransactionLogMask.java
0 → 100644
package com.zorkdata.datamask;

import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.avro.ZorkAvroFormat;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.io.InputSplit;
//import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
//import org.apache.hadoop.mapred.jobcontrol.Job;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;

/**
 * Description: Guotai transaction log masking job.
 *
 * @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
 * Date : Create in 2020/9/18 17:35
 */
public class TransactionLogMask {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        String source = params.get("source", "hdfs");
        if ("hdfs".equals(source)) {
            maskHdfsLog(params);
        } else if ("kafka".equals(source)) {
            maskKafkaLog(params);
        }
    }

    /**
     * Masks log files stored on HDFS.
     *
     * @param params request parameters
     * @return void
     */
    public static void maskHdfsLog(ParameterTool params) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String hdfsSrc = params.get("hdfs-src");
        String hdfsDest = params.get("hdfs-dest");
        String core = params.get("core", "c1");
        String date = params.get("date", sdf.format(new Date()));
        String startTime = params.get("startTime");
        String endTime = params.get("endTime");
        // List<String> logFiles = filterHdfsLogFiles(hdfsSrc, date, startTime, endTime);
        List<String> logFiles = new ArrayList<String>() {
            {
                add(hdfsSrc);
            }
        };
        System.out.println("---------------logFiles-----------------:" + logFiles);
        for (String logFile : logFiles) {
            // DataSet<String> textFileSource = env.readTextFile(logFile).name("hadoop-source");
            // Job job = Job.getInstance();
            // HadoopInputFormat<Text, LogData> hadoopInputFormat = new HadoopInputFormat<Text, LogData>((InputFormat<Text, LogData>) logDataInput, Text.class, LogData.class, new JobConf());
            // HadoopInputFormat<Text, LogData> hadoopInputFormat = new HadoopInputFormat<Text, LogData>(new TextInputFormat(), Text.class, LogData.class, new JobConf());
            // AvroInputFormat<LogData> logDataAvroFormat = new AvroInputFormat<>(new org.apache.flink.core.fs.Path(logFile), LogData.class);
            // ZorkAvroFormat logDataAvroFormat = new ZorkAvroFormat<String, String>();
            JobConf jobConf = new JobConf();
            HadoopInputFormat<AvroKey, LogData> hadoopInputFormat =
                    new HadoopInputFormat<AvroKey, LogData>(new AvroInputFormat(), AvroKey.class, LogData.class, jobConf);
            TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
            DataSource<Tuple2<AvroKey, LogData>> input = env.createInput(hadoopInputFormat);
            FlatMapOperator<Tuple2<AvroKey, LogData>, Object> logDataFlatMapOperator =
                    input.flatMap(new FlatMapFunction<Tuple2<AvroKey, LogData>, Object>() {
                        @Override
                        public void flatMap(Tuple2<AvroKey, LogData> value, Collector<Object> out) throws Exception {
                            System.out.println("------------------" + value);
                        }
                    });
            // env.createInput(logDataInput).flatMap(new Avro2StrFlatMapFunction());
            DataSet<Tuple2<AvroKey, LogData>> textFileSource = logDataFlatMapOperator.getInput();
            // DataSet<String> textFileSource = env.readTextFile(logFile).name("hadoop-source");
            // DataSet<String> flatMap = textFileSource.map(new Avro2StrFlatMapFunction());
            String logFileName = logFile.split("/")[logFile.split("/").length - 1];
            // flatMap.print();
            String filePath = hdfsDest + logFileName;
            System.out.println("---------------writepath-----------------:" + filePath);
            textFileSource.writeAsText(filePath, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE).name("hadoop-sink");
            try {
                env.execute("国泰交易日志脱敏job");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Masks message data consumed from Kafka.
     *
     * @param params request parameters
     * @return void
     */
    public static void maskKafkaLog(ParameterTool params) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String servers = params.get("servers");
        String zookeeper = params.get("zookeeper");
        String topic = params.get("topic");
        String hdfsDest = params.get("hdfs-dest");
        String core = params.get("core", "c1");
        String date = params.get("date", sdf.format(new Date()));
        String startTime = params.get("startTime");
        String endTime = params.get("endTime");
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", "group1");
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 1000);
        SingleOutputStreamOperator<String> dataStreamSource =
                env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props)).setParallelism(1);
        // TODO filter by date, startTime and endTime
        BucketingSink<String> hdfsSink = new BucketingSink<>(hdfsDest);
        // Create a bucketer that rolls directories by time. The default pattern is yyyy-MM-dd--HH in US time;
        // changed here to one directory per day, Shanghai time.
        hdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")));
        // Maximum size of each file; the default is 384 MB (1024 * 1024 * 384).
        hdfsSink.setBatchSize(1024 * 1024 * 384);
        // Roll over to a new file after this interval.
        hdfsSink.setBatchRolloverInterval(1000 * 60 * 60);
        hdfsSink.setPendingSuffix("ccc");
        hdfsSink.setInactiveBucketThreshold(60 * 1000L);
        hdfsSink.setInactiveBucketCheckInterval(60 * 1000L);
        hdfsSink.setAsyncTimeout(60 * 1000);
        dataStreamSource.addSink(hdfsSink);
        try {
            env.execute("国泰交易日志脱敏job");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Filters HDFS log files.
     *
     * @param hdfs      HDFS address
     * @param date      date
     * @param startTime start time
     * @param endTime   end time
     * @return list of HDFS files
     */
    private static List<String> filterHdfsLogFiles(String hdfs, String date, String startTime, String endTime) {
        if (!hdfs.endsWith("/")) {
            hdfs += "/";
        }
        String path = hdfs;
        if (null != date) {
            path = hdfs + date;
        }
        Configuration conf = new Configuration();
        // conf.set("dfs.replication", "3");
        // conf.set("fs.defaultFS", "hdfs://cdh-2:8020");
        // conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        // conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        System.out.println("---------------conf-----------------:" + conf.toString());
        List<String> logFiles = new ArrayList<>();
        try {
            // FileSystem fileSystem = FileSystem.get(conf);
            FileSystem fileSystem = null;
            try {
                fileSystem = FileSystem.get(new URI("hdfs://cdh-2:8020/"), conf, "hdfs");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path(path), false);
            while (locatedFileStatusRemoteIterator.hasNext()) {
                LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
                Path path1 = next.getPath();
                System.out.println("---------------path1-----------------:" + path1.toString());
                logFiles.add(path1.toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return logFiles;
    }

    private static TransactionLog avroSerialize(byte[] data) throws IOException {
        DatumReader<TransactionLog> reader = new SpecificDatumReader<TransactionLog>(TransactionLog.class);
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        TransactionLog transactionLog = reader.read(null, decoder);
        System.out.println(transactionLog);
        return transactionLog;
    }

    private static TransactionLog avroDeserialize(byte[] data) throws IOException {
        DatumReader<TransactionLog> reader = new SpecificDatumReader<TransactionLog>(TransactionLog.class);
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        TransactionLog transactionLog = reader.read(null, decoder);
        System.out.println(transactionLog);
        return transactionLog;
    }
}
\ No newline at end of file
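Note that the flatMap inside maskHdfsLog above only prints each record; the masking step itself is not wired into the pipeline in this commit. Below is a minimal sketch of how the MaskUtil and LogData classes added later in this commit could be combined inside a Flink FlatMapFunction. The class name MaskLogDataFlatMapFunction and the String output type are illustrative assumptions, not part of the committed code.

import com.alibaba.fastjson.JSON;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.util.MaskUtil;
import org.apache.avro.mapred.AvroKey;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Sketch only: applies MaskUtil.mask to the string-valued maps of a LogData record and
// emits the masked record as JSON. Input type mirrors the Tuple2<AvroKey, LogData> used
// in maskHdfsLog; the JSON-string output is a choice made for this example.
public class MaskLogDataFlatMapFunction implements FlatMapFunction<Tuple2<AvroKey, LogData>, String> {
    @Override
    public void flatMap(Tuple2<AvroKey, LogData> value, Collector<String> out) throws Exception {
        LogData logData = value.f1;
        if (logData == null) {
            return;
        }
        if (logData.getDimensions() != null) {
            logData.setDimensions(MaskUtil.mask(logData.getDimensions()));
        }
        if (logData.getNormalFields() != null) {
            logData.setNormalFields(MaskUtil.mask(logData.getNormalFields()));
        }
        out.collect(JSON.toJSONString(logData));
    }
}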
src/main/java/com/zorkdata/datamask/domain/LogData.java
0 → 100644
package com.zorkdata.datamask.domain;

import lombok.Data;
import org.joda.time.DateTime;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * @author wanghaiying
 * @Description LogData
 * @Email wanghaiying@zork.com.cn
 * @Date 2020/9/25 10:00
 */
@Data
@SuppressWarnings("all")
public class LogData implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * logTypeName: log type
     */
    private String logTypeName;
    /**
     * timestamp
     */
    private String timestamp;
    /**
     * source
     */
    private String source;
    /**
     * offset
     */
    private String offset;
    /**
     * dimensions
     */
    private Map<String, String> dimensions;
    /**
     * measures
     */
    private Map<String, Double> measures;
    /**
     * normalFields
     */
    private Map<String, String> normalFields;
// public LogData() {
// }
// public String getLogTypeName() {
// return logTypeName;
// }
//
// public void setLogTypeName(String logTypeName) {
// this.logTypeName = logTypeName;
// }
//
// public String getTimestamp() {
// return timestamp;
// }
//
// public void setTimestamp(String timestamp) {
// this.timestamp = timestamp;
// }
//
// public String getSource() {
// return source;
// }
//
// public void setSource(String source) {
// this.source = source;
// }
//
// public String getOffset() {
// return offset;
// }
//
// public void setOffset(String offset) {
// this.offset = offset;
// }
//
// public Map<String, String> getDimensions() {
// return dimensions;
// }
//
// public void setDimensions(Map<String, String> dimensions) {
// this.dimensions = new HashMap<>(50);
// for (Map.Entry entry : dimensions.entrySet()) {
// this.dimensions.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
// }
// }
//
// public Map<String, Double> getMeasures() {
// return measures;
// }
//
// public void setMeasures(Map<String, Double> measures) {
// this.measures = new HashMap<>(50);
// for (Map.Entry entry : measures.entrySet()) {
// this.measures.put(String.valueOf(entry.getKey()), Double.valueOf(String.valueOf(entry.getValue())));
// }
// }
//
// public Map<String, String> getNormalFields() {
// return normalFields;
// }
//
// public void setNormalFields(Map<String, String> normalFields) {
// this.normalFields = new HashMap<>(50);
// for (Map.Entry entry : normalFields.entrySet()) {
// this.normalFields.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
// }
// }
    @Override
    public String toString() {
        return new DateTime(timestamp).toDate().getTime() + " ZorkLogData{" +
                "logTypeName='" + logTypeName + '\'' +
                ", timestamp='" + timestamp + '\'' +
                ", source='" + source + '\'' +
                ", offset='" + offset + '\'' +
                ", dimensions=" + dimensions +
                ", measures=" + measures +
                ", normalFields=" + normalFields +
                '}';
    }
}
src/main/java/com/zorkdata/datamask/domain/TransactionLog.java
0 → 100644
/**
 * Autogenerated by Avro
 * <p>
 * DO NOT EDIT DIRECTLY
 */
package com.zorkdata.datamask.domain;

import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.SpecificData;

import java.util.Map;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class TransactionLog extends org.apache.avro.specific.SpecificRecordBase
        implements org.apache.avro.specific.SpecificRecord {
    private static final long serialVersionUID = -4444562953482178409L;

    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"logs\",\"namespace\":\"com.zork.logs\",\"fields\":[{\"name\":\"logTypeName\",\"type\":[\"string\",\"null\"]},{\"name\":\"timestamp\",\"type\":[\"string\",\"null\"]},{\"name\":\"source\",\"type\":[\"string\",\"null\"]},{\"name\":\"offset\",\"type\":[\"string\",\"null\"]},{\"name\":\"dimensions\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]},{\"name\":\"measures\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"double\"}]},{\"name\":\"normalfields\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]}");

    public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

    private static SpecificData MODEL$ = new SpecificData();

    private static final BinaryMessageEncoder<TransactionLog> ENCODER = new BinaryMessageEncoder<TransactionLog>(MODEL$, SCHEMA$);
    private static final BinaryMessageDecoder<TransactionLog> DECODER = new BinaryMessageDecoder<TransactionLog>(MODEL$, SCHEMA$);

    /** Return the BinaryMessageDecoder instance used by this class. */
    public static BinaryMessageDecoder<TransactionLog> getDecoder() { return DECODER; }

    /**
     * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
     * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
     */
    public static BinaryMessageDecoder<TransactionLog> createDecoder(SchemaStore resolver) {
        return new BinaryMessageDecoder<TransactionLog>(MODEL$, SCHEMA$, resolver);
    }

    /** Serializes this TransactionLog to a ByteBuffer. */
    public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { return ENCODER.encode(this); }

    /** Deserializes a TransactionLog from a ByteBuffer. */
    public static TransactionLog fromByteBuffer(java.nio.ByteBuffer b) throws java.io.IOException { return DECODER.decode(b); }

    @Deprecated
    public CharSequence logTypeName;
    @Deprecated
    public CharSequence timestamp;
    @Deprecated
    public CharSequence source;
    @Deprecated
    public CharSequence offset;
    @Deprecated
    public Map<CharSequence, CharSequence> dimensions;
    @Deprecated
    public Map<CharSequence, Double> measures;
    @Deprecated
    public Map<CharSequence, CharSequence> normalfields;

    /**
     * Default constructor. Note that this does not initialize fields
     * to their default values from the schema. If that is desired then
     * one should use <code>newBuilder()</code>.
     */
    public TransactionLog() {
    }

    /**
     * All-args constructor.
     * @param logTypeName The new value for logTypeName
     * @param timestamp The new value for timestamp
     * @param source The new value for source
     * @param offset The new value for offset
     * @param dimensions The new value for dimensions
     * @param measures The new value for measures
     * @param normalfields The new value for normalfields
     */
    public TransactionLog(CharSequence logTypeName, CharSequence timestamp, CharSequence source, CharSequence offset,
                          Map<CharSequence, CharSequence> dimensions, Map<CharSequence, Double> measures,
                          Map<CharSequence, CharSequence> normalfields) {
        this.logTypeName = logTypeName;
        this.timestamp = timestamp;
        this.source = source;
        this.offset = offset;
        this.dimensions = dimensions;
        this.measures = measures;
        this.normalfields = normalfields;
    }

    public org.apache.avro.Schema getSchema() { return SCHEMA$; }

    // Used by DatumWriter. Applications should not call.
    public Object get(int field$) {
        switch (field$) {
            case 0: return logTypeName;
            case 1: return timestamp;
            case 2: return source;
            case 3: return offset;
            case 4: return dimensions;
            case 5: return measures;
            case 6: return normalfields;
            default: throw new org.apache.avro.AvroRuntimeException("Bad index");
        }
    }

    // Used by DatumReader. Applications should not call.
    @SuppressWarnings(value = "unchecked")
    public void put(int field$, Object value$) {
        switch (field$) {
            case 0: logTypeName = (CharSequence) value$; break;
            case 1: timestamp = (CharSequence) value$; break;
            case 2: source = (CharSequence) value$; break;
            case 3: offset = (CharSequence) value$; break;
            case 4: dimensions = (Map<CharSequence, CharSequence>) value$; break;
            case 5: measures = (Map<CharSequence, Double>) value$; break;
            case 6: normalfields = (Map<CharSequence, CharSequence>) value$; break;
            default: throw new org.apache.avro.AvroRuntimeException("Bad index");
        }
    }

    /** Gets the value of the 'logTypeName' field. @return The value of the 'logTypeName' field. */
    public CharSequence getLogTypeName() { return logTypeName; }

    /** Sets the value of the 'logTypeName' field. @param value the value to set. */
    public void setLogTypeName(CharSequence value) { this.logTypeName = value; }

    /** Gets the value of the 'timestamp' field. @return The value of the 'timestamp' field. */
    public CharSequence getTimestamp() { return timestamp; }

    /** Sets the value of the 'timestamp' field. @param value the value to set. */
    public void setTimestamp(CharSequence value) { this.timestamp = value; }

    /** Gets the value of the 'source' field. @return The value of the 'source' field. */
    public CharSequence getSource() { return source; }

    /** Sets the value of the 'source' field. @param value the value to set. */
    public void setSource(CharSequence value) { this.source = value; }

    /** Gets the value of the 'offset' field. @return The value of the 'offset' field. */
    public CharSequence getOffset() { return offset; }

    /** Sets the value of the 'offset' field. @param value the value to set. */
    public void setOffset(CharSequence value) { this.offset = value; }

    /** Gets the value of the 'dimensions' field. @return The value of the 'dimensions' field. */
    public Map<CharSequence, CharSequence> getDimensions() { return dimensions; }

    /** Sets the value of the 'dimensions' field. @param value the value to set. */
    public void setDimensions(Map<CharSequence, CharSequence> value) { this.dimensions = value; }

    /** Gets the value of the 'measures' field. @return The value of the 'measures' field. */
    public Map<CharSequence, Double> getMeasures() { return measures; }

    /** Sets the value of the 'measures' field. @param value the value to set. */
    public void setMeasures(Map<CharSequence, Double> value) { this.measures = value; }

    /** Gets the value of the 'normalfields' field. @return The value of the 'normalfields' field. */
    public Map<CharSequence, CharSequence> getNormalFields() { return normalfields; }

    /** Sets the value of the 'normalfields' field. @param value the value to set. */
    public void setNormalFields(Map<CharSequence, CharSequence> value) { this.normalfields = value; }

    /** Creates a new TransactionLog RecordBuilder. @return A new TransactionLog RecordBuilder */
    public static Builder newBuilder() { return new Builder(); }

    /** Creates a new TransactionLog RecordBuilder by copying an existing Builder. @param other The existing builder to copy. @return A new TransactionLog RecordBuilder */
    public static Builder newBuilder(Builder other) { return new Builder(other); }

    /** Creates a new TransactionLog RecordBuilder by copying an existing TransactionLog instance. @param other The existing instance to copy. @return A new TransactionLog RecordBuilder */
    public static Builder newBuilder(TransactionLog other) { return new Builder(other); }

    /**
     * RecordBuilder for TransactionLog instances.
     */
    public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<TransactionLog>
            implements org.apache.avro.data.RecordBuilder<TransactionLog> {
        private CharSequence logTypeName;
        private CharSequence timestamp;
        private CharSequence source;
        private CharSequence offset;
        private Map<CharSequence, CharSequence> dimensions;
        private Map<CharSequence, Double> measures;
        private Map<CharSequence, CharSequence> normalfields;

        /** Creates a new Builder */
        private Builder() {
            super(SCHEMA$);
        }

        /**
         * Creates a Builder by copying an existing Builder.
         * @param other The existing Builder to copy.
         */
        private Builder(Builder other) {
            super(other);
            if (isValidValue(fields()[0], other.logTypeName)) {
                this.logTypeName = data().deepCopy(fields()[0].schema(), other.logTypeName);
                fieldSetFlags()[0] = true;
            }
            if (isValidValue(fields()[1], other.timestamp)) {
                this.timestamp = data().deepCopy(fields()[1].schema(), other.timestamp);
                fieldSetFlags()[1] = true;
            }
            if (isValidValue(fields()[2], other.source)) {
                this.source = data().deepCopy(fields()[2].schema(), other.source);
                fieldSetFlags()[2] = true;
            }
            if (isValidValue(fields()[3], other.offset)) {
                this.offset = data().deepCopy(fields()[3].schema(), other.offset);
                fieldSetFlags()[3] = true;
            }
            if (isValidValue(fields()[4], other.dimensions)) {
                this.dimensions = data().deepCopy(fields()[4].schema(), other.dimensions);
                fieldSetFlags()[4] = true;
            }
            if (isValidValue(fields()[5], other.measures)) {
                this.measures = data().deepCopy(fields()[5].schema(), other.measures);
                fieldSetFlags()[5] = true;
            }
            if (isValidValue(fields()[6], other.normalfields)) {
                this.normalfields = data().deepCopy(fields()[6].schema(), other.normalfields);
                fieldSetFlags()[6] = true;
            }
        }

        /**
         * Creates a Builder by copying an existing TransactionLog instance
         * @param other The existing instance to copy.
         */
        private Builder(TransactionLog other) {
            super(SCHEMA$);
            if (isValidValue(fields()[0], other.logTypeName)) {
                this.logTypeName = data().deepCopy(fields()[0].schema(), other.logTypeName);
                fieldSetFlags()[0] = true;
            }
            if (isValidValue(fields()[1], other.timestamp)) {
                this.timestamp = data().deepCopy(fields()[1].schema(), other.timestamp);
                fieldSetFlags()[1] = true;
            }
            if (isValidValue(fields()[2], other.source)) {
                this.source = data().deepCopy(fields()[2].schema(), other.source);
                fieldSetFlags()[2] = true;
            }
            if (isValidValue(fields()[3], other.offset)) {
                this.offset = data().deepCopy(fields()[3].schema(), other.offset);
                fieldSetFlags()[3] = true;
            }
            if (isValidValue(fields()[4], other.dimensions)) {
                this.dimensions = data().deepCopy(fields()[4].schema(), other.dimensions);
                fieldSetFlags()[4] = true;
            }
            if (isValidValue(fields()[5], other.measures)) {
                this.measures = data().deepCopy(fields()[5].schema(), other.measures);
                fieldSetFlags()[5] = true;
            }
            if (isValidValue(fields()[6], other.normalfields)) {
                this.normalfields = data().deepCopy(fields()[6].schema(), other.normalfields);
                fieldSetFlags()[6] = true;
            }
        }

        /** Gets the value of the 'logTypeName' field. @return The value. */
        public CharSequence getLogTypeName() { return logTypeName; }

        /** Sets the value of the 'logTypeName' field. @param value The value of 'logTypeName'. @return This builder. */
        public Builder setLogTypeName(CharSequence value) {
            validate(fields()[0], value);
            this.logTypeName = value;
            fieldSetFlags()[0] = true;
            return this;
        }

        /** Checks whether the 'logTypeName' field has been set. @return True if the 'logTypeName' field has been set, false otherwise. */
        public boolean hasLogTypeName() { return fieldSetFlags()[0]; }

        /** Clears the value of the 'logTypeName' field. @return This builder. */
        public Builder clearLogTypeName() {
            logTypeName = null;
            fieldSetFlags()[0] = false;
            return this;
        }

        /** Gets the value of the 'timestamp' field. @return The value. */
        public CharSequence getTimestamp() { return timestamp; }

        /** Sets the value of the 'timestamp' field. @param value The value of 'timestamp'. @return This builder. */
        public Builder setTimestamp(CharSequence value) {
            validate(fields()[1], value);
            this.timestamp = value;
            fieldSetFlags()[1] = true;
            return this;
        }

        /** Checks whether the 'timestamp' field has been set. @return True if the 'timestamp' field has been set, false otherwise. */
        public boolean hasTimestamp() { return fieldSetFlags()[1]; }

        /** Clears the value of the 'timestamp' field. @return This builder. */
        public Builder clearTimestamp() {
            timestamp = null;
            fieldSetFlags()[1] = false;
            return this;
        }

        /** Gets the value of the 'source' field. @return The value. */
        public CharSequence getSource() { return source; }

        /** Sets the value of the 'source' field. @param value The value of 'source'. @return This builder. */
        public Builder setSource(CharSequence value) {
            validate(fields()[2], value);
            this.source = value;
            fieldSetFlags()[2] = true;
            return this;
        }

        /** Checks whether the 'source' field has been set. @return True if the 'source' field has been set, false otherwise. */
        public boolean hasSource() { return fieldSetFlags()[2]; }

        /** Clears the value of the 'source' field. @return This builder. */
        public Builder clearSource() {
            source = null;
            fieldSetFlags()[2] = false;
            return this;
        }

        /** Gets the value of the 'offset' field. @return The value. */
        public CharSequence getOffset() { return offset; }

        /** Sets the value of the 'offset' field. @param value The value of 'offset'. @return This builder. */
        public Builder setOffset(CharSequence value) {
            validate(fields()[3], value);
            this.offset = value;
            fieldSetFlags()[3] = true;
            return this;
        }

        /** Checks whether the 'offset' field has been set. @return True if the 'offset' field has been set, false otherwise. */
        public boolean hasOffset() { return fieldSetFlags()[3]; }

        /** Clears the value of the 'offset' field. @return This builder. */
        public Builder clearOffset() {
            offset = null;
            fieldSetFlags()[3] = false;
            return this;
        }

        /** Gets the value of the 'dimensions' field. @return The value. */
        public Map<CharSequence, CharSequence> getDimensions() { return dimensions; }

        /** Sets the value of the 'dimensions' field. @param value The value of 'dimensions'. @return This builder. */
        public Builder setDimensions(Map<CharSequence, CharSequence> value) {
            validate(fields()[4], value);
            this.dimensions = value;
            fieldSetFlags()[4] = true;
            return this;
        }

        /** Checks whether the 'dimensions' field has been set. @return True if the 'dimensions' field has been set, false otherwise. */
        public boolean hasDimensions() { return fieldSetFlags()[4]; }

        /** Clears the value of the 'dimensions' field. @return This builder. */
        public Builder clearDimensions() {
            dimensions = null;
            fieldSetFlags()[4] = false;
            return this;
        }

        /** Gets the value of the 'measures' field. @return The value. */
        public Map<CharSequence, Double> getMeasures() { return measures; }

        /** Sets the value of the 'measures' field. @param value The value of 'measures'. @return This builder. */
        public Builder setMeasures(Map<CharSequence, Double> value) {
            validate(fields()[5], value);
            this.measures = value;
            fieldSetFlags()[5] = true;
            return this;
        }

        /** Checks whether the 'measures' field has been set. @return True if the 'measures' field has been set, false otherwise. */
        public boolean hasMeasures() { return fieldSetFlags()[5]; }

        /** Clears the value of the 'measures' field. @return This builder. */
        public Builder clearMeasures() {
            measures = null;
            fieldSetFlags()[5] = false;
            return this;
        }

        /** Gets the value of the 'normalfields' field. @return The value. */
        public Map<CharSequence, CharSequence> getNormalfields() { return normalfields; }

        /** Sets the value of the 'normalfields' field. @param value The value of 'normalfields'. @return This builder. */
        public Builder setNormalfields(Map<CharSequence, CharSequence> value) {
            validate(fields()[6], value);
            this.normalfields = value;
            fieldSetFlags()[6] = true;
            return this;
        }

        /** Checks whether the 'normalfields' field has been set. @return True if the 'normalfields' field has been set, false otherwise. */
        public boolean hasNormalfields() { return fieldSetFlags()[6]; }

        /** Clears the value of the 'normalfields' field. @return This builder. */
        public Builder clearNormalfields() {
            normalfields = null;
            fieldSetFlags()[6] = false;
            return this;
        }

        @Override
        @SuppressWarnings("unchecked")
        public TransactionLog build() {
            try {
                TransactionLog record = new TransactionLog();
                record.logTypeName = fieldSetFlags()[0] ? this.logTypeName : (CharSequence) defaultValue(fields()[0]);
                record.timestamp = fieldSetFlags()[1] ? this.timestamp : (CharSequence) defaultValue(fields()[1]);
                record.source = fieldSetFlags()[2] ? this.source : (CharSequence) defaultValue(fields()[2]);
                record.offset = fieldSetFlags()[3] ? this.offset : (CharSequence) defaultValue(fields()[3]);
                record.dimensions = fieldSetFlags()[4] ? this.dimensions : (Map<CharSequence, CharSequence>) defaultValue(fields()[4]);
                record.measures = fieldSetFlags()[5] ? this.measures : (Map<CharSequence, Double>) defaultValue(fields()[5]);
                record.normalfields = fieldSetFlags()[6] ? this.normalfields : (Map<CharSequence, CharSequence>) defaultValue(fields()[6]);
                return record;
            } catch (Exception e) {
                throw new org.apache.avro.AvroRuntimeException(e);
            }
        }
    }

    @SuppressWarnings("unchecked")
    private static final org.apache.avro.io.DatumWriter<TransactionLog> WRITER$ =
            (org.apache.avro.io.DatumWriter<TransactionLog>) MODEL$.createDatumWriter(SCHEMA$);

    @Override
    public void writeExternal(java.io.ObjectOutput out) throws java.io.IOException {
        WRITER$.write(this, SpecificData.getEncoder(out));
    }

    @SuppressWarnings("unchecked")
    private static final org.apache.avro.io.DatumReader<TransactionLog> READER$ =
            (org.apache.avro.io.DatumReader<TransactionLog>) MODEL$.createDatumReader(SCHEMA$);

    @Override
    public void readExternal(java.io.ObjectInput in) throws java.io.IOException {
        READER$.read(this, SpecificData.getDecoder(in));
    }
}
src/main/java/com/zorkdata/datamask/domain/log.avro
0 → 100644
{
"namespace": "com.zork.logs",
"type": "record",
"name": "logs",
"fields": [
{
"name": "logTypeName",
"type": [
"string",
"null"
]
},
{
"name": "timestamp",
"type": [
"string",
"null"
]
},
{
"name": "source",
"type": [
"string",
"null"
]
},
{
"name": "offset",
"type": [
"string",
"null"
]
},
{
"name": "dimensions",
"type": [
"null",
{
"type": "map",
"values": "string"
}
]
},
{
"name": "measures",
"type": [
"null",
{
"type": "map",
"values": "double"
}
]
},
{
"name": "normalfields",
"type": [
"null",
{
"type": "map",
"values": "string"
}
]
}
]
}
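The schema above is what the AvroSerializer/AvroDeserializer wrappers below are built around. For reference, here is a minimal, self-contained sketch of a round trip through this schema using the plain Avro 1.8 generic API; the class name LogSchemaRoundTrip and the schema file path are assumptions made for illustration, not part of this commit.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class LogSchemaRoundTrip {
    public static void main(String[] args) throws IOException {
        // Assumed location of the schema file shipped in this commit.
        String schemaJson = new String(Files.readAllBytes(
                Paths.get("src/main/java/com/zorkdata/datamask/domain/log.avro")), StandardCharsets.UTF_8);
        Schema schema = new Schema.Parser().parse(schemaJson);

        // Every field is a union with "null", so unset fields are allowed.
        GenericRecord record = new GenericData.Record(schema);
        record.put("logTypeName", "kcbp_biz_log");
        record.put("timestamp", "2020-09-18T13:59:53.000+08:00");

        // Encode the record to a byte array.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // Decode it back and print the result.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bos.toByteArray(), null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(decoded);
    }
}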
src/main/java/com/zorkdata/datamask/function/Avro2StrFlatMapFunction.java
0 → 100644
package com.zorkdata.datamask.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.util.avro.AvroDeserializer;
import com.zorkdata.datamask.util.avro.AvroDeserializerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * @author xiese
 * @Description Avro2StrFlatMapFunction
 * @Email xiesen310@163.com
 * @Date 2020/9/26 23:14
 */
@Slf4j
public class Avro2StrFlatMapFunction implements FlatMapFunction<String, LogData> {
    @Override
    public void flatMap(String value, Collector<LogData> out) throws Exception {
        try {
            if (null != value) {
                AvroDeserializer logsDeserializer = AvroDeserializerFactory.getLogsDeserializer();
                GenericRecord record = logsDeserializer.deserializing(value.getBytes());
                // System.out.println("----------record---------" + record);
                if (null != record) {
                    LogData logData = JSON.parseObject(record.toString(), new TypeReference<LogData>() {
                    });
                    // System.out.println("----------logData---------" + logData);
                    // out.collect(JSON.toJSONString(logData));
                    out.collect(logData);
                }
            }
        } catch (Exception e) {
            log.error("avro 反序列化失败,错误信息: {}", e.getMessage(), e);
        }
    }
}
src/main/java/com/zorkdata/datamask/util/AvroTest.java
0 → 100644
package com.zorkdata.datamask.util;

import com.alibaba.fastjson.JSON;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.avro.AvroDeserializer;
import com.zorkdata.datamask.util.avro.AvroDeserializerFactory;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.*;
import java.util.HashMap;

/**
 * Description :
 * https://www.cnblogs.com/fillPv/p/5009737.html
 *
 * @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
 * Date : Create in 2020/9/23 4:43
 */
// java -jar avro-tools-1.10.0.jar compile schema log.avro .
//
public class AvroTest {
    public static void main(String[] args) {
        // Avro serialization: write an avro file
        // TransactionLog transactionLog = new TransactionLog();
        LogData transactionLog = new LogData();
        transactionLog.setLogTypeName("kcbp_biz_log");
        transactionLog.setTimestamp("2020-09-18T13:59:53.000+08:00");
        transactionLog.setSource("d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log");
        transactionLog.setOffset("165683111");
        HashMap dimensions = new HashMap() {{
            put("appsystem", "jzjy");
            put("appprogramname", "jzc9-kcbp1_9600");
            put("hostname", "jzc9-kcbp1");
            put("func", "");
            put("nodeid", "");
            put("operway", "W");
        }};
        transactionLog.setDimensions(dimensions);
        HashMap measures = new HashMap<String, Double>() {{
            put("latence", 0.0);
            put("latency", 1.0);
            put("spendtime", 0.5);
        }};
        transactionLog.setMeasures(measures);
        HashMap normalFields = new HashMap() {{
            put("indexTime", "2020-09-18T13:59:54.524+08:00");
            put("bsflag", "");
            put("productcode", "");
            put("developercode", "");
            put("fmillsecond", "");
            put("inputtype", "");
            put("logchecktime", "");
            put("message", "身份证号码:372925199008075158,地址:上海浦东新区张江高科碧波路690号,手机号:15000101879,邮箱:wanghaiying@zork.com.cn");
            put("end_logtime", "");
            put("smillsecond", "585606599");
            put("featurecode", "");
            put("orgid", "");
            put("authcode", "");
            put("collecttime", "2020-09-18T13:59:53.529+08:00");
            put("fundid", "");
            put("deserializerTime", "2020-09-18T13:59:53.671+08:00");
            put("messid", "0000011404342B32233DDCDA");
            put("custid", "");
            put("netputr", "");
            put("versioninfo", "");
            put("beg_logtime", "20200918-135953");
            put("authinfo", "");
        }};
        // transactionLog.setNormalfields(normalFields);
        transactionLog.setNormalFields(normalFields);
        // String path = "d:\\transactionlog-20200925.avro"; // directory where the avro file is stored
        // DatumWriter<TransactionLog> logDatumWriter = new SpecificDatumWriter<>(TransactionLog.class);
        // DataFileWriter<TransactionLog> dataFileWriter = new DataFileWriter<>(logDatumWriter);
        // dataFileWriter.create(transactionLog.getSchema(), new File(path));
        // // write the generated objects into the avro file
        // dataFileWriter.append(transactionLog);
        // dataFileWriter.append(transactionLog);
        // dataFileWriter.append(transactionLog);
        // dataFileWriter.close();

        /**
         * Serialization
         */
        byte[] kcbp_biz_logs = AvroSerializerFactory.getLogAvroSerializer().serializingLog("kcbp_biz_log",
                "2020-09-18T13:59:53.000+08:00", "d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log",
                "165683111", dimensions, measures, normalFields);
        // FileOutputStream fos = null;
        // try {
        //     fos = new FileOutputStream("d:\\transactionlog-20200929.avro");
        // } catch (FileNotFoundException e) {
        //     e.printStackTrace();
        // }
        // try {
        //     fos.write(kcbp_biz_logs, 0, kcbp_biz_logs.length);
        //     fos.flush();
        //     fos.close();
        // } catch (IOException e) {
        //     e.printStackTrace();
        // }

        /**
         * Deserialization
         */
        // File file = new File("d:\\zork\\part-0-0.avro");
        File file = new File("c:\\part-0-0.avro");
        // File file = new File("d:\\hdfs-transactionlog-20200929.avro");
        byte[] byteBuffer = new byte[(int) file.length()];
        FileInputStream fileInputStream = null;
        try {
            // fileInputStream = new FileInputStream("d:\\zork\\part-0-0.avro");
            fileInputStream = new FileInputStream("c:\\part-0-0.avro");
            // fileInputStream = new FileInputStream("d:\\hdfs-transactionlog-20200929.avro");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        try {
            fileInputStream.read(byteBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        GenericRecord genericRecord = AvroDeserializerFactory.getLogsDeserializer().deserializing(byteBuffer);
        System.out.println(genericRecord);
        // Read the avro file and deserialize it
        // DatumReader<TransactionLog> reader = new SpecificDatumReader<TransactionLog>(TransactionLog.class);
        // DataFileReader<TransactionLog> dataFileReader = new DataFileReader<TransactionLog>(new File("d:\\transactionlog-20200925.avro"), reader);
        //// DataFileReader<TransactionLog> dataFileReader = new DataFileReader<TransactionLog>(new File("D:\\test.avro"), reader);
        // TransactionLog transactionLogRead = null;
        // while (dataFileReader.hasNext()) {
        //     transactionLogRead = dataFileReader.next();
        //     System.out.println(transactionLogRead);
        // }
    }
}
src/main/java/com/zorkdata/datamask/util/MaskUtil.java
0 → 100644
package com.zorkdata.datamask.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Description :
 *
 * @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
 * Date : Create in 2020/9/23 9:30
 * RegularExpression
 */
public class MaskUtil {
    /**
     * Person name regex
     */
    static Pattern namePattern = Pattern.compile("^([\\u4e00-\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20})$");
    /**
     * Mobile phone number regex
     */
    static Pattern mobilePattern = Pattern.compile("^((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}$");
    /**
     * Landline telephone number regex
     */
    static Pattern phonePattern = Pattern.compile("^(\\d{3,4}-)?\\d{6,8}$");
    /**
     * Email address regex
     */
    static Pattern emailPattern = Pattern.compile("^\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*$");
    /**
     * ID card number (15-digit) regex
     */
    // static Pattern idPattern15 = Pattern.compile("\\d{17}[0-9Xx]|\\d{15}");
    static Pattern idPattern15 = Pattern.compile("^[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}$");
    /**
     * ID card number (18-digit) regex
     */
    static Pattern idPattern18 = Pattern.compile("^[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])$");
    /**
     * Home address regex
     */
    static Pattern addressPattern = Pattern.compile("^([\\u4E00-\\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号)){2,}$");
    /**
     * IP address regex
     */
    // static Pattern ipPattern = Pattern.compile("^((\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])\\.){3}(\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])$");
    static Pattern ipPattern = Pattern.compile("((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)");
    static Pattern macPattern = Pattern.compile("([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}");

    static List<Pattern> patterns = new ArrayList<Pattern>() {{
        add(namePattern);
        add(mobilePattern);
        add(phonePattern);
        add(emailPattern);
        add(idPattern15);
        add(idPattern18);
        add(addressPattern);
        add(ipPattern);
        add(macPattern);
    }};

    public static Map mask(Map map) {
        map.forEach((k, v) -> {
            String value = v.toString();
            for (Pattern pattern : patterns) {
                Matcher matcher = pattern.matcher(v.toString());
                if (matcher.matches()) {
                    String replaceStr = "";
                    for (int i = 0; i < matcher.group().length(); i++) {
                        replaceStr = replaceStr.concat("*");
                    }
                    System.out.println(replaceStr);
                    value = value.replace(matcher.group(), replaceStr);
                }
            }
            map.put(k, value);
        });
        return map;
    }

    public static void main(String[] args) {
        Map map = new HashMap();
        map.put("姓名", "王海鹰");
        map.put("身份证号", "372925199008075158");
        map.put("手机号", "15000101879");
        map.put("电话", "021-61341606");
        map.put("邮箱", "wanghaiying@zork.com.cn");
        map.put("住址", "上海市浦东新区碧波路690号");
        map.put("ip地址", "192.168.70.2");
        map.put("mac地址", "3c-78-43-25-80-bd");
        System.out.println(mask(map));
// String mobile = "15000101879";
//
// Pattern pattern = Pattern.compile("(13\\d|14[579]|15[^4\\D]|17[^49\\D]|18\\d)\\d{8}");
// Matcher m = pattern.matcher(mobile);
////
// System.out.println(m.matches());
// if(m.matches()){
// System.out.println(m.group());
//
// String replaceStr = "";
// for(int i=0; i < m.group().length(); i++){
// replaceStr = replaceStr.concat("*");
// }
// System.out.println(replaceStr);
// mobile = mobile.replaceAll(m.group(), replaceStr);
// System.out.println(mobile);
// }
}
}
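One property of mask() worth noting: it uses Matcher.matches(), so a value is replaced only when the entire string matches one of the patterns; PII embedded inside a longer text (such as the message field assembled in AvroTest.java above) passes through unchanged, and most of the patterns are anchored with ^...$ in any case. A hedged sketch of a find()-based variant, written as an extra method that could sit inside MaskUtil and shown only for the two unanchored patterns (ipPattern and macPattern); maskSubstrings is a hypothetical helper, not part of this commit.

    // Sketch only: masks every occurrence of the unanchored patterns inside a longer string.
    // The anchored patterns above would need unanchored variants before this approach applies to them.
    public static String maskSubstrings(String value) {
        String masked = value;
        for (Pattern pattern : new Pattern[]{ipPattern, macPattern}) {
            Matcher matcher = pattern.matcher(masked);
            StringBuffer sb = new StringBuffer();
            while (matcher.find()) {
                char[] stars = new char[matcher.group().length()];
                java.util.Arrays.fill(stars, '*');
                matcher.appendReplacement(sb, Matcher.quoteReplacement(new String(stars)));
            }
            matcher.appendTail(sb);
            masked = sb.toString();
        }
        return masked;
    }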
src/main/java/com/zorkdata/datamask/util/avro/AvroDeserializer.java
0 → 100644
package com.zorkdata.datamask.util.avro;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author xiesen
 * @Description Avro deserializer
 * @Email xiesen310@163.com
 * @Date 2020/6/28 9:31
 */
public class AvroDeserializer {
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);

    public JSONObject jsonObject;
    public JSONArray jsonArray;
    public Schema schema;
    public String[] keys;

    public AvroDeserializer(String schema) {
        getKeysFromjson(schema);
    }

    /**
     * Extracts the Avro field names (keys) from the schema.
     *
     * @param schema the schema used for Avro serialization
     */
    void getKeysFromjson(String schema) {
        this.jsonObject = JSONObject.parseObject(schema);
        this.schema = new Schema.Parser().parse(schema);
        this.jsonArray = this.jsonObject.getJSONArray("fields");
        this.keys = new String[this.jsonArray.size()];
        for (int i = 0; i < this.jsonArray.size(); i++) {
            this.keys[i] = this.jsonArray.getJSONObject(i).get("name").toString();
        }
    }

    /**
     * Deserializes an Avro-encoded Kafka message body.
     *
     * @param body the raw Kafka message bytes
     * @return the decoded record, or null if deserialization failed
     */
    public GenericRecord deserializing(byte[] body) {
        DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(this.schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(body, null);
        GenericData.Record result = null;
        try {
            result = datumReader.read(null, decoder);
        } catch (Exception e) {
            LOGGER.error("Avro deserialization error", e);
        }
        return result;
    }
}
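As a usage note, here is a minimal sketch of how the deserializer is obtained through the AvroDeserializerFactory defined in the next file and how fields are read back. It assumes the bytes were produced with the log schema in LogAvroMacroDef; the handler class and the Kafka origin of the bytes are hypothetical.

package com.zorkdata.datamask.util.avro;

import org.apache.avro.generic.GenericRecord;

public class DeserializeSketch {
    public static void handle(byte[] avroBytes) {
        // The factory caches one AvroDeserializer built from LogAvroMacroDef.metadata
        GenericRecord record = AvroDeserializerFactory.getLogsDeserializer().deserializing(avroBytes);
        if (record == null) {
            // deserializing() logs the error and returns null on undecodable input
            return;
        }
        // Field names follow the "logs" schema
        System.out.println(record.get("logTypeName") + " @ " + record.get("timestamp"));
    }
}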
src/main/java/com/zorkdata/datamask/util/avro/AvroDeserializerFactory.java
package com.zorkdata.datamask.util.avro;

/**
 * @author xiesen
 * @Description Deserializer factory
 * @Email xiesen310@163.com
 * @Date 2020/6/28 9:31
 */
public class AvroDeserializerFactory {
    private static AvroDeserializer logs = null;
    private static AvroDeserializer metrics = null;

    public static void init() {
        logs = null;
        metrics = null;
    }

    /**
     * Returns the lazily created deserializer for the log schema.
     *
     * @return the shared log deserializer
     */
    public static AvroDeserializer getLogsDeserializer() {
        if (logs == null) {
            logs = new AvroDeserializer(LogAvroMacroDef.metadata);
        }
        return logs;
    }

    // public static AvroDeserializer getMetricDeserializer() {
    //     if (metrics == null) {
    //         metrics = new AvroDeserializer(MetricAvroMacroDef.metadata);
    //     }
    //     return metrics;
    // }
}
src/main/java/com/zorkdata/datamask/util/avro/AvroSerializer.java
package com.zorkdata.datamask.util.avro;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author xiesen
 * @Description Avro serializer
 * @Email xiesen310@163.com
 * @Date 2020/6/28 9:32
 */
public class AvroSerializer {
    public JSONObject jsonObject;
    public JSONArray jsonArray;
    public Schema schema;
    public List<String> filedsArrayList = new ArrayList<String>();

    public AvroSerializer(String schema) {
        getKeysFromjson(schema);
    }

    /**
     * Extracts the Avro field names (keys) from the schema.
     *
     * @param schema the schema used for Avro serialization
     */
    void getKeysFromjson(String schema) {
        this.jsonObject = JSONObject.parseObject(schema);
        this.schema = new Schema.Parser().parse(schema);
        this.jsonArray = this.jsonObject.getJSONArray("fields");
        if (filedsArrayList != null && filedsArrayList.size() > 0) {
            filedsArrayList.clear();
        }
        for (int i = 0; i < this.jsonArray.size(); i++) {
            filedsArrayList.add(this.jsonArray.getJSONObject(i).get("name").toString());
        }
    }

    /**
     * Avro-serializes a list of field values given in schema field order.
     *
     * @param temtuple the field values
     * @return the encoded bytes
     */
    private synchronized byte[] serializing(List<String> temtuple) {
        byte[] byteArray = null;
        GenericRecord genericRecord = new GenericData.Record(this.schema);
        // Put the data into the genericRecord
        for (int i = 0; i < filedsArrayList.size(); i++) {
            genericRecord.put(filedsArrayList.get(i), temtuple.get(i));
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // The DatumWriter translates the data object into a form the Encoder understands
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(this.schema);
        // The Encoder then writes it to the output stream
        Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        try {
            datumWriter.write(genericRecord, encoder);
            encoder.flush();
        } catch (IOException e) {
            System.out.println("序列化失败 " + e);
        } finally {
            if (byteArrayOutputStream != null) {
                try {
                    byteArrayOutputStream.close();
                } catch (IOException e) {
                    System.out.println("序列化失败" + e);
                }
            }
        }
        try {
            byteArray = byteArrayOutputStream.toByteArray();
        } catch (Exception e) {
            System.out.println("序列化失败" + e);
        }
        return byteArray;
    }

    /**
     * Serializes a JSON string whose keys match the schema field names.
     *
     * @param json the JSON string
     * @return the encoded bytes
     */
    public synchronized byte[] serializing(String json) {
        byte[] byteArray = null;
        JSONObject jsonObject = (JSONObject) JSONObject.parse(json);
        // new TypeReference<Object>() {}
        GenericRecord genericRecord = new GenericData.Record(this.schema);
        // Put the data into the genericRecord
        for (int i = 0; i < filedsArrayList.size(); i++) {
            genericRecord.put(filedsArrayList.get(i),
                    new Utf8(String.valueOf(jsonObject.get(filedsArrayList.get(i)))));
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // The DatumWriter translates the data object into a form the Encoder understands
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(this.schema);
        // The Encoder then writes it to the output stream
        Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        try {
            datumWriter.write(genericRecord, encoder);
            encoder.flush();
        } catch (IOException e) {
            System.out.println("序列化失败" + e);
        } catch (Exception e) {
            System.out.println("序列化失败" + e);
        } finally {
            if (byteArrayOutputStream != null) {
                try {
                    byteArrayOutputStream.close();
                } catch (IOException e) {
                    System.out.println("序列化失败" + e);
                }
            }
        }
        try {
            byteArray = byteArrayOutputStream.toByteArray();
        } catch (Exception e) {
            System.out.println("序列化失败" + e);
        }
        return byteArray;
    }

    /**
     * Serializes a JSON object whose keys match the schema field names.
     *
     * @param jsonObject the JSON object
     * @return the encoded bytes
     */
    private synchronized byte[] serializing(JSONObject jsonObject) {
        byte[] byteArray = null;
        GenericRecord genericRecord = new GenericData.Record(this.schema);
        // Put the data into the datum
        for (int i = 0; i < filedsArrayList.size(); i++) {
            genericRecord.put(filedsArrayList.get(i), jsonObject.get(filedsArrayList.get(i)));
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // The DatumWriter translates the data object into a form the Encoder understands
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(this.schema);
        // The Encoder then writes it to the output stream
        Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        try {
            datumWriter.write(genericRecord, encoder);
            encoder.flush();
        } catch (IOException e) {
            System.out.println("序列化失败" + e);
        } finally {
            if (byteArrayOutputStream != null) {
                try {
                    byteArrayOutputStream.close();
                } catch (IOException e) {
                    System.out.println("序列化失败" + e);
                }
            }
        }
        try {
            byteArray = byteArrayOutputStream.toByteArray();
        } catch (Exception e) {
            System.out.println("序列化失败" + e);
        }
        return byteArray;
    }

    /**
     * Serializes a GenericRecord.
     */
    public synchronized byte[] serializing(GenericRecord datum) {
        byte[] byteArray = null;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // The DatumWriter translates the data object into a form the Encoder understands
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(this.schema);
        // The Encoder then writes it to the output stream
        Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        try {
            datumWriter.write(datum, encoder);
            encoder.flush();
        } catch (IOException e) {
            System.out.println("序列化失败" + e);
        } finally {
            if (byteArrayOutputStream != null) {
                try {
                    byteArrayOutputStream.close();
                } catch (IOException e) {
                    System.out.println("序列化失败" + e);
                }
            }
        }
        try {
            byteArray = byteArrayOutputStream.toByteArray();
        } catch (Exception e) {
            System.out.println("序列化失败" + e);
        }
        // GenericRecord s = AvroDeserializerFactory.getTopicmetadataDeserializer().deserializing(returnstr);
        return byteArray;
    }

    /**
     * Serializes a log record.
     */
    public synchronized byte[] serializingLog(String logTypeName, String timestamp, String source, String offset,
                                              Map<String, String> dimensions, Map<String, Double> metrics,
                                              Map<String, String> normalFields) {
        GenericRecord datum = new GenericData.Record(this.schema);
        // Put the data into the datum, in schema field order
        datum.put(0, logTypeName);
        datum.put(1, timestamp);
        datum.put(2, source);
        datum.put(3, offset);
        datum.put(4, dimensions);
        datum.put(5, metrics);
        datum.put(6, normalFields);
        return serializing(datum);
    }

    /**
     * Serializes a metric record.
     */
    public synchronized byte[] serializingMetric(String metricSetName, String timestamp,
                                                 Map<String, String> dimensions, Map<String, Double> metrics) {
        GenericRecord datum = new GenericData.Record(this.schema);
        // Put the data into the datum, in schema field order
        datum.put(0, metricSetName);
        datum.put(1, timestamp);
        datum.put(2, dimensions);
        datum.put(3, metrics);
        return serializing(datum);
    }

    private synchronized byte[] serializing(GenericRecord genericRecord, String key[]) {
        byte[] byteArray = null;
        GenericRecord datum = new GenericData.Record(this.schema);
        // Put the data into the datum
        for (int i = 0; i < filedsArrayList.size(); i++) {
            datum.put(filedsArrayList.get(i), new Utf8(String.valueOf(genericRecord.get(key[i]))));
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // The DatumWriter translates the data object into a form the Encoder understands
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(this.schema);
        // The Encoder then writes it to the output stream
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        try {
            datumWriter.write(datum, encoder);
            encoder.flush();
        } catch (IOException e) {
            System.out.println("序列化失败" + e);
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    System.out.println("序列化失败" + e);
                }
            }
        }
        try {
            byteArray = out.toByteArray();
        } catch (Exception e) {
            System.out.println("序列化失败" + e);
        }
        return byteArray;
    }
}
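For illustration, a minimal round-trip sketch: build one log record with serializingLog(), then decode it again with the matching deserializer. It uses the two factory classes in this package (AvroSerializerFactory appears in the next file); all sample values are made up.

package com.zorkdata.datamask.util.avro;

import org.apache.avro.generic.GenericRecord;

import java.util.HashMap;
import java.util.Map;

public class RoundTripSketch {
    public static void main(String[] args) {
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("hostname", "demo-host");
        Map<String, Double> measures = new HashMap<>();
        Map<String, String> normalFields = new HashMap<>();
        normalFields.put("message", "hello avro");

        // Argument order matches the "logs" schema: logTypeName, timestamp, source, offset,
        // dimensions, measures, normalFields
        byte[] bytes = AvroSerializerFactory.getLogAvroSerializer().serializingLog(
                "demo_log", "2020-10-09 00:00:00.000", "/var/log/demo.log", "0",
                dimensions, measures, normalFields);

        // Decode the bytes again with the deserializer built from the same schema
        GenericRecord record = AvroDeserializerFactory.getLogsDeserializer().deserializing(bytes);
        System.out.println(record);
    }
}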
src/main/java/com/zorkdata/datamask/util/avro/AvroSerializerFactory.java
package com.zorkdata.datamask.util.avro;

/**
 * @author xiesen
 * @Description Serializer factory
 * @Email xiesen310@163.com
 * @Date 2020/6/28 9:32
 */
@SuppressWarnings("all")
public class AvroSerializerFactory {
    private static AvroSerializer metricMetadata = null;
    private static AvroSerializer logMetadata = null;

    public static AvroSerializer getLogAvroSerializer() {
        if (logMetadata == null) {
            logMetadata = new AvroSerializer(LogAvroMacroDef.metadata);
        }
        return logMetadata;
    }

    // public static AvroSerializer getMetricAvroSerializer() {
    //     if (metricMetadata == null) {
    //         metricMetadata = new AvroSerializer(MetricAvroMacroDef.metadata);
    //     }
    //     return metricMetadata;
    // }
}
src/main/java/com/zorkdata/datamask/util/avro/LogAvroMacroDef.java
package com.zorkdata.datamask.util.avro;

/**
 * @author xiesen
 * @Description Log set schema definition
 * @Email xiesen310@163.com
 * @Date 2020/6/28 9:33
 */
public class LogAvroMacroDef {
    public static String metadata = "{\n"
            + "  \"namespace\": \"com.zork.logs\",\n"
            + "  \"type\": \"record\",\n"
            + "  \"name\": \"logs\",\n"
            + "  \"fields\": [\n"
            + "    {\"name\": \"logTypeName\", \"type\": [\"string\", \"null\"]},\n"
            + "    {\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]},\n"
            + "    {\"name\": \"source\", \"type\": [\"string\", \"null\"]},\n"
            + "    {\"name\": \"offset\", \"type\": [\"string\", \"null\"]},\n"
            + "    {\"name\": \"dimensions\", \"type\": [\"null\", {\"type\": \"map\", \"values\": \"string\"}]},\n"
            + "    {\"name\": \"measures\", \"type\": [\"null\", {\"type\": \"map\", \"values\": \"double\"}]},\n"
            + "    {\"name\": \"normalFields\", \"type\": [\"null\", {\"type\": \"map\", \"values\": \"string\"}]}\n"
            + "  ]\n"
            + "}";
}
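As a quick sanity check (illustrative only, not part of this commit), the schema string can be parsed and its fields listed, which is essentially what getKeysFromjson() does in AvroSerializer and AvroDeserializer. The class name SchemaCheckSketch is hypothetical.

package com.zorkdata.datamask.util.avro;

import org.apache.avro.Schema;

public class SchemaCheckSketch {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(LogAvroMacroDef.metadata);
        // Expected field names: logTypeName, timestamp, source, offset, dimensions, measures, normalFields
        for (Schema.Field field : schema.getFields()) {
            System.out.println(field.name() + " : " + field.schema());
        }
    }
}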
src/main/resources/log4j.properties
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n