Commit a7c76f3f authored by DeleMing

Optimize code

parent aa4ae227
...@@ -16,24 +16,22 @@ import org.slf4j.LoggerFactory; ...@@ -16,24 +16,22 @@ import org.slf4j.LoggerFactory;
* @author DeleMing * @author DeleMing
*/ */
public class AvroDeserializer { public class AvroDeserializer {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class); private static final Logger logger = LoggerFactory.getLogger(AvroDeserializer.class);
public JSONObject jsonObject; public JSONObject jsonObject;
public JSONArray jsonArray; public JSONArray jsonArray;
public Schema schema; public Schema schema;
public String[] keys; public String[] keys;
public AvroDeserializer(String schema) { public AvroDeserializer(String schema) {
getKeysFromjson(schema); getKeysFromJson(schema);
} }
/** /**
* @param schema:Avro序列化所使用的schema * 用于获取Avro的keys
* @return void 返回类型 *
* @throws * @param schema Avro序列化所使用的schema
* @Title: getKeysFromjson
* @Description:用于获取Avro的keys
*/ */
void getKeysFromjson(String schema) { void getKeysFromJson(String schema) {
this.jsonObject = JSONObject.parseObject(schema); this.jsonObject = JSONObject.parseObject(schema);
this.schema = new Schema.Parser().parse(schema); this.schema = new Schema.Parser().parse(schema);
this.jsonArray = this.jsonObject.getJSONArray("fields"); this.jsonArray = this.jsonObject.getJSONArray("fields");
...@@ -44,14 +42,11 @@ public class AvroDeserializer { ...@@ -44,14 +42,11 @@ public class AvroDeserializer {
} }
/** /**
* @param body 参数:byte[] body:kafka消息。 * 用于Avro的反序列化
* @param @return 设定文件 *
* @return String 返回类型 * @param body 参数:byte[] body:kafka消息。
* @throws * @return GenericRecord
* @Title: deserializing
* @Description: 用于Avro的反序列化。
*/ */
public GenericRecord deserializing(byte[] body) { public GenericRecord deserializing(byte[] body) {
DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(this.schema); DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(this.schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(body, null); Decoder decoder = DecoderFactory.get().binaryDecoder(body, null);
...@@ -59,7 +54,7 @@ public class AvroDeserializer { ...@@ -59,7 +54,7 @@ public class AvroDeserializer {
try { try {
result = datumReader.read(null, decoder); result = datumReader.read(null, decoder);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error(String.format("error Avro反序列化"), e); logger.error("error Avro反序列化", e);
} }
return result; return result;
} }
......
...@@ -4,6 +4,7 @@ package com.zorkdata.tools.avro; ...@@ -4,6 +4,7 @@ package com.zorkdata.tools.avro;
* @author DeleMing * @author DeleMing
*/ */
public class AvroDeserializerFactory { public class AvroDeserializerFactory {
private static AvroDeserializer logs = null; private static AvroDeserializer logs = null;
private static AvroDeserializer metrics = null; private static AvroDeserializer metrics = null;
......
...@@ -24,46 +24,41 @@ public class AvroSerializer { ...@@ -24,46 +24,41 @@ public class AvroSerializer {
public JSONObject jsonObject; public JSONObject jsonObject;
public JSONArray jsonArray; public JSONArray jsonArray;
public Schema schema; public Schema schema;
public List<String> filedsArrayList = new ArrayList<String>(); public List<String> fieldsArrayList = new ArrayList<String>();
public AvroSerializer(String schema) { public AvroSerializer(String schema) {
getKeysFromjson(schema); getKeysFromJson(schema);
} }
/** /**
* @param schema * 用于获取Avro的keys
* :Avro序列化所使用的schema *
* @return void 返回类型 * @param schema Avro序列化所使用的schema
* @throws
* @Title: getKeysFromjson
* @Description:用于获取Avro的keys
*/ */
void getKeysFromjson(String schema) { void getKeysFromJson(String schema) {
this.jsonObject = JSONObject.parseObject(schema); this.jsonObject = JSONObject.parseObject(schema);
this.schema = new Schema.Parser().parse(schema); this.schema = new Schema.Parser().parse(schema);
this.jsonArray = this.jsonObject.getJSONArray("fields"); this.jsonArray = this.jsonObject.getJSONArray("fields");
if (filedsArrayList != null && filedsArrayList.size() > 0) { if (fieldsArrayList != null && fieldsArrayList.size() > 0) {
filedsArrayList.clear(); fieldsArrayList.clear();
} }
for (int i = 0; i < this.jsonArray.size(); i++) { for (int i = 0; i < this.jsonArray.size(); i++) {
filedsArrayList.add(this.jsonArray.getJSONObject(i).get("name").toString()); fieldsArrayList.add(this.jsonArray.getJSONObject(i).get("name").toString());
} }
} }
/** /**
* @param * 用于Avro的序列化
* @param @return 设定文件 *
* @return String 返回类型 * @param temtuple
* @throws * @return
* @Title: serializing
* @Description: 用于Avro的序列化。
*/ */
private synchronized byte[] serializing(List<String> temtuple) { private synchronized byte[] serializing(List<String> temtuple) {
byte[] returnstr = null; byte[] returnstr = null;
GenericRecord datum = new GenericData.Record(this.schema); GenericRecord datum = new GenericData.Record(this.schema);
// 将数据加到datum中 // 将数据加到datum中
for (int i = 0; i < filedsArrayList.size(); i++) { for (int i = 0; i < fieldsArrayList.size(); i++) {
datum.put(filedsArrayList.get(i), temtuple.get(i)); datum.put(fieldsArrayList.get(i), temtuple.get(i));
} }
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型 // DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
...@@ -76,7 +71,7 @@ public class AvroSerializer { ...@@ -76,7 +71,7 @@ public class AvroSerializer {
encoder.flush(); encoder.flush();
} catch (IOException e) { } catch (IOException e) {
System.out.println("序列化失败 " + e ); System.out.println("序列化失败 " + e);
} finally { } finally {
if (out != null) { if (out != null) {
try { try {
...@@ -105,8 +100,8 @@ public class AvroSerializer { ...@@ -105,8 +100,8 @@ public class AvroSerializer {
JSONObject jsonObject = (JSONObject) JSONObject.parse(json);// new TypeReference<Object>() {} JSONObject jsonObject = (JSONObject) JSONObject.parse(json);// new TypeReference<Object>() {}
GenericRecord datum = new GenericData.Record(this.schema); GenericRecord datum = new GenericData.Record(this.schema);
// 将数据加到datum中 // 将数据加到datum中
for (int i = 0; i < filedsArrayList.size(); i++) { for (int i = 0; i < fieldsArrayList.size(); i++) {
datum.put(filedsArrayList.get(i), new Utf8(String.valueOf(jsonObject.get(filedsArrayList.get(i))))); datum.put(fieldsArrayList.get(i), new Utf8(String.valueOf(jsonObject.get(fieldsArrayList.get(i)))));
} }
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型 // DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
...@@ -146,8 +141,8 @@ public class AvroSerializer { ...@@ -146,8 +141,8 @@ public class AvroSerializer {
byte[] returnstr = null; byte[] returnstr = null;
GenericRecord datum = new GenericData.Record(this.schema); GenericRecord datum = new GenericData.Record(this.schema);
// 将数据加到datum中 // 将数据加到datum中
for (int i = 0; i < filedsArrayList.size(); i++) { for (int i = 0; i < fieldsArrayList.size(); i++) {
datum.put(filedsArrayList.get(i), jsonObject.get(filedsArrayList.get(i))); datum.put(fieldsArrayList.get(i), jsonObject.get(fieldsArrayList.get(i)));
} }
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型 // DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
...@@ -249,8 +244,8 @@ public class AvroSerializer { ...@@ -249,8 +244,8 @@ public class AvroSerializer {
byte[] returnstr = null; byte[] returnstr = null;
GenericRecord datum = new GenericData.Record(this.schema); GenericRecord datum = new GenericData.Record(this.schema);
// 将数据加到datum中 // 将数据加到datum中
for (int i = 0; i < filedsArrayList.size(); i++) { for (int i = 0; i < fieldsArrayList.size(); i++) {
datum.put(filedsArrayList.get(i), new Utf8(String.valueOf(genericRecord.get(key[i])))); datum.put(fieldsArrayList.get(i), new Utf8(String.valueOf(genericRecord.get(key[i]))));
} }
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型 // DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
......
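For orientation, the serializer and deserializer above both use Avro's generic API: a DatumWriter turns a GenericRecord into bytes through a BinaryEncoder, and a DatumReader reverses the process through a BinaryDecoder. The following is a minimal, self-contained round-trip sketch; the single-field schema string is hypothetical and stands in for the project's real log/metric schemas.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class AvroRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical one-field record schema; the real schemas come from AvroSerializerFactory/AvroDeserializerFactory.
        String schemaJson = "{\"type\":\"record\",\"name\":\"demo\",\"fields\":[{\"name\":\"host\",\"type\":\"string\"}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        // Serialize: the DatumWriter translates the record into a form the Encoder can write.
        GenericRecord datum = new GenericData.Record(schema);
        datum.put("host", "server-01");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(datum, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();

        // Deserialize: mirrors AvroDeserializer.deserializing(byte[]).
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        GenericRecord result = reader.read(null, decoder);
        System.out.println(result.get("host"));
    }
}
```

The second argument of binaryEncoder/binaryDecoder lets a previous encoder or decoder instance be reused instead of allocating a new one, which is why the code above (and the classes in this commit) pass null only on first use.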
...@@ -4,6 +4,7 @@ package com.zorkdata.tools.avro; ...@@ -4,6 +4,7 @@ package com.zorkdata.tools.avro;
* @author DeleMing * @author DeleMing
*/ */
public class AvroSerializerFactory { public class AvroSerializerFactory {
private static AvroSerializer metricMetadata = null; private static AvroSerializer metricMetadata = null;
private static AvroSerializer logMetadata = null; private static AvroSerializer logMetadata = null;
......
...@@ -3,6 +3,8 @@ package com.zorkdata.tools.kafka; ...@@ -3,6 +3,8 @@ package com.zorkdata.tools.kafka;
import com.zorkdata.tools.oldkafka.AvroSerializerFactory; import com.zorkdata.tools.oldkafka.AvroSerializerFactory;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
...@@ -13,87 +15,90 @@ import java.util.Properties; ...@@ -13,87 +15,90 @@ import java.util.Properties;
*/ */
public class CommonProducer { public class CommonProducer {
private static String kafkaServer; private static final Logger logger = LoggerFactory.getLogger(CommonProducer.class);
private static int kafkaBathSize;
private static KafkaProducer<String, byte[]> producerByte; private String kafkaServer;
private static KafkaProducer<String, String> producerString; private int kafkaBathSize;
private KafkaProducer<String, byte[]> producerByte;
private KafkaProducer<String, String> producerString;
public void initConfig() {
kafkaServer = Config.INSTANCE.getKafkaServers();
kafkaBathSize = Config.INSTANCE.getKafkaBathSize();
}
public CommonProducer() { public CommonProducer() {
try { try {
initConfig(); initConfig();
Properties props = new Properties(); Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer); props.put("bootstrap.servers", kafkaServer);
props.put("client.id", "webAPI4LogGather");
// 不自定义clientid使用自增clientid, props.put("client.id", "webAPI4LogGather");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("batch.size", kafkaBathSize); props.put("batch.size", kafkaBathSize);
// 启用压缩 // 启用压缩
props.put("compression.type", "lz4"); props.put("compression.type", "lz4");
producerByte = new KafkaProducer<String, byte[]>(props); producerByte = new KafkaProducer<>(props);
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerString = new KafkaProducer<String, String>(props); producerString = new KafkaProducer<>(props);
} catch (Exception ex) { } catch (Exception ex) {
ex.printStackTrace(); logger.error(ex.toString());
} }
} }
public void initConfig() {
kafkaServer = Config.getInstance().kafkaServers;
kafkaBathSize = Config.getInstance().kafkaBathSize;
}
public void sendLog(String topic, String logTypeName, String timestamp, String source, String offset, public void sendLog(String topic, String logTypeName, String timestamp, String source, String offset,
Map<String, String> dimensions, Map<String, Double> metrics, Map<String, String> normalFields) { Map<String, String> dimensions, Map<String, Double> metrics, Map<String, String> normalFields) {
try { try {
byte[] bytes = AvroSerializerFactory.getLogAvorSerializer().serializingLog(logTypeName, timestamp, source, byte[] bytes = AvroSerializerFactory.getLogAvorSerializer().serializingLog(logTypeName, timestamp, source,
offset, dimensions, metrics, normalFields); offset, dimensions, metrics, normalFields);
producerByte.send(new ProducerRecord<String, byte[]>(topic, null, bytes)); producerByte.send(new ProducerRecord<>(topic, null, bytes));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.toString());
} }
} }
public void sendLog(String topic, String logJson) { public void sendLog(String topic, String logJson) {
try { try {
producerString.send(new ProducerRecord<String, String>(topic, null, logJson)); producerString.send(new ProducerRecord<>(topic, null, logJson));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.toString());
} }
} }
public void sendErrorLog(String errorLogTopic, String logJson) { public void sendMetric(String metricTopic, String metricSetName, String timestamp, Map<String, String> dimensions,
Map<String, Double> metrics) {
try { try {
producerString.send(new ProducerRecord<String, String>(errorLogTopic, null, logJson)); byte[] bytes = AvroSerializerFactory.getMetricAvorSerializer().serializingMetric(metricSetName, timestamp,
dimensions, metrics);
producerByte.send(new ProducerRecord<>(metricTopic, null, bytes));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.toString());
} }
} }
public void sendErrorMetric(String errorMetricTopic, String logJson) { public void sendErrorLog(String errorLogTopic, String logJson) {
try { try {
producerString.send(new ProducerRecord<String, String>(errorMetricTopic, null, logJson)); producerString.send(new ProducerRecord<>(errorLogTopic, null, logJson));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.toString());
} }
} }
public void sendAlarm(String alarmTopic, String alarmJson) { public void sendErrorMetric(String errorMetricTopic, String logJson) {
try { try {
producerString.send(new ProducerRecord<String, String>(alarmTopic, null, alarmJson)); producerString.send(new ProducerRecord<>(errorMetricTopic, null, logJson));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.toString());
} }
} }
public void sendMetric(String metricTopic, String metricSetName, String timestamp, Map<String, String> dimensions, public void sendAlarm(String alarmTopic, String alarmJson) {
Map<String, Double> metrics) {
try { try {
byte[] bytes = AvroSerializerFactory.getMetricAvorSerializer().serializingMetric(metricSetName, timestamp, producerString.send(new ProducerRecord<>(alarmTopic, null, alarmJson));
dimensions, metrics);
producerByte.send(new ProducerRecord<String, byte[]>(metricTopic, null, bytes));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.toString());
} }
} }
......
...@@ -32,7 +32,6 @@ public class CommonProducerPool implements Closeable { ...@@ -32,7 +32,6 @@ public class CommonProducerPool implements Closeable {
init(); init();
} }
public void init() { public void init() {
pool = new CommonProducer[threadNum]; pool = new CommonProducer[threadNum];
for (int i = 0; i < threadNum; i++) { for (int i = 0; i < threadNum; i++) {
...@@ -41,7 +40,8 @@ public class CommonProducerPool implements Closeable { ...@@ -41,7 +40,8 @@ public class CommonProducerPool implements Closeable {
} }
public CommonProducer getProducer() { public CommonProducer getProducer() {
if (index > 65535) { final int maxThread = 65535;
if (index > maxThread) {
index = 0; index = 0;
} }
return pool[index++ % threadNum]; return pool[index++ % threadNum];
......
package com.zorkdata.tools.kafka; package com.zorkdata.tools.kafka;
import com.zorkdata.tools.utils.PropertiesUtil; import com.zorkdata.tools.utils.PropertiesUtil;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties; import java.util.Properties;
/** /**
* @author MSI-Gaming * @author MSI-Gaming
*/ */
@Getter
public class Config { public class Config {
private static Config instance;
public static Config getInstance() { private static final Logger logger = LoggerFactory.getLogger(Config.class);
if (instance == null) {
instance = new Config();
}
return instance;
}
public String kafkaServers; private String kafkaServers;
public int kafkaBathSize; protected int kafkaBathSize;
public String topicName; protected String topicName;
public static final Config INSTANCE = new Config();
/**
* 读取配置文件
*/
private Config() { private Config() {
try { try {
Properties properties = PropertiesUtil.getProperties("/config.properties"); Properties properties = PropertiesUtil.getProperties("/config.properties");
...@@ -29,7 +31,7 @@ public class Config { ...@@ -29,7 +31,7 @@ public class Config {
kafkaBathSize = Integer.parseInt(properties.getProperty("kafka.batch.size")); kafkaBathSize = Integer.parseInt(properties.getProperty("kafka.batch.size"));
topicName = properties.getProperty("kafka.topic.name"); topicName = properties.getProperty("kafka.topic.name");
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.toString());
System.exit(1); System.exit(1);
} }
} }
......
# Kafka Producer configuration parameters and notes
```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop:9092");
/**
 * batch.size controls how many bytes of records are buffered per partition before a batch is sent.
 * Hitting this size triggers a send, but a batch can also be sent before it is full (see linger.ms).
 * The default is 16384 bytes, i.e. 16 KB.
 */
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
/**
 * Whether a batch is sent is not governed by batch.size alone; it is a trade-off between
 * throughput and latency. linger.ms controls how long the producer waits for more records
 * before sending a batch. The default is 0, meaning records are sent as soon as possible.
 */
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
/**
 * buffer.memory sets the total memory, in bytes, the producer may use to buffer records
 * waiting to be sent. The default is 33554432 bytes, i.e. 32 MB.
 */
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
/**
 * acks controls the durability guarantee of the producer and accepts three values:
 * 0:   the producer does not wait for any acknowledgement from the broker
 * all: the broker acknowledges only after the message has been written to the leader and
 *      replicated to all in-sync replicas
 * 1:   the broker acknowledges once the leader has written the message to its own log,
 *      without waiting for the replicas
 */
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return props;
```
\ No newline at end of file
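As a usage sketch of the properties documented above (the topic name "test", the key "key", and the callback lambda are illustrative and not part of this repository), a producer built from a minimal subset of these settings can be used like this:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerUsageSketch {
    public static void main(String[] args) {
        // Minimal subset of the settings explained above; "hadoop:9092" is the example broker address.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer on exit, flushing any batched records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "key", "hello"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.printf("sent to %s-%d@%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                        }
                    });
        }
    }
}
```

Closing the producer is what guarantees that records still held back by batch.size and linger.ms are actually delivered before the process exits.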
package com.zorkdata.tools.load;
import com.zorkdata.tools.kafka.Config;
/**
* @author: LiaoMingtao
* @date: 2020/6/30
*/
public class Test {
public static void main(String[] args) {
System.out.println(Config.INSTANCE.getKafkaServers());
}
}
...@@ -3,18 +3,9 @@ package com.zorkdata.tools.mock; ...@@ -3,18 +3,9 @@ package com.zorkdata.tools.mock;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zorkdata.tools.kafka.CommonProducer; import com.zorkdata.tools.kafka.CommonProducer;
import com.zorkdata.tools.kafka.CommonProducerPool; import com.zorkdata.tools.kafka.CommonProducerPool;
import com.zorkdata.tools.oldkafka.Producer;
import com.zorkdata.tools.oldkafka.ProducerPool;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException; import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutionException;
import static java.lang.System.currentTimeMillis; import static java.lang.System.currentTimeMillis;
......
...@@ -2,8 +2,6 @@ package com.zorkdata.tools.mock; ...@@ -2,8 +2,6 @@ package com.zorkdata.tools.mock;
import com.zorkdata.tools.kafka.CommonProducer; import com.zorkdata.tools.kafka.CommonProducer;
import com.zorkdata.tools.kafka.CommonProducerPool; import com.zorkdata.tools.kafka.CommonProducerPool;
import com.zorkdata.tools.oldkafka.Producer;
import com.zorkdata.tools.oldkafka.ProducerPool;
import com.zorkdata.tools.utils.DateUtil; import com.zorkdata.tools.utils.DateUtil;
import java.util.HashMap; import java.util.HashMap;
...@@ -50,7 +48,7 @@ public class MockStreamxLogAvro { ...@@ -50,7 +48,7 @@ public class MockStreamxLogAvro {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// long size = 10000000L * 1; // long size = 10000000L * 1;
long size = 10; long size = 100;
// String topicName = "log2metric1y"; // String topicName = "log2metric1y";
String topicName = "test"; String topicName = "test";
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
......
...@@ -4,5 +4,6 @@ kafka.topic.name=a ...@@ -4,5 +4,6 @@ kafka.topic.name=a
key.serializer=org.apache.kafka.common.serialization.StringSerializer key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
#value.serializer=org.apache.kafka.common.serialization.StringSerializer #value.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.theard.num=15
log.size = 10000 log.size = 10000
log.topic=flinkx log.topic=flinkx