Commit a3dbae78 authored by zskycode

<dev> Modify the way the HDFS configuration is loaded

parent 5a759320
Pipeline #51396 passed with stages in 2 minutes and 37 seconds
......@@ -26,7 +26,7 @@ public class JobConfig implements Serializable {
this.setHdfsUri(jobInitConfig.getHdfsUri());
this.setHdfsUser(jobInitConfig.getHdfsUser());
this.setHdfsSrc(jobInitConfig.getHdfsUri() + jobInitConfig.getHdfsSrc());
this.setHdfsDest(jobInitConfig.getHdfsUri() + jobInitConfig.getHdfsDest());
this.setHdfsDest(jobInitConfig.getHdfsDestUri() + jobInitConfig.getHdfsDest());
this.setMatchHostname(jobInitConfig.getMatchHostname());
this.setStartTime(jobInitConfig.getStartTime());
this.setEndTime(jobInitConfig.getEndTime());
......
......@@ -26,6 +26,7 @@ public class JobInitConfig implements Serializable {
this.sinkParallelism = Integer.parseInt(conf.get(ConfigConstants.SINK_PARALLELISM));
this.avroOutputSchema = new Schema.Parser().parse(AvroSchemaDef.ZORK_LOG_SCHEMA).toString(true);
this.hdfsUri = String.valueOf(conf.get(ConfigConstants.HDFS_URI)).trim();
this.hdfsDestUri = String.valueOf(conf.get(ConfigConstants.HDFS_DEST_URI)).trim();
this.hdfsUser = String.valueOf(conf.get(ConfigConstants.HDFS_USER)).trim();
this.hdfsSrc = String.valueOf(conf.get(ConfigConstants.HDFS_SRC)).trim();
this.hdfsDest = String.valueOf(conf.get(ConfigConstants.HDFS_DEST)).trim();
......@@ -55,6 +56,7 @@ public class JobInitConfig implements Serializable {
private int sinkParallelism;
private String avroOutputSchema;
private String hdfsUri;
private String hdfsDestUri;
  • Remove this unused "hdfsDestUri" private field. 📘

private String hdfsUser;
private String hdfsSrc;
private String hdfsDest;
......
......@@ -18,6 +18,7 @@ public final class ConfigConstants {
public static final String REG_DIMENSION = "reg.dimension";
public static final String HDFS_URI = "hdfs_uri";
public static final String HDFS_DEST_URI="hdfs_dest_uri";
public static final String HDFS_USER = "hdfs_user";
public static final String HDFS_SRC = "hdfs_src";
public static final String HDFS_DEST = "hdfs_dest";
......
......@@ -27,9 +27,10 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.security.Credentials;
import java.io.IOException;
import java.io.Serializable;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
......@@ -43,6 +44,13 @@ import java.util.List;
public class HdfsLogDesensitization implements Serializable {
private static final long serialVersionUID = -6253583122681202967L;
public static void main(String[] args) {
JobConf jobConf = localconfig("C:\\Users\\Administrator\\Desktop\\hadoop\\hadoop-conf");
Credentials credentials = jobConf.getCredentials();
  • Remove this useless assignment to local variable "credentials". 📘
  • 🔽 Remove this unused "credentials" local variable. 📘
OutputCommitter outputCommitter = jobConf.getOutputCommitter();
  • Remove this useless assignment to local variable "outputCommitter". 📘
  • 🔽 Remove this unused "outputCommitter" local variable. 📘
System.out.println(jobConf);
  • Replace this use of System.out or System.err by a logger. 📘

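A minimal sketch of how `main` could look once the two unused locals are dropped and the println goes through slf4j instead (assuming the class keeps the `log` field it already uses further down; the log message wording is illustrative):

```java
public static void main(String[] args) {
    // Load the local Hadoop configuration from the sample path used above.
    JobConf jobConf = localconfig("C:\\Users\\Administrator\\Desktop\\hadoop\\hadoop-conf");
    // No unused credentials/outputCommitter locals; report the result via the slf4j logger.
    log.info("Loaded JobConf: {}", jobConf);
}
```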
}
private static final String AVRO_OUTPUT_SCHEMA = "avro.output.schema";
private JobConfig jobConfig;
......@@ -55,23 +63,26 @@ public class HdfsLogDesensitization implements Serializable {
desensitizationHdfsLog(this.jobConfig);
}
public JobConf config(String jobName, String path) {
JobConf conf = new JobConf(HdfsLogDesensitization.class);
conf.setJobName(jobName);
public static JobConf localconfig(String path) {
Configuration configuration = new Configuration();
configuration.addResource(new Path(path + "/core-site.xml"));
  • 🚫 Define a constant instead of duplicating this literal "/core-site.xml" 3 times. 📘

configuration.addResource(new Path(path + "/hdfs-site.xml"));
  • 🚫 Define a constant instead of duplicating this literal "/hdfs-site.xml" 3 times. 📘

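One way to satisfy both duplication findings, sketched as a fragment of this class; the constant names are illustrative, only the JobConf part is shown, and the "classpath:" prefix from the original is kept as-is:

```java
// Illustrative constants replacing the duplicated resource-name literals.
private static final String CORE_SITE_XML = "/core-site.xml";
private static final String HDFS_SITE_XML = "/hdfs-site.xml";

public static JobConf localconfig(String path) {
    JobConf conf = new JobConf(false);
    conf.addResource("classpath:" + path + CORE_SITE_XML);
    conf.addResource("classpath:" + path + HDFS_SITE_XML);
    return conf;
}
```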
JobConf conf = new JobConf(false);
conf.addResource("classpath:" + path + "/core-site.xml");
conf.addResource("classpath:" + path + "/hdfs-site.xml");
/// conf.addResource("classpath:" + path + "/mapred-site.xml");
log.info("配置文件加载:"+"classpath:" + path + "/core-site.xml");
log.info("配置文件加载:"+"classpath:" + path + "/hdfs-site.xml");
log.error("配置文件加载:" + "classpath:" + path + "/core-site.xml");
log.error("配置文件加载:" + "classpath:" + path + "/hdfs-site.xml");
return conf;
}
public void desensitizationHdfsLog(JobConfig jobConfig) {
// Initialize the Flink job environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
JobConf jobConfInput = new JobConf();
JobConf jobConfInput = null;
if (!StringUtils.isEmpty(jobConfig.getHdfsSourcePath())) {
jobConfInput = config("jobConfInput", jobConfig.getHdfsSourcePath());
jobConfInput = localconfig(jobConfig.getHdfsSourcePath());
} else {
jobConfInput = new JobConf();
}
jobConfInput.set(AVRO_OUTPUT_SCHEMA, jobConfig.getAvroOutputSchema());
......@@ -91,14 +102,17 @@ public class HdfsLogDesensitization implements Serializable {
// Sink part
// Get the output directory on the target HDFS
JobConf jobConfOutput = new JobConf();
JobConf jobConfOutput = null;
if (!StringUtils.isEmpty(jobConfig.getHdfsSinkPath())) {
jobConfOutput = config("jobConfOutput", jobConfig.getHdfsSinkPath());
jobConfOutput = localconfig(jobConfig.getHdfsSinkPath());
} else {
jobConfOutput = new JobConf();
}
jobConfOutput.set(AVRO_OUTPUT_SCHEMA, jobConfig.getAvroOutputSchema());
String filePath = jobConfig.getHdfsDest();
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConfOutput);
FileOutputFormat.setOutputPath(jobConfOutput, new Path(filePath));
HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(new AvroOutputFormat(), jobConfOutput);
// Avro serialization operator (.writeAsText("file:///lmt/output"); writes to the local file system)
flatMapOperator.map(new MapFunction<LogData, Tuple2<AvroWrapper<LogData>, NullWritable>>() {
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.mapred;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
/**
* Common base for Java and Scala API for using Hadoop input formats with Flink.
*
* @param <K> Type of key
* @param <V> Type of value
* @param <T> The type itself
*/
@Internal
public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {
  • 🚫 The abstract class [HadoopInputFormatBase] should have a name starting with Abstract or Base. 📘
  • The Javadoc for [HadoopInputFormatBase] is missing @author information. 📘
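A hedged sketch of the class Javadoc with the @author tag the rule asks for; the author value is a placeholder, and whether a base class copied from Flink should also be renamed is a separate decision:

```java
/**
 * Common base for Java and Scala API for using Hadoop input formats with Flink.
 *
 * @author todo-fill-in-author  (placeholder for the required @author tag)
 * @param <K> Type of key
 * @param <V> Type of value
 * @param <T> The type itself
 */
@Internal
public abstract class HadoopInputFormatBase<K, V, T>
        extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {
    // ...
}
```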
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
// Mutexes to avoid concurrent operations on Hadoop InputFormats.
// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
// might be used in the same JVM.
  • The field [OPEN_MUTEX] must be documented with a Javadoc-style comment. 📘
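The existing line comments could be folded into a Javadoc comment on the field, roughly:

```java
/**
 * Mutex to avoid concurrent operations on Hadoop InputFormats. Hadoop parallelizes tasks
 * across JVMs and may rely on that isolation; Flink parallelizes with threads, so several
 * Hadoop InputFormat instances can share one JVM and must synchronize on this lock.
 */
private static final Object OPEN_MUTEX = new Object();
```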
private static final Object OPEN_MUTEX = new Object();
private static final Object CONFIGURE_MUTEX = new Object();
private static final Object CLOSE_MUTEX = new Object();
private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
protected Class<K> keyClass;
protected Class<V> valueClass;
private JobConf jobConf;
protected transient K key;
protected transient V value;
private transient RecordReader<K, V> recordReader;
protected transient boolean fetched = false;
protected transient boolean hasNext;
public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
super(job.getCredentials());
this.mapredInputFormat = mapredInputFormat;
this.keyClass = key;
this.valueClass = value;
/// HadoopUtils.mergeHadoopConf(job);
  • This block of commented-out lines of code should be removed. 📘

this.jobConf = job;
ReflectionUtils.setConf(mapredInputFormat, jobConf);
}
public JobConf getJobConf() {
return jobConf;
}
// --------------------------------------------------------------------------------------------
// InputFormat
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
// enforce sequential configuration() calls
synchronized (CONFIGURE_MUTEX) {
// configure MR InputFormat if necessary
if (this.mapredInputFormat instanceof Configurable) {
((Configurable) this.mapredInputFormat).setConf(this.jobConf);
} else if (this.mapredInputFormat instanceof JobConfigurable) {
((JobConfigurable) this.mapredInputFormat).configure(this.jobConf);
}
}
}
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
// only gather base statistics for FileInputFormats
if (!(mapredInputFormat instanceof FileInputFormat)) {
return null;
}
final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
(FileBaseStatistics) cachedStats : null;
try {
final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf);
return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
} catch (IOException ioex) {
if (LOG.isWarnEnabled()) {
LOG.warn("Could not determine statistics due to an io error: "
+ ioex.getMessage());
}
} catch (Throwable t) {
  • Catch Exception instead of Throwable. 📘

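If the rule is followed, the second catch block could narrow to Exception, leaving Errors such as OutOfMemoryError free to propagate; a sketch:

```java
} catch (Exception e) {
    if (LOG.isErrorEnabled()) {
        // Same logging as before, but Errors are no longer swallowed.
        LOG.error("Unexpected problem while getting the file statistics: " + e.getMessage(), e);
    }
}
```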
if (LOG.isErrorEnabled()) {
LOG.error("Unexpected problem while getting the file statistics: "
+ t.getMessage(), t);
}
}
// no statistics available
return null;
}
@Override
public HadoopInputSplit[] createInputSplits(int minNumSplits)
throws IOException {
org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
for (int i = 0; i < splitArray.length; i++) {
hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
}
return hiSplit;
}
@Override
public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
return new LocatableInputSplitAssigner(inputSplits);
}
@Override
public void open(HadoopInputSplit split) throws IOException {
// enforce sequential open() calls
synchronized (OPEN_MUTEX) {
this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
if (this.recordReader instanceof Configurable) {
((Configurable) this.recordReader).setConf(jobConf);
}
key = this.recordReader.createKey();
value = this.recordReader.createValue();
this.fetched = false;
}
}
@Override
public boolean reachedEnd() throws IOException {
if (!fetched) {
fetchNext();
}
return !hasNext;
}
protected void fetchNext() throws IOException {
hasNext = this.recordReader.next(key, value);
fetched = true;
}
@Override
public void close() throws IOException {
if (this.recordReader != null) {
// enforce sequential close() calls
synchronized (CLOSE_MUTEX) {
this.recordReader.close();
}
}
}
// --------------------------------------------------------------------------------------------
// Helper methods
// --------------------------------------------------------------------------------------------
private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
ArrayList<FileStatus> files) throws IOException {
long latestModTime = 0L;
// get the file info and check whether the cached statistics are still valid.
for (org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
final Path filePath = new Path(hadoopPath.toUri());
final FileSystem fs = FileSystem.get(filePath.toUri());
final FileStatus file = fs.getFileStatus(filePath);
latestModTime = Math.max(latestModTime, file.getModificationTime());
// enumerate all files and check their modification time stamp.
if (file.isDir()) {
FileStatus[] fss = fs.listStatus(filePath);
files.ensureCapacity(files.size() + fss.length);
for (FileStatus s : fss) {
if (!s.isDir()) {
files.add(s);
latestModTime = Math.max(s.getModificationTime(), latestModTime);
}
}
} else {
files.add(file);
}
}
// check whether the cached statistics are still valid, if we have any
if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
return cachedStats;
}
// calculate the whole length
long len = 0;
for (FileStatus s : files) {
len += s.getLen();
}
// sanity check
if (len <= 0) {
len = BaseStatistics.SIZE_UNKNOWN;
}
return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
}
// --------------------------------------------------------------------------------------------
// Custom serialization methods
// --------------------------------------------------------------------------------------------
private void writeObject(ObjectOutputStream out) throws IOException {
super.write(out);
out.writeUTF(mapredInputFormat.getClass().getName());
out.writeUTF(keyClass.getName());
out.writeUTF(valueClass.getName());
jobConf.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
super.read(in);
String hadoopInputFormatClassName = in.readUTF();
String keyClassName = in.readUTF();
String valueClassName = in.readUTF();
if (jobConf == null) {
jobConf = new JobConf();
}
jobConf.readFields(in);
try {
this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K, V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate the hadoop input format", e);
  • Define and throw a dedicated exception instead of using a generic one. 📘

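A possible shape for the dedicated exception the rule asks for; the class name is illustrative, and the same type could be reused for the key-class and value-class lookups below:

```java
/** Illustrative dedicated exception for deserialization failures in this wrapper. */
public class HadoopFormatDeserializationException extends RuntimeException {
    public HadoopFormatDeserializationException(String message, Throwable cause) {
        super(message, cause);
    }
}
```

and then in `readObject`:

```java
} catch (Exception e) {
    throw new HadoopFormatDeserializationException("Unable to instantiate the hadoop input format", e);
}
```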
}
try {
this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
} catch (Exception e) {
throw new RuntimeException("Unable to find key class.", e);
  • Define and throw a dedicated exception instead of using a generic one. 📘

}
try {
this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
} catch (Exception e) {
throw new RuntimeException("Unable to find value class.", e);
  • Define and throw a dedicated exception instead of using a generic one. 📘

}
ReflectionUtils.setConf(mapredInputFormat, jobConf);
jobConf.getCredentials().addAll(this.credentials);
Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
if (currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.mapred;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;
import java.io.IOException;
/**
* Wrapper for using HadoopOutputFormats (mapred-variant) with Flink.
*
* <p>The IF is returning a {@code Tuple2<K,V>}.
*
* @param <K> Type of the key
* @param <V> Type of the value.
*/
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
  • The Javadoc for [HadoopOutputFormat] is missing @author information. 📘
private static final long serialVersionUID = 1L;
public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
super(mapredOutputFormat, job);
}
public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, Class<OutputCommitter> outputCommitterClass, JobConf job) {
this(mapredOutputFormat, job);
super.getJobConf().setOutputCommitter(outputCommitterClass);
}
@Override
public void writeRecord(Tuple2<K, V> record) throws IOException {
this.recordWriter.write(record.f0, record.f1);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.mapred;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
/**
* Common base for the mapred HadoopOutputFormat wrappers. There are implementations for Java and Scala.
*
* @param <K> Type of Key
* @param <V> Type of Value
* @param <T> Record type.
*/
@Internal
public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T> implements FinalizeOnMaster {
  • 🚫 The abstract class [HadoopOutputFormatBase] should have a name starting with Abstract or Base. 📘
  • The Javadoc for [HadoopOutputFormatBase] is missing @author information. 📘
private static final long serialVersionUID = 1L;
// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
// might be used in the same JVM.
  • The field [OPEN_MUTEX] must be documented with a Javadoc-style comment. 📘
protected static final Object OPEN_MUTEX = new Object();
protected static final Object CONFIGURE_MUTEX = new Object();
protected static final Object CLOSE_MUTEX = new Object();
protected JobConf jobConf;
protected org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat;
protected transient RecordWriter<K, V> recordWriter;
protected transient OutputCommitter outputCommitter;
protected transient TaskAttemptContext context;
public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
super(job.getCredentials());
this.mapredOutputFormat = mapredOutputFormat;
/// HadoopUtils.mergeHadoopConf(job);
  • This block of commented-out lines of code should be removed. 📘

this.jobConf = job;
}
public JobConf getJobConf() {
return jobConf;
}
// --------------------------------------------------------------------------------------------
// OutputFormat
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
// enforce sequential configure() calls
synchronized (CONFIGURE_MUTEX) {
// configure MR OutputFormat if necessary
if (this.mapredOutputFormat instanceof Configurable) {
((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
} else if (this.mapredOutputFormat instanceof JobConfigurable) {
((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
}
}
}
/**
* create the temporary output file for hadoop RecordWriter.
* @param taskNumber The number of the parallel instance.
* @param numTasks The number of parallel tasks.
* @throws java.io.IOException
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
// enforce sequential open() calls
synchronized (OPEN_MUTEX) {
if (Integer.toString(taskNumber + 1).length() > 6) {
  • Magic number [6]. 📘
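The magic value could be named, for example (the constant name is illustrative):

```java
/** Maximum number of digits allowed in the task id suffix. */
private static final int MAX_TASK_ID_DIGITS = 6;
// ...
if (Integer.toString(taskNumber + 1).length() > MAX_TASK_ID_DIGITS) {
    throw new IOException("Task id too large.");
}
```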
throw new IOException("Task id too large.");
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
  • Format specifiers should be used instead of string concatenation. 📘

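With a zero-padding format specifier the attempt id can be built in one call, matching the manual padding for the task numbers the original code handles; a sketch:

```java
// %06d zero-pads the task number to six digits, replacing the manual pad-and-replace.
TaskAttemptID taskAttemptID =
        TaskAttemptID.forName(String.format("attempt__0000_r_%06d_0", taskNumber + 1));
```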
+ Integer.toString(taskNumber + 1)
+ "_0");
this.jobConf.set("mapred.task.id", taskAttemptID.toString());
this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
// for hadoop 2.2
this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);
this.outputCommitter = this.jobConf.getOutputCommitter();
JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
this.outputCommitter.setupJob(jobContext);
this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
}
}
/**
* commit the task by moving the output file out from the temporary directory.
* @throws java.io.IOException
*/
@Override
public void close() throws IOException {
// enforce sequential close() calls
synchronized (CLOSE_MUTEX) {
this.recordWriter.close(new HadoopDummyReporter());
if (this.outputCommitter.needsTaskCommit(this.context)) {
this.outputCommitter.commitTask(this.context);
}
}
}
@Override
public void finalizeGlobal(int parallelism) throws IOException {
try {
JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
OutputCommitter outputCommitter = this.jobConf.getOutputCommitter();
  • Rename "outputCommitter" which hides the field declared at line 73. 📘

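Renaming the local so it no longer shadows the class field could look like this (the name jobCommitter is illustrative):

```java
// Local committer for the job-level commit; distinct from the per-task outputCommitter field.
OutputCommitter jobCommitter = this.jobConf.getOutputCommitter();
// finalize HDFS output format
jobCommitter.commitJob(jobContext);
```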
// finalize HDFS output format
outputCommitter.commitJob(jobContext);
} catch (Exception e) {
throw new RuntimeException(e);
  • Define and throw a dedicated exception instead of using a generic one. 📘

}
}
// --------------------------------------------------------------------------------------------
// Custom serialization methods
// --------------------------------------------------------------------------------------------
private void writeObject(ObjectOutputStream out) throws IOException {
super.write(out);
out.writeUTF(mapredOutputFormat.getClass().getName());
jobConf.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
super.read(in);
String hadoopOutputFormatName = in.readUTF();
if (jobConf == null) {
jobConf = new JobConf();
}
jobConf.readFields(in);
try {
this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K, V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance();
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate the hadoop output format", e);
  • Define and throw a dedicated exception instead of using a generic one. 📘

}
ReflectionUtils.setConf(mapredOutputFormat, jobConf);
jobConf.getCredentials().addAll(this.credentials);
Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
if (currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
}
}
}
# Query log start time
start_time: "2021-02-26 00:00:20"
start_time: "2023-04-06 00:00:20"
# Query log end time
end_time: "2021-02-26 23:59:59"
end_time: "2023-04-06 23:59:59"
# Job configuration
job_name: "国泰交易日志脱敏job"
......@@ -43,16 +43,21 @@ download_path: "/tmp/"
# Hadoop-related settings
# HDFS address; must end with a slash
hdfs_uri: "hdfs://cdh-2:8020/"
hdfs_dest_uri: "hdfs://zork7084:8020/"
# HDFS username
hdfs_user: "hdfs"
# HDFS path of the source log files; required when source is hdfs, must end with a slash
hdfs_src: "/tmp/datawarehouse/net23/"
hdfs_src: "/xiesen/datawarehouse/kcbp/"
# HDFS path the logs are written to; optional, defaults to an output directory under hdfs_src, must end with a slash
hdfs_dest: "/tmp/datawarehouse/net23/output14/"
hdfs_dest: "/xiesen/datawarehouse/kcbp/output1/"
hdfs_source_path: "/root/hadoop/hadoop-conf"
hdfs_sink_path: "/root/hadoop/ambari"
hdfs_source_path: "C:/Users/Administrator/Desktop/hadoop/hadoop-conf"
hdfs_sink_path: "C:/Users/Administrator/Desktop/hadoop/ambari"
# CDH download settings
# IP of a machine in the CDH cluster that can run hdfs commands
......
  • SonarQube analysis reported 230 issues

    • 7 blocker
    • 🚫 42 critical
    • 156 major
    • 🔽 22 minor
    • 3 info

    Watch the comments in this conversation to review them.

    Top 30 extra issues

    Note: The following issues were found on lines that were not modified in the commit. Because these issues can't be reported as line comments, they are summarized here:

    1. Remove this hard-coded password. 📘
    2. Remove this hard-coded password. 📘
    3. Remove this hard-coded password. 📘
    4. Remove this hard-coded password. 📘
    5. Remove this hard-coded password. 📘
    6. Remove this hard-coded password. 📘
    7. Remove this hard-coded password. 📘
    8. 🚫 Define a constant instead of duplicating this literal " {\n" 11 times. 📘
    9. 🚫 Define a constant instead of duplicating this literal " "type": \n" 11 times. 📘
    10. 🚫 Define a constant instead of duplicating this literal " "string",\n" 6 times. 📘
    11. 🚫 Define a constant instead of duplicating this literal " "null"\n" 6 times. 📘
    12. 🚫 [Define a constant instead of duplicating this literal " ]\n" 11 times.](https://git.zorkdata.com/liaomingtao/transaction-log-desensitization/blob/a3dbae78c33864315edec5cb8f5567f8243e9247/src/main/java/com/zorkdata/desensitization/avro/AvroSchemaDef.java#L23) 📘
    13. 🚫 Define a constant instead of duplicating this literal " },\n" 9 times. 📘
    14. 🚫 Define a constant instead of duplicating this literal " "null",\n" 5 times. 📘
    15. 🚫 Define a constant instead of duplicating this literal " {\n" 5 times. 📘
    16. 🚫 Define a constant instead of duplicating this literal " "type": "map",\n" 5 times. 📘
    17. 🚫 Define a constant instead of duplicating this literal " "values": "string"\n" 3 times. 📘
    18. 🚫 Define a constant instead of duplicating this literal " }\n" 5 times. 📘
    19. 🚫 Define a constant instead of duplicating this literal "序列化失败" 13 times. 📘
    20. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    21. 🚫 Refactor this method to reduce its Cognitive Complexity from 161 to the 15 allowed. 📘
    22. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    23. 🚫 Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed. 📘
    24. 🚫 Define a constant instead of duplicating this literal "classpath:" 4 times. 📘
    25. 🚫 Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed. 📘
    26. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    27. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    28. 🚫 Refactor this code to not throw exceptions in finally blocks. 📘
    29. 🚫 Change this "try" to a try-with-resources. (sonar.java.source not set. Assuming 7 or greater.) 📘
    30. 🚫 Define a constant instead of duplicating this literal "jobName" 8 times. 📘
    • ... 174 more