Commit 3d6de0d0 authored by DeleMing's avatar DeleMing

优化代码

parent a7c76f3f
...@@ -34,7 +34,7 @@ public class CommonProducer { ...@@ -34,7 +34,7 @@ public class CommonProducer {
Properties props = new Properties(); Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer); props.put("bootstrap.servers", kafkaServer);
// 不自定义clientid使用自增clientid, props.put("client.id", "webAPI4LogGather"); // props.put("client.id", "webAPI4LogGather"); 不自定义client.id,使用自增
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("batch.size", kafkaBathSize); props.put("batch.size", kafkaBathSize);
...@@ -48,8 +48,20 @@ public class CommonProducer { ...@@ -48,8 +48,20 @@ public class CommonProducer {
} }
} }
public void sendLog(String topic, String logTypeName, String timestamp, String source, String offset, /**
Map<String, String> dimensions, Map<String, Double> metrics, Map<String, String> normalFields) { * 发送logAvro数据
*
* @param topic 指标名称
* @param logTypeName 日志结构名称
* @param timestamp 时间
* @param source 日志路径
* @param offset 偏移量
* @param dimensions 维度
* @param metrics 指标
* @param normalFields 普通列
*/
public void sendLogAvro(String topic, String logTypeName, String timestamp, String source, String offset,
Map<String, String> dimensions, Map<String, Double> metrics, Map<String, String> normalFields) {
try { try {
byte[] bytes = AvroSerializerFactory.getLogAvorSerializer().serializingLog(logTypeName, timestamp, source, byte[] bytes = AvroSerializerFactory.getLogAvorSerializer().serializingLog(logTypeName, timestamp, source,
offset, dimensions, metrics, normalFields); offset, dimensions, metrics, normalFields);
...@@ -59,16 +71,17 @@ public class CommonProducer { ...@@ -59,16 +71,17 @@ public class CommonProducer {
} }
} }
public void sendLog(String topic, String logJson) { /**
try { * 想kafka发送metricAvro数据
producerString.send(new ProducerRecord<>(topic, null, logJson)); *
} catch (Exception e) { * @param metricTopic topic名称
logger.error(e.toString()); * @param metricSetName 指标集名称
} * @param timestamp 时间戳
} * @param dimensions 维度
* @param metrics 指标
public void sendMetric(String metricTopic, String metricSetName, String timestamp, Map<String, String> dimensions, */
Map<String, Double> metrics) { public void sendMetricAvro(String metricTopic, String metricSetName, String timestamp, Map<String, String> dimensions,
Map<String, Double> metrics) {
try { try {
byte[] bytes = AvroSerializerFactory.getMetricAvorSerializer().serializingMetric(metricSetName, timestamp, byte[] bytes = AvroSerializerFactory.getMetricAvorSerializer().serializingMetric(metricSetName, timestamp,
dimensions, metrics); dimensions, metrics);
...@@ -78,28 +91,19 @@ public class CommonProducer { ...@@ -78,28 +91,19 @@ public class CommonProducer {
} }
} }
public void sendErrorLog(String errorLogTopic, String logJson) { /**
* 向kafka发送json数据
*
* @param topic topic名称
* @param logJson json数据
*/
public void sendJson(String topic, String logJson) {
try { try {
producerString.send(new ProducerRecord<>(errorLogTopic, null, logJson)); producerString.send(new ProducerRecord<>(topic, null, logJson));
} catch (Exception e) {
logger.error(e.toString());
}
}
public void sendErrorMetric(String errorMetricTopic, String logJson) {
try {
producerString.send(new ProducerRecord<>(errorMetricTopic, null, logJson));
} catch (Exception e) { } catch (Exception e) {
logger.error(e.toString()); logger.error(e.toString());
} }
} }
public void sendAlarm(String alarmTopic, String alarmJson) {
try {
producerString.send(new ProducerRecord<>(alarmTopic, null, alarmJson));
} catch (Exception e) {
logger.error(e.toString());
}
}
} }
...@@ -73,7 +73,7 @@ public class MockGrok { ...@@ -73,7 +73,7 @@ public class MockGrok {
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
String json = buildJson(buildMessage()); String json = buildJson(buildMessage());
CommonProducer producer = CommonProducerPool.getInstance().getProducer(); CommonProducer producer = CommonProducerPool.getInstance().getProducer();
producer.sendLog(topicName, json); producer.sendJson(topicName, json);
} }
Thread.sleep(1000); Thread.sleep(1000);
} }
......
package com.zorkdata.tools.mock;
/**
* @author: LiaoMingtao
* @date: 2020/6/30
*/
public class MockMetricbeatData {
}
...@@ -40,7 +40,7 @@ public class MockStreamxLogAvro { ...@@ -40,7 +40,7 @@ public class MockStreamxLogAvro {
} }
private static Map<String, String> getRandomNormalFields() { private static Map<String, String> getRandomNormalFields() {
Map<String, String> normalFields = new HashMap<>(); Map<String, String> normalFields = new HashMap<>(2);
normalFields.put("message", "data update success"); normalFields.put("message", "data update success");
normalFields.put("countryCode", getRandomCountryCode()); normalFields.put("countryCode", getRandomCountryCode());
return normalFields; return normalFields;
...@@ -60,7 +60,7 @@ public class MockStreamxLogAvro { ...@@ -60,7 +60,7 @@ public class MockStreamxLogAvro {
Map<String, Double> measures = new HashMap<>(1); Map<String, Double> measures = new HashMap<>(1);
Map<String, String> normalFields = getRandomNormalFields(); Map<String, String> normalFields = getRandomNormalFields();
CommonProducer producer = CommonProducerPool.getInstance().getProducer(); CommonProducer producer = CommonProducerPool.getInstance().getProducer();
producer.sendLog(topicName, logTypeName, timestamp, source, offset, dimensions, producer.sendLogAvro(topicName, logTypeName, timestamp, source, offset, dimensions,
measures, normalFields); measures, normalFields);
} }
Thread.sleep(1000); Thread.sleep(1000);
......
...@@ -2,8 +2,6 @@ package com.zorkdata.tools.mock; ...@@ -2,8 +2,6 @@ package com.zorkdata.tools.mock;
import com.zorkdata.tools.kafka.CommonProducer; import com.zorkdata.tools.kafka.CommonProducer;
import com.zorkdata.tools.kafka.CommonProducerPool; import com.zorkdata.tools.kafka.CommonProducerPool;
import com.zorkdata.tools.oldkafka.Producer;
import com.zorkdata.tools.oldkafka.ProducerPool;
import com.zorkdata.tools.utils.DateUtil; import com.zorkdata.tools.utils.DateUtil;
import java.util.HashMap; import java.util.HashMap;
...@@ -16,38 +14,20 @@ import java.util.Random; ...@@ -16,38 +14,20 @@ import java.util.Random;
*/ */
public class MockStreamxMetricAvro { public class MockStreamxMetricAvro {
private static final Random random = new Random();
private static Map<String, String> getRandomDimensions() { private static Map<String, String> getRandomDimensions() {
Random random = new Random();
int i = random.nextInt(10); int i = random.nextInt(10);
Map<String, String> dimensions = new HashMap<>(); Map<String, String> dimensions = new HashMap<>(4);
dimensions.put("hostname", "zorkdata" + i); dimensions.put("hostname", "zorkdata" + i);
dimensions.put("ip", "192.168.1." + i); dimensions.put("ip", "192.168.1." + i);
dimensions.put("appprogramname", "tc50"); dimensions.put("appprogramname", "tc50");
dimensions.put("appsystem", "tdx"); dimensions.put("appsystem", "tdx");
return dimensions; return dimensions;
} }
private static String[] codes = {
"AO", "AF", "AL", "DZ", "AD", "AI", "AG", "AR", "AM", "AU",
"AT", "AZ", "BS", "BH", "BD", "BB", "BY", "BE", "BZ", "BJ"
};
private static String getRandomCountryCode() {
return codes[new Random(codes.length).nextInt(codes.length)];
}
private static Map<String, String> getRandomNormalFields() {
Map<String, String> normalFields = new HashMap<>();
normalFields.put("message", "data update success");
normalFields.put("countryCode", getRandomCountryCode());
return normalFields;
}
private static Map<String, Double> getMetrics() { private static Map<String, Double> getMetrics() {
Map<String, Double> metrics = new HashMap<>(1); Map<String, Double> metrics = new HashMap<>(1);
Random random = new Random();
int i = random.nextInt(10); int i = random.nextInt(10);
metrics.put("metric1", i * 0.1); metrics.put("metric1", i * 0.1);
metrics.put("metric2", i * 0.2); metrics.put("metric2", i * 0.2);
...@@ -65,7 +45,7 @@ public class MockStreamxMetricAvro { ...@@ -65,7 +45,7 @@ public class MockStreamxMetricAvro {
Map<String, String> dimensions = getRandomDimensions(); Map<String, String> dimensions = getRandomDimensions();
Map<String, Double> metrics = getMetrics(); Map<String, Double> metrics = getMetrics();
CommonProducer producer = CommonProducerPool.getInstance().getProducer(); CommonProducer producer = CommonProducerPool.getInstance().getProducer();
producer.sendMetric(topicName, metricSetName, timestamp, dimensions, metrics); producer.sendMetricAvro(topicName, metricSetName, timestamp, dimensions, metrics);
} }
Thread.sleep(1000); Thread.sleep(1000);
} }
......
...@@ -11,7 +11,10 @@ public class ProducerPool implements Closeable { ...@@ -11,7 +11,10 @@ public class ProducerPool implements Closeable {
private CommonProducer[] pool; private CommonProducer[] pool;
private int threadNum = 15; private int threadNum = 15;
// 轮循id
/**
* 轮循id
*/
private int index = 0; private int index = 0;
private static ProducerPool producerInstance = null; private static ProducerPool producerInstance = null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment