Commit cee08142 authored by 王海鹰

Merge branch 'master' into 'master'

Restructure the project to support automated packaging

See merge request !2
parents 7582bbaa 3b9a2c8b
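# CI pipeline: SonarQube push/scan, a manual Maven clean, and a tag-triggered release build.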
stages:
- sonar_push
- sonar_scan
- maven_clean
- build_release
before_script:
- 'echo "Build fold : $PWD"'
- 'echo "PROJECT NAME : $CI_PROJECT_NAME"'
- 'echo "PROJECT ID : $CI_PROJECT_ID"'
- 'echo "PROJECT URL : $CI_PROJECT_URL"'
- 'echo "ENVIRONMENT URL : $CI_ENVIRONMENT_URL"'
- 'export COMMIT_ID_SHORT=${CI_COMMIT_SHA:0:8}'
- 'export PATH=$PATH:/usr/bin'
sonar_push:
stage: sonar_push
only:
- master
when: manual
image: hub.zorkdata.com/devops/sonar_runner:00be5a71
allow_failure: true
script:
- FOLD_LIST=`bash /cicd-script/sonarqube/get_fold_list.sh "src"`
- 'bash /cicd-script/sonar_push.sh "-Dsonar.sources=`echo $FOLD_LIST`"'
sonar_scan:
stage: sonar_scan
only:
- merge_requests
- branches
- master
when: always
image: hub.zorkdata.com/devops/sonar_runner:00be5a71
allow_failure: false
script:
- FOLD_LIST=`bash /cicd-script/sonarqube/get_fold_list.sh "src"`
- 'bash /cicd-script/sonar_review.sh "-Dsonar.sources=`echo $FOLD_LIST`"'
maven_clean:
stage: maven_clean
when: manual
image: hub.zorkdata.com/devops/maven3.6-jdk1.8:9b09951e
allow_failure: true
script:
- 'bash /cicd-script/maven/maven_clean.sh'
build_release:
stage: build_release
only:
- tags
when: always
image: hub.zorkdata.com/devops/maven3.6-jdk1.8:9b09951e
allow_failure: false
script:
- 'bash /cicd-script/maven/maven_clean.sh'
- 'bash /cicd-script/maven/maven_deploy.sh'
- 'bash /cicd-script/maven/tag_release.sh transactionlogmask'
<!--
~ Copyright (c) 2020. log-merge
Packaging descriptor for the deployment tarball; do not modify the content below
-->
<assembly>
<id>bin</id>
<formats>
<format>tar.gz</format>
</formats>
<fileSets>
<!-- ============ Do not modify the content below ============ -->
<fileSet>
<directory>target/transaction-log-deploy/bin</directory>
<includes>
<include>*.sh</include>
</includes>
<outputDirectory>bin</outputDirectory>
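<!-- 0755 keeps the shell scripts executable after the tar.gz archive is extracted -->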
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>target/transaction-log-deploy/conf</directory>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>target/transaction-log-deploy/lib</directory>
<outputDirectory>lib</outputDirectory>
</fileSet>
</fileSets>
</assembly>
\ No newline at end of file
......@@ -17,284 +17,408 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zorkdata.datamask</groupId>
<artifactId>transactionLogMask</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<groupId>com.zorkdata.datamask</groupId>
<artifactId>transactionLogMask</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Guotai Transaction Log Mask Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.8.1</flink.version>
<!-- <flink.version>1.11.2</flink.version>-->
<java.version>1.8</java.version>
<kafka.version>0.8</kafka.version>
<hadoop.version>2.6.5</hadoop.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<name>Guotai Transaction Log Mask Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.8.1</flink.version>
<java.version>1.8</java.version>
<kafka.version>0.8</kafka.version>
<hadoop.version>2.6.0</hadoop.version>
<avro.version>1.8.2</avro.version>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<repositories>
<!-- Using Local Nexus Maven Repository -->
<repository>
<id>nexus</id>
<name>Nexus Repository</name>
<url>http://nexus.zorkdata.com/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>oss</id>
<name>oss</name>
<url>https://oss.sonatype.org/content/groups/public</url>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>nexus</id>
<name>Nexus Repository</name>
<url>http://nexus.zorkdata.com/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.6.5-10.0</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<distributionManagement>
<repository>
<id>releases</id>
<name>Internal Releases</name>
<url>http://nexus.zorkdata.com/repository/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>Internal Snapshots</name>
<url>http://nexus.zorkdata.com/repository/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-filesystem -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.12</artifactId>
<version>1.8.3</version>
</dependency>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.6.5-10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
-->
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-tools -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>1.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<!-- <scope>runtime</scope>-->
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<!-- <scope>runtime</scope>-->
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.16</version>
</dependency>
</dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.zorkdata.datamask.TransactionLogMask</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Resource assembly: copy config, scripts, and the job jar into the deployment layout -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>2.6</version>
<executions>
<execution>
<id>copy-config</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
<outputDirectory>${basedir}/target/transaction-log-deploy/conf
</outputDirectory>
<resources>
<resource>
<directory>${basedir}/target/classes</directory>
<excludes>
<exclude>**/*.java</exclude>
<exclude>**/*.class</exclude>
<exclude>**/*.bat</exclude>
<exclude>**/*.sh</exclude>
<exclude>**/*.conf</exclude>
</excludes>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-script</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<encoding>UTF-8</encoding>
<outputDirectory>${basedir}/target/transaction-log-deploy/bin
</outputDirectory>
<resources>
<resource>
<directory>${basedir}/target/classes/bin</directory>
<filtering>true</filtering>
<includes>
<include>*.bat</include>
<include>*.sh</include>
<include>*.py</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-library</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/transaction-log-deploy/lib
</outputDirectory>
<resources>
<resource>
<directory>${basedir}/target</directory>
<includes>
<include>${project.artifactId}-${project.version}.jar
</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<!-- Packaging plugin (builds the tar.gz from assemblies/assembly.xml) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>assemblies/assembly.xml</descriptor>
</descriptors>
<finalName>transactionlogmask</finalName>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
<!-- Static code analysis (Alibaba P3C PMD rulesets) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<rulesets>
<ruleset>rulesets/java/ali-comment.xml</ruleset>
<ruleset>rulesets/java/ali-concurrent.xml</ruleset>
<ruleset>rulesets/java/ali-constant.xml</ruleset>
<ruleset>rulesets/java/ali-exception.xml</ruleset>
<ruleset>rulesets/java/ali-flowcontrol.xml</ruleset>
<ruleset>rulesets/java/ali-naming.xml</ruleset>
<ruleset>rulesets/java/ali-oop.xml</ruleset>
<ruleset>rulesets/java/ali-orm.xml</ruleset>
<ruleset>rulesets/java/ali-other.xml</ruleset>
<ruleset>rulesets/java/ali-set.xml</ruleset>
<ruleset>rulesets/vm/ali-other.xml</ruleset>
</rulesets>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.zorkdata.datamask.TransactionLogMask</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<printFailingErrors>true</printFailingErrors>
<excludeFromFailureFile>exclude-pmd.properties</excludeFromFailureFile>
</configuration>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.alibaba.p3c</groupId>
<artifactId>p3c-pmd</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.13.0</version>
<type>maven-plugin</type>
</dependency>
</dependencies>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- &lt;!&ndash; This improves the out-of-the-box experience in Eclipse by resolving some warnings. &ndash;&gt;-->
<!-- <plugin>-->
<!-- <groupId>org.eclipse.m2e</groupId>-->
<!-- <artifactId>lifecycle-mapping</artifactId>-->
<!-- <version>1.0.0</version>-->
<!-- <configuration>-->
<!-- <lifecycleMappingMetadata>-->
<!-- <pluginExecutions>-->
<!-- <pluginExecution>-->
<!-- <pluginExecutionFilter>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-shade-plugin</artifactId>-->
<!-- <versionRange>[3.0.0,)</versionRange>-->
<!-- <goals>-->
<!-- <goal>shade</goal>-->
<!-- </goals>-->
<!-- </pluginExecutionFilter>-->
<!-- <action>-->
<!-- <ignore/>-->
<!-- </action>-->
<!-- </pluginExecution>-->
<!-- <pluginExecution>-->
<!-- <pluginExecutionFilter>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
<!-- <versionRange>[3.1,)</versionRange>-->
<!-- <goals>-->
<!-- <goal>testCompile</goal>-->
<!-- <goal>compile</goal>-->
<!-- </goals>-->
<!-- </pluginExecutionFilter>-->
<!-- <action>-->
<!-- <ignore/>-->
<!-- </action>-->
<!-- </pluginExecution>-->
<!-- </pluginExecutions>-->
<!-- </lifecycleMappingMetadata>-->
<!-- </configuration>-->
<!-- </plugin>-->
</plugins>
</pluginManagement>
</build>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
\ No newline at end of file
package com.zorkdata.datamask;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.constants.Constants;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.MaskUtil;
import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.hadoop.HadoopMask;
import com.zorkdata.datamask.kafka.KafkaMask;
import com.zorkdata.datamask.util.ZorkParameterUtil;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.*;
/**
......@@ -53,238 +17,25 @@ import java.util.*;
* Date : Create in 2020/9/18 17:35
*/
public class TransactionLogMask {
private static Logger LOG = LoggerFactory.getLogger(TransactionLogMask.class);
private static String source = "hdfs";
private static String hdfsSrc;
private static String hdfsDest;
private static String core;
private static String date;
private static Long startTime;
private static Long endTime;
private static String namePattern = "";
private static String mobilePattern = "";
private static String phonePattern = "";
private static String emailPattern = "";
private static Logger logger = LoggerFactory.getLogger(TransactionLogMask.class);
public static final int PARAM_LENGTH = 2;
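// The job is launched with exactly two arguments: the literal "--conf" followed by the path to the YAML configuration.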
public static void main(String[] args) throws Exception {
if ("hdfs".equals(source)) {
maskHdfsLog(args);
} else if ("kafka".equals(source)) {
maskKafkaMsg(args);
}
}
/**
* Initialize the static configuration fields from the parsed config map.
*
* @param conf the parsed configuration map
*/
private static void initConf(Map conf) {
source = String.valueOf(conf.get(Constants.SOURCE)).trim();
hdfsSrc = String.valueOf(conf.get(Constants.HDFS_SRC)).trim();
hdfsDest = String.valueOf(conf.get(Constants.HDFS_DEST)).trim();
core = String.valueOf(conf.get(Constants.CORE)).trim();
date = String.valueOf(conf.get(Constants.DATE)).trim();
startTime = Long.parseLong(String.valueOf(conf.get(Constants.START_TIME)).trim());
endTime = Long.parseLong(String.valueOf(conf.get(Constants.END_TIME)).trim());
}
/**
* Mask HDFS log files.
*
* @param args command-line arguments
*/
public static void maskHdfsLog(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
JobConf jobConf = new JobConf();
jobConf.set("avro.output.schema", TransactionLog.SCHEMA$.toString(true));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Map<String, String> conf = ZorkParameterUtil.readParameter(args);
LOG.info("配置文件: " + conf);
initConf(conf);
ParameterTool parameterTool = ParameterTool.fromMap(conf);
env.getConfig().setGlobalJobParameters(parameterTool);
List<String> logFiles = filterHdfsLogFiles(hdfsSrc, date, startTime, endTime);
for (String logFile : logFiles) {
/**
* Read the HDFS log file and deserialize it from Avro
*/
HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<Object, Object>
(new AvroInputFormat(), Object.class, Object.class, jobConf);
AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
DataSource<Tuple2<Object, Object>> hdfsLogInput = env.createInput(hadoopInputFormat);
// hdfsLogInput.print();
/**
* Masking operator
*/
FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator = hdfsLogInput.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
LogData logData = JSON.parseObject(value.getField(0).toString(), new TypeReference<LogData>() {
});
// Filter by the log event's core dimension (hostname is matched against the hardcoded substring "c9")
if (null != core && logData.getDimensions().get("hostname").indexOf("c9") > -1) {
// Filter by the log event's timestamp (the trailing '|| Boolean.TRUE' currently makes this check always pass)
Long timestamp = utc2timestamp(logData.getTimestamp());
if (null != timestamp && timestamp > startTime && timestamp < endTime || Boolean.TRUE) {
Map maskResult = MaskUtil.mask(logData.getNormalFields());
logData.setNormalFields(maskResult);
collector.collect(logData);
}
}
}
});
// maskFlatMapOperator.print();
// Build the output path on the destination HDFS
String logFileName = logFile.split("/")[logFile.split("/").length - 1];
String filePath = hdfsDest + logFileName.replace(".avro", "");
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
/**
* Avro serialization operator
*/
maskFlatMapOperator.map(new MapFunction<Object, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
@Override
public Tuple2<AvroWrapper<LogData>, NullWritable> map(Object value) throws Exception {
AvroKey<LogData> key = new AvroKey<LogData>((LogData) value);
Tuple2<AvroWrapper<LogData>, NullWritable> tupple = new Tuple2<AvroWrapper<LogData>, NullWritable>(key, NullWritable.get());
return tupple;
}
}).output(hadoopOutputFormat);
try {
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* Mask Kafka message data.
*
* @param args command-line arguments
*/
public static void maskKafkaMsg(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
ParameterTool params = ParameterTool.fromArgs(args);
String servers = params.get("servers");
String zookeeper = params.get("zookeeper");
String topic = params.get("topic");
String hdfsDest = params.get("hdfs-dest");
String core = params.get("core", "c1");
String date = params.get("date", sdf.format(new Date()));
String startTime = params.get("startTime");
String endTime = params.get("endTime");
Properties props = new Properties();
props.put("bootstrap.servers", servers);
props.put("zookeeper.connect", zookeeper);
props.put("group.id", "group1");
props.put("enable.auto.commit", false);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", 1000);
SingleOutputStreamOperator<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props)).setParallelism(1);
// TODO filter by date, startTime, and endTime
BucketingSink<String> hdfsSink = new BucketingSink<>(hdfsDest);
// Bucketer that creates one directory per time unit; the default is yyyy-MM-dd--HH in US time. Changed here to one directory per day, Shanghai time
hdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")));
// Maximum size of each file; the default is 384 MB (1024 * 1024 * 384)
hdfsSink.setBatchSize(1024 * 1024 * 384);
// Roll over to a new file after this time interval
hdfsSink.setBatchRolloverInterval(1000 * 60 * 60);
hdfsSink.setPendingSuffix("ccc");
hdfsSink.setInactiveBucketThreshold(60 * 1000L);
hdfsSink.setInactiveBucketCheckInterval(60 * 1000L);
hdfsSink.setAsyncTimeout(60 * 1000);
dataStreamSource.addSink(hdfsSink);
try {
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
e.printStackTrace();
if (args.length != PARAM_LENGTH) {
String error = "参数缺失,请输入配置文件,例如: --conf /opt/TransactionLogMask/application.yml";
logger.error(error);
throw new RuntimeException(error);
}
}
Map<String, String> conf = ZorkParameterUtil.readParameter(args);
logger.info("配置文件: {}", JSON.toJSONString(conf));
String source = conf.get(ParamConstants.SOURCE);
/**
* Filter HDFS log files.
*
* @param hdfs HDFS base path
* @param date date (directory name)
* @param startTime start of the time window
* @param endTime end of the time window
* @return list of HDFS file paths
*/
private static List<String> filterHdfsLogFiles(String hdfs, String date, Long startTime, Long endTime) {
if (!hdfs.endsWith("/")) {
hdfs += "/";
}
String path = hdfs;
if (null != date) {
path = hdfs + date;
if (ParamConstants.HDFS.equals(source)) {
HadoopMask.maskHdfsLog(conf);
} else if (ParamConstants.KAFKA.equals(source)) {
KafkaMask.maskKafkaMsg(conf);
}
Configuration conf = new Configuration();
List<String> logFiles = new ArrayList<>();
try {
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(new URI("hdfs://cdh-2:8020/"), conf, "hdfs");
} catch (InterruptedException e) {
e.printStackTrace();
}
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path(path), false);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
long modificationTime = next.getModificationTime();
// Filter by file modification time to keep only files within the requested time window
if (modificationTime > startTime && modificationTime < endTime) {
Path hdfsFilePath = next.getPath();
logFiles.add(hdfsFilePath.toString());
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
return logFiles;
}
/**
* Convert a UTC time string to a Unix timestamp.
*
* @param utcTime UTC time string
* @return Unix timestamp in milliseconds
*/
public static Long utc2timestamp(String utcTime) {
SimpleDateFormat utcFormater = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
utcFormater.setTimeZone(TimeZone.getTimeZone("asia/shanghai"));//时区定义并进行时间获取
Date gpsUTCDate = null;
try {
gpsUTCDate = utcFormater.parse(utcTime);
} catch (ParseException e) {
System.out.println("时间戳格式转换异常:" + utcTime + e.getMessage());
return null;
}
return gpsUTCDate.getTime();
}
}
package com.zorkdata.datamask.constants;
package com.zorkdata.datamask.constant;
import java.util.Date;
/**
* Description :
......@@ -6,7 +8,8 @@ package com.zorkdata.datamask.constants;
* @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
* Date : Create in 2020/10/20 15:32
*/
public interface Constants {
public interface ParamConstants {
String SOURCE = "source";
String HDFS_SRC = "hdfs_src";
String HDFS_DEST = "hdfs_dest";
......@@ -15,6 +18,12 @@ public interface Constants {
String START_TIME = "start_time";
String END_TIME = "end_time";
String SERVERS = "servers";
String ZOOKEEPER = "zookeeper";
String TOPIC ="topic";
String HDFS ="hdfs";
String KAFKA ="kafka";
String NAME_REG_EXP = "name_reg_exp";
String MOBILE_REG_EXP = "mobile_reg_exp";
String PHONE_REG_EXP = "phone_reg_exp";
......
package com.zorkdata.datamask.constant;
/**
* @author 谢森
* @Description Constant definitions
* @Email xiesen310@163.com
* @Date 2020/10/21 15:50
*/
public interface StrConstants {
String FILE_SEPARATOR = "/";
String AVRO_SUFFIX = ".avro";
String EMPTY_STR = "";
}
package com.zorkdata.datamask.domain;
import lombok.Data;
/**
* @author 谢森
* @Description Parameter entity class
* @Email xiesen310@163.com
* @Date 2020/10/21 14:33
*/
@Data
public class HadoopParam {
private String source;
private String hdfsSrc;
private String hdfsDest;
private String core;
private String date;
private Long startTime;
private Long endTime;
public HadoopParam(String source, String hdfsSrc, String hdfsDest, String core, String date, Long startTime,
Long endTime) {
this.source = source;
this.hdfsSrc = hdfsSrc;
this.hdfsDest = hdfsDest;
this.core = core;
this.date = date;
this.startTime = startTime;
this.endTime = endTime;
}
}
package com.zorkdata.datamask.domain;
import lombok.Data;
/**
* @author 谢森
* @Description Kafka parameter entity class
* @Email xiesen310@163.com
* @Date 2020/10/21 15:07
*/
@Data
public class KafkaParam {
private String servers;
private String zookeeper;
private String topic;
private String hdfsDest;
private String core;
private String date;
private Long startTime;
private Long endTime;
public KafkaParam(String servers, String zookeeper, String topic, String hdfsDest, String core, String date,
Long startTime, Long endTime) {
this.servers = servers;
this.zookeeper = zookeeper;
this.topic = topic;
this.hdfsDest = hdfsDest;
this.core = core;
this.date = date;
this.startTime = startTime;
this.endTime = endTime;
}
}
package com.zorkdata.datamask.hadoop;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zorkdata.datamask.constant.StrConstants;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.HadoopParam;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.DateUtils;
import com.zorkdata.datamask.util.MaskUtil;
import com.zorkdata.datamask.util.ParamUtils;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author 谢森
* @Description Masking of HDFS (Hadoop) log files
* @Email xiesen310@163.com
* @Date 2020/10/21 14:29
*/
public class HadoopMask {
private static final Logger logger = LoggerFactory.getLogger(HadoopMask.class);
/**
* Mask HDFS log files.
*
* @param conf job configuration map
*/
public static void maskHdfsLog(Map<String, String> conf) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
JobConf jobConf = new JobConf();
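// AvroOutputFormat reads the writer schema from the "avro.output.schema" job property; here it is the generated TransactionLog schema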
jobConf.set("avro.output.schema", TransactionLog.SCHEMA$.toString(true));
HadoopParam hadoopParam = ParamUtils.initHadoopConf(conf);
ParameterTool parameterTool = ParameterTool.fromMap(conf);
env.getConfig().setGlobalJobParameters(parameterTool);
List<String> logFiles = filterHdfsLogFiles(hadoopParam.getHdfsSrc(), hadoopParam.getDate(),
hadoopParam.getStartTime(), hadoopParam.getEndTime());
for (String logFile : logFiles) {
/**
* Read the HDFS log file and deserialize it from Avro
*/
HadoopInputFormat<Object, Object> hadoopInputFormat = new HadoopInputFormat<Object, Object>
(new AvroInputFormat(), Object.class, Object.class, jobConf);
AvroInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(logFile));
DataSource<Tuple2<Object, Object>> hdfsLogInput = env.createInput(hadoopInputFormat);
/**
* Masking operator
*/
FlatMapOperator<Tuple2<Object, Object>, Object> maskFlatMapOperator =
hdfsLogInput.flatMap(new FlatMapFunction<Tuple2<Object, Object>, Object>() {
@Override
public void flatMap(Tuple2<Object, Object> value, Collector<Object> collector) throws Exception {
LogData logData = JSON.parseObject(value.getField(0).toString(),
new TypeReference<LogData>() {
});
// Filter by the log event's core dimension (hostname is matched against the hardcoded substring "c9")
if (null != hadoopParam.getCore() && logData.getDimensions().get("hostname").indexOf("c9") > -1) {
// Filter by the log event's timestamp (the trailing '|| Boolean.TRUE' currently makes this check always pass)
Long timestamp = DateUtils.utc2timestamp(logData.getTimestamp());
boolean flag = null != timestamp && timestamp > hadoopParam.getStartTime()
&& timestamp < hadoopParam.getEndTime() || Boolean.TRUE;
if (flag) {
Map maskResult = MaskUtil.mask(logData.getNormalFields());
logData.setNormalFields(maskResult);
collector.collect(logData);
}
}
}
});
// Build the output path on the destination HDFS
String logFileName =
logFile.split(StrConstants.FILE_SEPARATOR)[logFile.split(StrConstants.FILE_SEPARATOR).length - 1];
String filePath = hadoopParam.getHdfsDest() + logFileName.replace(StrConstants.AVRO_SUFFIX,
StrConstants.EMPTY_STR);
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConf);
FileOutputFormat.setOutputPath(jobConf, new Path(filePath));
/**
* Avro serialization operator
*/
maskFlatMapOperator.map(new MapFunction<Object, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
@Override
public Tuple2<AvroWrapper<LogData>, NullWritable> map(Object value) throws Exception {
AvroKey<LogData> key = new AvroKey<LogData>((LogData) value);
Tuple2<AvroWrapper<LogData>, NullWritable> tupple = new Tuple2<AvroWrapper<LogData>,
NullWritable>(key, NullWritable.get());
return tupple;
}
}).output(hadoopOutputFormat);
try {
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* Filter HDFS log files.
*
* @param hdfs HDFS base path
* @param date date (directory name)
* @param startTime start of the time window
* @param endTime end of the time window
* @return list of HDFS file paths
*/
private static List<String> filterHdfsLogFiles(String hdfs, String date, Long startTime, Long endTime) {
if (!hdfs.endsWith(StrConstants.FILE_SEPARATOR)) {
hdfs += StrConstants.FILE_SEPARATOR;
}
String path = hdfs;
if (null != date) {
path = hdfs + date;
}
Configuration conf = new Configuration();
List<String> logFiles = new ArrayList<>();
try {
FileSystem fileSystem = null;
try {
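// NOTE: the HDFS namenode address and the "hdfs" user are hardcoded here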
fileSystem = FileSystem.get(new URI("hdfs://cdh-2:8020/"), conf, "hdfs");
} catch (InterruptedException e) {
e.printStackTrace();
}
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path(path),
false);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
long modificationTime = next.getModificationTime();
// Filter by file modification time to keep only files within the requested time window
if (modificationTime > startTime && modificationTime < endTime) {
Path hdfsFilePath = next.getPath();
logFiles.add(hdfsFilePath.toString());
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
return logFiles;
}
}
package com.zorkdata.datamask.kafka;
import com.zorkdata.datamask.domain.HadoopParam;
import com.zorkdata.datamask.domain.KafkaParam;
import com.zorkdata.datamask.util.ParamUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
/**
* @author 谢森
* @Description Kafka message masking
* @Email xiesen310@163.com
* @Date 2020/10/21 14:51
*/
public class KafkaMask {
/**
* Mask Kafka message data.
*
* @param conf job configuration map
*/
public static void maskKafkaMsg(Map<String, String> conf) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
KafkaParam kafkaParam = ParamUtils.initKafkaConf(conf);
ParameterTool parameterTool = ParameterTool.fromMap(conf);
env.getConfig().setGlobalJobParameters(parameterTool);
Properties props = new Properties();
props.put("bootstrap.servers", kafkaParam.getServers());
props.put("zookeeper.connect", kafkaParam.getZookeeper());
props.put("group.id", "group1");
props.put("enable.auto.commit", false);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", 1000);
SingleOutputStreamOperator<String> dataStreamSource =
env.addSource(new FlinkKafkaConsumer<>(kafkaParam.getTopic(),
new SimpleStringSchema(), props)).setParallelism(1);
// TODO filter by date, startTime, and endTime
BucketingSink<String> hdfsSink = new BucketingSink<>(kafkaParam.getHdfsDest());
// Bucketer that creates one directory per time unit; the default is yyyy-MM-dd--HH in US time. Changed here to one directory per day, Shanghai time
hdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")));
// Maximum size of each file; the default is 384 MB (1024 * 1024 * 384)
hdfsSink.setBatchSize(1024 * 1024 * 384);
// Roll over to a new file after this time interval
hdfsSink.setBatchRolloverInterval(1000 * 60 * 60);
hdfsSink.setPendingSuffix("ccc");
hdfsSink.setInactiveBucketThreshold(60 * 1000L);
hdfsSink.setInactiveBucketCheckInterval(60 * 1000L);
hdfsSink.setAsyncTimeout(60 * 1000);
dataStreamSource.addSink(hdfsSink);
try {
env.execute("国泰交易日志脱敏job");
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.zorkdata.datamask.util;
import com.alibaba.fastjson.JSON;
import com.zorkdata.datamask.domain.LogData;
import com.zorkdata.datamask.domain.TransactionLog;
import com.zorkdata.datamask.util.avro.AvroDeserializer;
import com.zorkdata.datamask.util.avro.AvroDeserializerFactory;
import com.zorkdata.datamask.util.avro.AvroSerializerFactory;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import java.io.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.TimeZone;
/**
* Description :
* https://www.cnblogs.com/fillPv/p/5009737.html
* @author : wanghaiying (<a href="wanghaiying@zorkdata.com.cn">wanghaiying@zorkdata.com.cn</a>)
* Date : Create in 2020/9/23 4:43
*/
// java -jar avro-tools-1.10.0.jar compile schema log.avro .
//
public class AvroTest {
public static void main(String[] args) throws IOException {
// Avro serialization: write an avro file
// TransactionLog transactionLog = new TransactionLog();
// LogData transactionLog = new LogData();
// transactionLog.setLogTypeName("kcbp_biz_log");
// transactionLog.setTimestamp("2020-09-18T13:59:53.000+08:00");
// transactionLog.setSource("d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log");
// transactionLog.setOffset("165683111");
//
// HashMap dimensions = new HashMap() {{
// put("appsystem", "jzjy");
// put("appprogramname", "jzc9-kcbp1_9600");
// put("hostname", "jzc9-kcbp1");
// put("func", "");
// put("nodeid", "");
// put("operway", "W");
// }};
//// transactionLog.setDimensions(dimensions);
//
// HashMap measures = new HashMap<String, Double>() {{
// put("latence", 0.0);
// put("latency", 1.0);
// put("spendtime", 0.5);
// }};
//// transactionLog.setMeasures(measures);
//
// HashMap normalFields = new HashMap() {{
// put("indexTime", "2020-09-18T13:59:54.524+08:00");
// put("bsflag", "");
// put("productcode", "");
// put("developercode", "");
// put("fmillsecond", "");
// put("inputtype", "");
// put("logchecktime", "");
// put("message", "身份证号码:372925199008075158,地址:上海浦东新区张江高科碧波路690号,手机号:15000101879,邮箱:wanghaiying@zork.com.cn");
// put("end_logtime", "");
// put("smillsecond", "585606599");
// put("featurecode", "");
// put("orgid", "");
// put("authcode", "");
// put("collecttime", "2020-09-18T13:59:53.529+08:00");
// put("fundid", "");
// put("deserializerTime", "2020-09-18T13:59:53.671+08:00");
// put("messid", "0000011404342B32233DDCDA");
// put("custid", "");
// put("netputr", "");
// put("versioninfo", "");
// put("beg_logtime", "20200918-135953");
// put("authinfo", "");
// }};
// transactionLog.setNormalFields(normalFields);
// String path = "d:\\transactionlog-20200925.avro"; // avro文件存放目录
// DatumWriter<TransactionLog> logDatumWriter = new SpecificDatumWriter<>(TransactionLog.class);
// DataFileWriter<TransactionLog> dataFileWriter = new DataFileWriter<>(logDatumWriter);
// dataFileWriter.create(transactionLog.getSchema(), new File(path));
// // Write the generated objects into the avro file
// dataFileWriter.append(transactionLog);
// dataFileWriter.append(transactionLog);
// dataFileWriter.append(transactionLog);
// dataFileWriter.close();
/**
* Serialization
*/
// byte[] kcbp_biz_logs = AvroSerializerFactory.getLogAvroSerializer().serializingLog("kcbp_biz_log", "2020-09-18T13:59:53.000+08:00",
// "d:\\\\kcbp\\\\log\\\\run\\\\20200918\\\\runlog23.log", "165683111", dimensions, measures, normalFields);
// FileOutputStream fos = null;
// try {
// fos = new FileOutputStream("d:\\transactionlog-20201009.avro");
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// }
// try {
// fos.write(kcbp_biz_logs,0,kcbp_biz_logs.length);
// fos.flush();
// fos.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
/**
* Deserialization
*/
// File file = new File("d:\\part-0-0.avro");
// File file = new File("d:\\hdfs-transactionlog-20200929.avro");
// File file = new File("d:\\transactionlog-20201009.avro");
// byte[] byteBuffer = new byte[(int) file.length()];
//
// FileInputStream fileInputStream = null;
// try {
//// fileInputStream = new FileInputStream("d:\\part-0-0.avro");
// fileInputStream = new FileInputStream("d:\\transactionlog-20201009.avro");
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// }
// try {
// fileInputStream.read(byteBuffer);
// } catch (IOException e) {
// e.printStackTrace();
// }
// GenericRecord genericRecord = AvroDeserializerFactory.getLogsDeserializer().deserializing(byteBuffer);
// System.out.println(genericRecord);
// Read the avro file and deserialize it
// DatumReader<LogData> reader = new SpecificDatumReader<LogData>(LogData.class);
//// DataFileReader<LogData> dataFileReader = new DataFileReader<LogData>(new File("d:\\part-0-0.avro"), reader);
// DataFileReader<LogData> dataFileReader = new DataFileReader<LogData>(new File("d:\\transactionlog-20201009.avro"), reader);
// LogData transactionLogRead = null;
// while (dataFileReader.hasNext()) {
// transactionLogRead = dataFileReader.next();
// System.out.println(transactionLogRead);
// }
Schema schema = new Schema.Parser().parse(new File("d:\\log.avro"));
GenericRecord emp = new GenericData.Record(schema);
File file = new File("d:\\1 (1).avro");
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
while (dataFileReader.hasNext())
{
emp = dataFileReader.next();
System.out.println(emp);
}
// Long aLong = utc2Local("2020-09-29T09:36:20.626+08:00");
// System.out.println(aLong);
}
public static Long utc2Local(String utcTime) {
SimpleDateFormat utcFormater = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));//时区定义并进行时间获取
Date gpsUTCDate = null;
try {
gpsUTCDate = utcFormater.parse(utcTime);
} catch (ParseException e) {
System.out.println("时间戳格式转换异常:" + utcTime + e.getMessage());
}
return gpsUTCDate.getTime();
}
}
......@@ -6,9 +6,11 @@ package com.zorkdata.datamask.util;
* @Email xiesen@zork.com.cn
*/
public class ConfigUtils {
public static final String EMPTY_STR = "";
public static final String NULL_STR = "null";
public static String getString(String value, String defaultValue) {
String result = value == null || value.equals("") || value.equals("null") ? defaultValue : value;
String result = value == null || EMPTY_STR.equals(value) || NULL_STR.equals(value) ? defaultValue : value;
return result;
}
......
package com.zorkdata.datamask.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
/**
* @author 谢森
* @Description Date/time utility class
* @Email xiesen310@163.com
* @Date 2020/10/21 14:39
*/
public class DateUtils {
public static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
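// NOTE: SimpleDateFormat is not thread-safe; this shared static formatter assumes utc2timestamp is not called concurrently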
private static SimpleDateFormat utcFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
/**
* Convert a UTC time string to a Unix timestamp.
*
* @param utcTime UTC time string
* @return Unix timestamp in milliseconds
*/
public static Long utc2timestamp(String utcTime) {
// Set the parser's time zone ("Asia/Shanghai" is the valid IANA id)
utcFormatter.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
Date gpsUtcDate = null;
try {
gpsUtcDate = utcFormatter.parse(utcTime);
} catch (ParseException e) {
logger.error("时间戳格式转换异常:{} 原因: {}", utcTime, e.getMessage());
return null;
}
return gpsUtcDate.getTime();
}
}
......@@ -19,6 +19,7 @@ import java.util.*;
public class LoadConf {
private static final Logger LOG = LoggerFactory.getLogger(com.zorkdata.datamask.util.LoadConf.class);
public static final int DEFAULT_MAP_CAPACITY = 16;
public LoadConf() {
}
......@@ -62,7 +63,7 @@ public class LoadConf {
throw new RuntimeException("Could not find config file on classpath " + name);
}
} else {
HashMap var19 = new HashMap();
HashMap var19 = new HashMap(DEFAULT_MAP_CAPACITY);
return var19;
}
} catch (IOException var17) {
......@@ -100,11 +101,11 @@ public class LoadConf {
return getConfigFileInputStream(configFilePath, true);
}
public static Map LoadYaml(String confPath) {
public static Map loadYaml(String confPath) {
return findAndReadYaml(confPath, true, true);
}
public static Map LoadProperty(String prop) {
public static Map loadProperty(String prop) {
InputStream in = null;
Properties properties = new Properties();
......@@ -125,8 +126,10 @@ public class LoadConf {
}
}
Map ret = new HashMap();
Map ret = new HashMap(DEFAULT_MAP_CAPACITY);
ret.putAll(properties);
return ret;
}
}
......@@ -15,55 +15,50 @@ import java.util.regex.Pattern;
* RegularExpression
*/
public class MaskUtil {
public static final int DEFAULT_MAP_CAPACITY = 16;
private MaskRegexConfig maskRegexConfig;
/**
* Name regex
*/
static Pattern namePattern = Pattern.compile("([\\u4e00-\\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20})");
// Pattern namePattern = Pattern.compile(maskRegexConfig.getNameRegExp());
/**
* Mobile number regex
*/
static Pattern mobilePattern = Pattern.compile("((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}");
// Pattern mobilePattern = Pattern.compile(maskRegexConfig.getMobileRegExp());
static Pattern mobilePattern = Pattern.compile("((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))" +
"\\d{8}");
/**
* Landline number regex
*/
static Pattern phonePattern = Pattern.compile("(\\d{3,4}-)?\\d{6,8}");
// Pattern phonePattern = Pattern.compile(maskRegexConfig.getPhoneRegExp());
/**
* Email address regex
*/
static Pattern emailPattern = Pattern.compile("\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*");
// Pattern emailPattern = Pattern.compile(maskRegexConfig.getEmailRegExp());
/**
* ID card number (15-digit) regex
*/
static Pattern idPattern15 = Pattern.compile("[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}");
// Pattern idPattern15 = Pattern.compile(maskRegexConfig.getIdRegExp15());
/**
* ID card number (18-digit) regex
*/
static Pattern idPattern18 = Pattern.compile("[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])");
// Pattern idPattern18 = Pattern.compile(maskRegexConfig.getIdRegExp18());
static Pattern idPattern18 = Pattern.compile("[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}" +
"([0-9Xx])");
/**
* Home address regex
*/
static Pattern addressPattern = Pattern.compile("([\\u4E00-\\u9FA5A-Za-z0-9_]+(省|市|区|县|道|路|街|号|弄|条|室)){2,}");
// Pattern addressPattern = Pattern.compile(maskRegexConfig.getAddressRegExp());
/**
* IP address regex
* // static Pattern ipPattern = Pattern.compile("^((\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])\\.){3}
* // (\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])$");
*/
// static Pattern ipPattern = Pattern.compile("^((\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])\\.){3}(\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])$");
static Pattern ipPattern = Pattern.compile("((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)");
// Pattern ipPattern = Pattern.compile(maskRegexConfig.getIpRegExp());
static Pattern ipPattern = Pattern.compile("((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}" +
"(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)");
/**
* MAC address regex
*/
static Pattern macPattern = Pattern.compile("([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}");
// Pattern macPattern = Pattern.compile(maskRegexConfig.getMacRegExp());
static List<Pattern> patterns = new ArrayList<Pattern>() {{
add(macPattern);
......@@ -87,7 +82,6 @@ public class MaskUtil {
for (int i = 0; i < matcher.group().length(); i++) {
replaceStr = replaceStr.concat("*");
}
// System.out.println(replaceStr);
value = value.replace(matcher.group(), replaceStr);
}
}
......@@ -99,7 +93,7 @@ public class MaskUtil {
public static void main(String[] args) {
MaskUtil maskUtil = new MaskUtil();
Map map = new HashMap();
Map map = new HashMap(DEFAULT_MAP_CAPACITY);
map.put("姓名", "王海鹰");
map.put("身份证号", "372925199008075158");
map.put("手机号", "15000101879");
......@@ -112,23 +106,5 @@ public class MaskUtil {
map.put("message", "王海鹰,372925199008075158#15000101879");
map.put("messid", "0000011404342B32233DDCDA");
System.out.println(maskUtil.mask(map));
// String mobile = "15000101879";
//
// Pattern pattern = Pattern.compile("(13\\d|14[579]|15[^4\\D]|17[^49\\D]|18\\d)\\d{8}");
// Matcher m = pattern.matcher(mobile);
////
// System.out.println(m.matches());
// if(m.matches()){
// System.out.println(m.group());
//
// String replaceStr = "";
// for(int i=0; i < m.group().length(); i++){
// replaceStr = replaceStr.concat("*");
// }
// System.out.println(replaceStr);
// mobile = mobile.replaceAll(m.group(), replaceStr);
// System.out.println(mobile);
// }
}
}
package com.zorkdata.datamask.util;
import com.zorkdata.datamask.constant.ParamConstants;
import com.zorkdata.datamask.domain.HadoopParam;
import com.zorkdata.datamask.domain.KafkaParam;
import java.util.Map;
/**
* @author 谢森
* @Description Parameter utility class
* @Email xiesen310@163.com
* @Date 2020/10/21 14:42
*/
public class ParamUtils {
/**
* Build Hadoop job parameters from the configuration map.
*
* @param conf the parsed configuration map
*/
public static HadoopParam initHadoopConf(Map conf) {
String source = String.valueOf(conf.get(ParamConstants.SOURCE)).trim();
String hdfsSrc = String.valueOf(conf.get(ParamConstants.HDFS_SRC)).trim();
String hdfsDest = String.valueOf(conf.get(ParamConstants.HDFS_DEST)).trim();
String core = String.valueOf(conf.get(ParamConstants.CORE)).trim();
String date = String.valueOf(conf.get(ParamConstants.DATE)).trim();
Long startTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.START_TIME)).trim());
Long endTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.END_TIME)).trim());
return new HadoopParam(source, hdfsSrc, hdfsDest, core, date, startTime, endTime);
}
public static KafkaParam initKafkaConf(Map conf) {
String servers = String.valueOf(conf.get(ParamConstants.SERVERS)).trim();
String zookeeper = String.valueOf(conf.get(ParamConstants.ZOOKEEPER)).trim();
String topic = String.valueOf(conf.get(ParamConstants.TOPIC)).trim();
String hdfsDest = String.valueOf(conf.get(ParamConstants.HDFS_DEST)).trim();
String core = String.valueOf(conf.get(ParamConstants.CORE)).trim();
String date = String.valueOf(conf.get(ParamConstants.DATE)).trim();
Long startTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.START_TIME)).trim());
Long endTime = Long.parseLong(String.valueOf(conf.get(ParamConstants.END_TIME)).trim());
return new KafkaParam(servers, zookeeper, topic, hdfsDest, core, date, startTime, endTime);
}
}
......@@ -13,6 +13,7 @@ import java.util.Map;
*/
public class ZorkParameterUtil {
private static final Logger logger = LoggerFactory.getLogger(com.zorkdata.datamask.util.ZorkParameterUtil.class);
public static final String YML_SUFFIX = "yml";
/**
* Read job parameters
......@@ -26,22 +27,21 @@ public class ZorkParameterUtil {
String configPath;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
configPath = parameterTool.get("configPath");
configPath = parameterTool.get("conf");
} catch (Exception e) {
// configPath = "/etc/flinkConfig.yaml";
configPath = "D:\\zork\\transactionLogMask\\src\\main\\resources\\application.yml";
throw new RuntimeException("读取配置文件失败,请检查配置路径.");
}
logger.info("read config path is " + configPath);
if (!configPath.endsWith("yml")) {
if (!configPath.endsWith(YML_SUFFIX)) {
System.err.println("Please input correct configuration file and flink run mode!");
System.exit(-1);
throw new RuntimeException("Please input correct configuration file and flink run mode!");
} else {
conf = LoadConf.LoadYaml(configPath);
conf = LoadConf.loadYaml(configPath);
if (conf == null) {
logger.error("配置文件" + args[0] + "不存在,系统退出");
System.exit(-1);
throw new RuntimeException("配置文件" + args[0] + "不存在,系统退出");
}
}
return conf;
......
......@@ -100,7 +100,7 @@ public class AvroSerializer {
*/
public synchronized byte[] serializing(String json) {
byte[] byteArray = null;
JSONObject jsonObject = (JSONObject) JSONObject.parse(json);// new TypeReference<Object>() {}
JSONObject jsonObject = (JSONObject) JSONObject.parse(json);
GenericRecord genericRecord = new GenericData.Record(this.schema);
// Put the field values into the genericRecord
for (int i = 0; i < filedsArrayList.size(); i++) {
......@@ -209,7 +209,6 @@ public class AvroSerializer {
} catch (Exception e) {
System.out.println("序列化失败" + e);
}
// GenericRecord s = AvroDeserializerFactory.getTopicmetadataDeserializer().deserializing(returnstr);
return byteArray;
}
......
# Whitelist of fields that are never masked
fieldsWhiteList=fundid,custid,orgid,brhid,secuid,bankcode,market,ordersno,ordergroup,count,poststr,stkcode,bsflag,\
orderamt,price,qty,bankcode,tacode,ofcode,transacc,taacc
# Regular expressions used for masking
# Name regex
nameRegExp = "[\u4e00-\u9fa5]{1,20}|[a-zA-Z\\.\\s]{1,20}"
# Mobile number regex
mobileRegExp = "((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|(147))\\d{8}"
# Landline number regex
phoneRegExp = "(\\d{3,4}-)?\\d{6,8}"
# Email address regex
emailRegExp = "\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*"
# ID card number (15-digit) regex
idRegExp15 = "[1-9]\\d{7}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}"
# ID card number (18-digit) regex
idRegExp18 = "[1-9]\\d{5}[1-9]\\d{3}((0\\d)|(1[0-2]))(([0|1|2]\\d)|3[0-1])\\d{3}([0-9Xx])"
# Home address regex
addressRegExp = "([\u4e00-\u9fa5A-Za-z0-9_]+(省|市|区|县|道|路|街|号|弄|条|室)){2,}"
# IP address regex
ipRegExp = "((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)"
# MAC address regex
macRegExp = "([A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}"
\ No newline at end of file
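// A minimal sketch (an assumption, not part of this merge request) of how the whitelist and regular
// expressions defined above could be loaded on the Java side. The resource name "mask.properties",
// the class name MaskRegexConfigLoader, and the quote-stripping step are illustrative only.
package com.zorkdata.datamask.util;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
public class MaskRegexConfigLoader {
    /** Loads the masking configuration from a classpath resource. */
    public static Properties load(String resource) throws IOException {
        Properties props = new Properties();
        try (InputStream in = MaskRegexConfigLoader.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("masking config not found on classpath: " + resource);
            }
            props.load(in);
        }
        return props;
    }
    /** Strips the optional surrounding double quotes used in the properties file. */
    private static String unquote(String value) {
        return value == null ? null : value.trim().replaceAll("^\"|\"$", "");
    }
    public static void main(String[] args) throws IOException {
        Properties props = load("mask.properties");
        // Fields listed in fieldsWhiteList are passed through without masking.
        Set<String> whiteList = new HashSet<>(Arrays.asList(unquote(props.getProperty("fieldsWhiteList", "")).split(",")));
        // Compile one of the expressions, e.g. the mobile-number pattern.
        Pattern mobilePattern = Pattern.compile(unquote(props.getProperty("mobileRegExp", "")));
        System.out.println(whiteList.size() + " whitelisted fields; mobile pattern: " + mobilePattern.pattern());
    }
}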
#!/usr/bin/env bash
export base_path=$(cd `dirname $0`; pwd)
deploy_path=${base_path%/*}
if [ ! -d "$deploy_path/logs" ]; then
mkdir -p $deploy_path/logs
fi
FLINK_TASK_CONF=gmas-config.yaml
parallelism=1
flink run -p $parallelism $deploy_path/lib/transactionLogMask-0.1.jar \
    --conf $deploy_path/conf/$FLINK_TASK_CONF > $deploy_path/logs/transactionLogMask-0.1.log &
\ No newline at end of file