Commit 8873bbfa authored by DeleMing's avatar DeleMing

更新代码结构

parent 9bfb7459
package com.zorkdata.tools.kafka;
import com.zorkdata.tools.oldkafka.AvroSerializerFactory;
import com.zorkdata.tools.oldkafka.Config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Map;
import java.util.Properties;
/**
......@@ -8,8 +14,87 @@ import org.apache.kafka.clients.producer.KafkaProducer;
*/
public class CommonProducer {
private static String kafkaServer;
private static int kafkaBathSize;
private static KafkaProducer<String, byte[]> producerByte;
private static KafkaProducer<String, String> producerString;
public CommonProducer() {
try {
initConfig();
Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);
props.put("client.id", "webAPI4LogGather");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("batch.size", kafkaBathSize);
producerByte = new KafkaProducer<String, byte[]>(props);
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerString = new KafkaProducer<String, String>(props);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public void initConfig() {
kafkaServer = Config.getInstance().kafkaServers;
kafkaBathSize = Config.getInstance().kafkaBathSize;
}
public void sendLog(String topic, String logTypeName, String timestamp, String source, String offset,
Map<String, String> dimensions, Map<String, Double> metrics, Map<String, String> normalFields) {
try {
byte[] bytes = AvroSerializerFactory.getLogAvorSerializer().serializingLog(logTypeName, timestamp, source,
offset, dimensions, metrics, normalFields);
producerByte.send(new ProducerRecord<String, byte[]>(topic, null, bytes));
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendLog(String topic, String logJson) {
try {
producerString.send(new ProducerRecord<String, String>(topic, null, logJson));
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendErrorLog(String errorLogTopic, String logJson) {
try {
producerString.send(new ProducerRecord<String, String>(errorLogTopic, null, logJson));
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendErrorMetric(String errorMetricTopic, String logJson) {
try {
producerString.send(new ProducerRecord<String, String>(errorMetricTopic, null, logJson));
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendAlarm(String alarmTopic, String alarmJson) {
try {
producerString.send(new ProducerRecord<String, String>(alarmTopic, null, alarmJson));
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendMetric(String metricTopic, String metricSetName, String timestamp, Map<String, String> dimensions,
Map<String, Double> metrics) {
try {
byte[] bytes = AvroSerializerFactory.getMetricAvorSerializer().serializingMetric(metricSetName, timestamp,
dimensions, metrics);
producerByte.send(new ProducerRecord<String, byte[]>(metricTopic, null, bytes));
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.zorkdata.tools.kafka;
import java.io.Closeable;
import java.io.IOException;
/**
* @author: LiaoMingtao
* @date: 2020/6/29
*/
public class CommonProducerPool implements Closeable {
private CommonProducer[] pool;
private int threadNum = 15;
/**
* 轮循id
*/
private int index = 0;
private static CommonProducerPool producerInstance = null;
public static CommonProducerPool getInstance() {
if (producerInstance == null) {
producerInstance = new CommonProducerPool();
}
return CommonProducerPool.producerInstance;
}
private CommonProducerPool() {
init();
}
public void init() {
pool = new CommonProducer[threadNum];
for (int i = 0; i < threadNum; i++) {
pool[i] = new CommonProducer();
}
}
public CommonProducer getProducer() {
if (index > 65535) {
index = 0;
}
return pool[index++ % threadNum];
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*
* <p> As noted in {@link AutoCloseable#close()}, cases where the
* close may fail require careful attention. It is strongly advised
* to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
* the {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
}
}
......@@ -17,16 +17,16 @@ public class Config {
return instance;
}
private String kafkaServers;
private String kafkaBathSize;
private String topicName;
public String kafkaServers;
public int kafkaBathSize;
public String topicName;
private Config() {
try {
Properties properties = PropertiesUtil.getProperties("/config.properties");
kafkaServers = properties.getProperty("kafka.servers");
kafkaBathSize = properties.getProperty("kafka.batch.size");
kafkaBathSize = Integer.parseInt(properties.getProperty("kafka.batch.size"));
topicName = properties.getProperty("kafka.topic.name");
} catch (Exception e) {
e.printStackTrace();
......
......@@ -58,7 +58,7 @@ public class MockKafkaConnectAvro {
System.out.println(sum(logTypeName, timestamp, source, offset, dimensions, measures, normalFields));
// System.out.println("--------------------- start ----------------------------");
// long l1 = System.currentTimeMillis();
CustomerProducer producer = ProducerPool.getInstance(propertiesName).getProducer();
CommonProducer producer = ProducerPool.getInstance(propertiesName).getProducer();
// long l2 = System.currentTimeMillis();
// System.out.println("获取 producer 需要的时间: " + (l2 - l1) + "ms");
producer.sendLog(logTypeName, timestamp, source, offset, dimensions, measures, normalFields);
......
package com.zorkdata.tools.oldmock;
import com.alibaba.fastjson.JSONObject;
import com.zorkdata.tools.utils.CustomerProducer;
import com.zorkdata.tools.utils.CommonProducer;
import com.zorkdata.tools.utils.ProducerPool;
import com.zorkdata.tools.utils.PropertiesUtil;
import com.zorkdata.tools.utils.StringUtil;
......@@ -47,7 +47,7 @@ public class MockKafkaConnectJson {
jsonObject.put("message", message);
int length = jsonObject.toJSONString().length();
System.out.println(length);
CustomerProducer producer = ProducerPool.getInstance(propertiesName).getProducer();
CommonProducer producer = ProducerPool.getInstance(propertiesName).getProducer();
producer.sendJsonLog(jsonObject.toJSONString());
}
long end = System.currentTimeMillis();
......
......@@ -17,24 +17,24 @@ import java.util.concurrent.ExecutionException;
* @Date 2020/4/2 9:39
*/
@Slf4j
public class CustomerProducer {
public class CommonProducer {
static String servers = "kafka-1:9092,kafka-2:9092,kafka-3:9092";
static int batchSize = 1;
static CustomerProducer testProducer;
static CommonProducer testProducer;
static String topics;
public static long logSize;
private static KafkaProducer<String, byte[]> producer;
private static KafkaProducer<String, String> noAvroProducer;
public static synchronized CustomerProducer getInstance(String propertiesName) {
public static synchronized CommonProducer getInstance(String propertiesName) {
if (testProducer == null) {
testProducer = new CustomerProducer(propertiesName);
testProducer = new CommonProducer(propertiesName);
}
return testProducer;
}
public CustomerProducer(String propertiesName) {
public CommonProducer(String propertiesName) {
try {
initConfig(propertiesName);
Properties props = new Properties();
......
......@@ -8,7 +8,7 @@ import java.io.IOException;
*/
public class ProducerPool implements Closeable {
private CustomerProducer[] pool;
private CommonProducer[] pool;
private int threadNum = 15;
// 轮循id
......@@ -29,13 +29,13 @@ public class ProducerPool implements Closeable {
public void init(String propertiesName) {
pool = new CustomerProducer[threadNum];
pool = new CommonProducer[threadNum];
for (int i = 0; i < threadNum; i++) {
pool[i] = new CustomerProducer(propertiesName);
pool[i] = new CommonProducer(propertiesName);
}
}
public CustomerProducer getProducer() {
public CommonProducer getProducer() {
if (index > 65535) {
index = 0;
}
......
kafka.servers=kafka01:9092,kafka02:9092,kafka03:9092
kafka.batch.size=100000
kafka.topic.name=a
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
#value.serializer=org.apache.kafka.common.serialization.StringSerializer
log.size = 10000
log.topic=flinkx
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment