Commit d97a6281 authored by DeleMing's avatar DeleMing

<dev>

1. 修改代码
parent 21ec0353
package com.zorkdta.tools.mock;
import com.zorkdta.tools.avro.AvroSerializerFactory;
import lombok.Data;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.Configuration;
import java.util.Map;
import java.util.Properties;
/**
* @author shaojiao
* Date: 2020/1/8
* Time: 10:12
* Description: 当前kafkaUtil适用于1.1.0版本
*/
@Data
public class KafkaProducerUtil {

    private Logger log = LoggerFactory.getLogger(KafkaProducerUtil.class);

    /**
     * Kafka connection settings.
     */
    private String kafkaBootstrapServers;
    private Integer kafkaBatchSize;

    /**
     * Kafka SASL authentication settings.
     */
    private boolean kafkaSaslFlag;
    private String kafkaSaslAppkey;
    private String kafkaSaslSecretkey;
    private String kafkaSaslMechanism;

    /**
     * Kafka Kerberos authentication settings.
     */
    private boolean kafkaKerberosFlag;
    private String kafkaKerberosKrb5Conf;
    private String kafkaKerberosJaasConf;
    private String kafkaSecurityProtocol;
    private String kafkaSaslKerberosServiceName;

    // Instance fields (previously static): the static fields were assigned from
    // instance constructors, so creating a second KafkaProducerUtil silently
    // replaced the producers used by every earlier instance. Each instance now
    // owns its own pair of producers.
    private KafkaProducer<String, byte[]> producer;
    private KafkaProducer<String, String> noAvroProducer;

    /**
     * Plain producer: no SASL, no Kerberos.
     *
     * @param kafkaBootstrapServers comma-separated broker list
     * @param kafkaBatchSize        producer batch.size in bytes
     */
    public KafkaProducerUtil(String kafkaBootstrapServers, Integer kafkaBatchSize) {
        this(kafkaBootstrapServers, kafkaBatchSize, false, false,
                null, null, null, null, null, null, null);
    }

    /**
     * SASL-authenticated producer (no Kerberos).
     */
    public KafkaProducerUtil(String kafkaBootstrapServers, Integer kafkaBatchSize, boolean kafkaSaslFlag,
                             String kafkaSecurityProtocol, String kafkaSaslMechanism,
                             String kafkaSaslAppkey, String kafkaSaslSecretkey) {
        this(kafkaBootstrapServers, kafkaBatchSize, kafkaSaslFlag, false,
                null, null, kafkaSecurityProtocol, null,
                kafkaSaslMechanism, kafkaSaslAppkey, kafkaSaslSecretkey);
    }

    /**
     * Kerberos-authenticated producer (no SASL appkey/secretkey).
     */
    public KafkaProducerUtil(String kafkaBootstrapServers, Integer kafkaBatchSize, boolean kafkaKerberosFlag,
                             String kafkaKerberosKrb5Conf, String kafkaKerberosJaasConf,
                             String kafkaSecurityProtocol, String kafkaSaslKerberosServiceName,
                             String kafkaSaslMechanism) {
        this(kafkaBootstrapServers, kafkaBatchSize, false, kafkaKerberosFlag,
                kafkaKerberosKrb5Conf, kafkaKerberosJaasConf, kafkaSecurityProtocol,
                kafkaSaslKerberosServiceName, kafkaSaslMechanism, null, null);
    }

    /**
     * Full constructor; all other constructors delegate here.
     * Builds both producers immediately via {@link #createKafkaClient()}.
     */
    public KafkaProducerUtil(String kafkaBootstrapServers, Integer kafkaBatchSize, boolean kafkaSaslFlag,
                             boolean kafkaKerberosFlag, String kafkaKerberosKrb5Conf, String kafkaKerberosJaasConf,
                             String kafkaSecurityProtocol, String kafkaSaslKerberosServiceName,
                             String kafkaSaslMechanism, String kafkaSaslAppkey, String kafkaSaslSecretkey) {
        this.kafkaBootstrapServers = kafkaBootstrapServers;
        this.kafkaBatchSize = kafkaBatchSize;
        this.kafkaSaslFlag = kafkaSaslFlag;
        this.kafkaKerberosFlag = kafkaKerberosFlag;
        this.kafkaSecurityProtocol = kafkaSecurityProtocol;
        this.kafkaSaslMechanism = kafkaSaslMechanism;
        this.kafkaKerberosKrb5Conf = kafkaKerberosKrb5Conf;
        this.kafkaKerberosJaasConf = kafkaKerberosJaasConf;
        this.kafkaSaslKerberosServiceName = kafkaSaslKerberosServiceName;
        this.kafkaSaslAppkey = kafkaSaslAppkey;
        this.kafkaSaslSecretkey = kafkaSaslSecretkey;
        createKafkaClient();
    }

    /**
     * Creates the Avro (byte[]) producer and the plain String producer from the
     * configured settings.
     *
     * <p>NOTE(review): on any failure this logs and calls {@code System.exit(1)},
     * preserving the original behavior — callers cannot recover from a failed init.
     */
    public void createKafkaClient() {
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", kafkaBootstrapServers);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("batch.size", kafkaBatchSize);
            if (kafkaSaslFlag) {
                props.put("security.protocol", kafkaSecurityProtocol);
                props.put("sasl.mechanism", kafkaSaslMechanism);
                //Configuration.setConfiguration(new SaslConfig(kafkaSaslAppkey, kafkaSaslSecretkey));
            }
            if (kafkaKerberosFlag) {
                // JVM-wide Kerberos settings: these affect every JAAS/Kerberos
                // consumer in this process, not just this producer.
                System.setProperty("java.security.krb5.conf", kafkaKerberosKrb5Conf);
                System.setProperty("java.security.auth.login.config", kafkaKerberosJaasConf);
                props.put("security.protocol", kafkaSecurityProtocol);
                props.put("sasl.kerberos.service.name", kafkaSaslKerberosServiceName);
                props.put("sasl.mechanism", kafkaSaslMechanism);
            }
            // Set each value.serializer immediately before constructing the
            // producer that uses it. KafkaProducer copies the config at
            // construction time, so mutating props between the two calls is
            // safe; this ordering just makes the intent explicit.
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            producer = new KafkaProducer<>(props);
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            noAvroProducer = new KafkaProducer<>(props);
        } catch (Exception ex) {
            // Fatal by design (original behavior). printStackTrace() removed:
            // it duplicated the logged stack trace on stderr.
            log.error("初始化Kafka失败,系统自动退出! ", ex);
            System.exit(1);
        }
    }

    /**
     * Sends an alarm JSON string (no Avro encoding) to the given topic.
     * Failures are logged and swallowed; the send is fire-and-forget.
     */
    public void sendAlarm(String topic, String alarmJson) {
        try {
            noAvroProducer.send(new ProducerRecord<>(topic, null, alarmJson));
        } catch (Exception e) {
            log.error("sendAlarm-插入Kafka失败", e);
        }
    }

    /**
     * Avro-serializes a metric record and sends it to the given topic.
     * Failures are logged and swallowed; the send is fire-and-forget.
     */
    public void sendMetric(String metricSetName, String timestamp, Map<String, String> dimensions,
                           Map<String, Double> metrics, String topic) {
        try {
            byte[] bytes = AvroSerializerFactory.getMetricAvorSerializer().serializingMetric(metricSetName, timestamp,
                    dimensions, metrics);
            producer.send(new ProducerRecord<>(topic, null, bytes));
        } catch (Exception e) {
            log.error("sendMetric-插入Kafka失败", e);
        }
    }

    /**
     * Avro-serializes a log record and sends it to the given topic.
     * Failures are logged and swallowed; the send is fire-and-forget.
     */
    public void sendLog(String topic, String logTypeName, String timestamp, String source, String offset,
                        Map<String, String> dimensions, Map<String, Double> metrics, Map<String, String> normalFields) {
        try {
            byte[] bytes = AvroSerializerFactory.getLogAvorSerializer().serializingLog(logTypeName, timestamp, source,
                    offset, dimensions, metrics, normalFields);
            producer.send(new ProducerRecord<>(topic, null, bytes));
        } catch (Exception e) {
            log.error("sendLog-插入Kafka失败", e);
        }
    }
}
...@@ -3,11 +3,13 @@ package com.zorkdta.tools.mock; ...@@ -3,11 +3,13 @@ package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Future;
/** /**
* 生产数据(功能正常) * 生产数据(功能正常)
...@@ -22,14 +24,14 @@ public class MockConnectJsonData { ...@@ -22,14 +24,14 @@ public class MockConnectJsonData {
private static void init() { private static void init() {
Properties props = new Properties(); Properties props = new Properties();
props.put("bootstrap.servers", brokerlist); props.put("bootstrap.servers", brokerlist);
props.put("acks", "1"); props.put("acks", "-1");
props.put("retries", 0); props.put("retries", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName());
props.put("batch.size", 16384); props.put("batch.size", 16384);
props.put("linger.ms", 1); props.put("linger.ms", 1);
props.put("buffer.memory", 33554432); props.put("buffer.memory", 33554432);
producer = new KafkaProducer<String, String>(props); producer = new KafkaProducer<>(props);
} }
/** /**
...@@ -50,7 +52,7 @@ public class MockConnectJsonData { ...@@ -50,7 +52,7 @@ public class MockConnectJsonData {
private static String buildMsg() { private static String buildMsg() {
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put("logtypename", "tdx_filebeat"); jsonObject.put("logtypename", "tdx_filebeat");
jsonObject.put("hostname", "kafka-connect-2"); jsonObject.put("hostname", "kafkaproducer-connect-2");
jsonObject.put("appprogram", "tdx"); jsonObject.put("appprogram", "tdx");
jsonObject.put("offset", String.valueOf(System.currentTimeMillis())); jsonObject.put("offset", String.valueOf(System.currentTimeMillis()));
jsonObject.put("message", "10:06:41.335 功能请求 IP:182.140.129.3 MAC:F8A963586DFF 线程:00004364 通道ID:4 事务ID:16 请求:(0-98)集成客户校验(*) 营业部:(0001)国金证券集中交易(*)\\n66650109|************|XshR9/S5SDE=|8|0||12|7.37.0||||||||||0||0|182.140.129.3;PENGKANG;Administrator;83025;Intel(R)Core(TM)i7-4510UCPU@2.00GHz*4;bfebfbff00040651-GenuineIntel;Windows7 Service Pack 1 (Build 7601);182.140.129.3,0.0.0.0,0.0.0.0;F8A963586DFF,00FF8C535532,A0A8CD0D00B0;TF655AWJ16NG2L,143116404707;07/15/2014;8DC03929-0822-453C-A2D5-EFBE95E359BE;182.140.129.3;;NTFS;0C17-8FD7;C:;113G;HTS725050A7E630;GH2Z;TF655AWJ16NG2L;|||||2,Mar 1 2018,10:22:32|0|||GETLOGINPARAM||7.37,6.01,Mar 1 2018,10:37:07|8噝\\\\5\\\\3||||\\n10:06:41.491 调用失败 IP:182.140.129.3 MAC:F8A963586DFF 线程:00004364 通道ID:4 事务ID:16 请求:(0-98)集成客户校验(*) 营业部:(0001)国金证券集中交易(*) 耗时A:156 耗时B:0 排队:0\\n-4|资金账号或密码错误!|0|||\\n10:06:52.678 系统信息 开始关闭交易中心服务。\\n10:06:53.303 系统信息 (HS_TCP2.dll)连接守护线程退出!\\n10:06:53.335 系统信息 (HS_TCP2.dll)\\\"刷新约定购回标的证券信息\\\"线程成功退出!(记录总条数:3536)\\n10:06:54.413 系统信息 港股行情服务: 保存代码表(港股)缓存。\\n10:06:54.678 系统信息 深沪行情服务: 保存代码表缓存。\\n10:06:54.960 系统信息 交易中心服务已经成功关闭。\\n10:06:54.960 系统信息 系统正常关闭\\n"); jsonObject.put("message", "10:06:41.335 功能请求 IP:182.140.129.3 MAC:F8A963586DFF 线程:00004364 通道ID:4 事务ID:16 请求:(0-98)集成客户校验(*) 营业部:(0001)国金证券集中交易(*)\\n66650109|************|XshR9/S5SDE=|8|0||12|7.37.0||||||||||0||0|182.140.129.3;PENGKANG;Administrator;83025;Intel(R)Core(TM)i7-4510UCPU@2.00GHz*4;bfebfbff00040651-GenuineIntel;Windows7 Service Pack 1 (Build 7601);182.140.129.3,0.0.0.0,0.0.0.0;F8A963586DFF,00FF8C535532,A0A8CD0D00B0;TF655AWJ16NG2L,143116404707;07/15/2014;8DC03929-0822-453C-A2D5-EFBE95E359BE;182.140.129.3;;NTFS;0C17-8FD7;C:;113G;HTS725050A7E630;GH2Z;TF655AWJ16NG2L;|||||2,Mar 1 2018,10:22:32|0|||GETLOGINPARAM||7.37,6.01,Mar 1 2018,10:37:07|8噝\\\\5\\\\3||||\\n10:06:41.491 调用失败 IP:182.140.129.3 MAC:F8A963586DFF 线程:00004364 通道ID:4 事务ID:16 请求:(0-98)集成客户校验(*) 营业部:(0001)国金证券集中交易(*) 耗时A:156 耗时B:0 排队:0\\n-4|资金账号或密码错误!|0|||\\n10:06:52.678 系统信息 
开始关闭交易中心服务。\\n10:06:53.303 系统信息 (HS_TCP2.dll)连接守护线程退出!\\n10:06:53.335 系统信息 (HS_TCP2.dll)\\\"刷新约定购回标的证券信息\\\"线程成功退出!(记录总条数:3536)\\n10:06:54.413 系统信息 港股行情服务: 保存代码表(港股)缓存。\\n10:06:54.678 系统信息 深沪行情服务: 保存代码表缓存。\\n10:06:54.960 系统信息 交易中心服务已经成功关闭。\\n10:06:54.960 系统信息 系统正常关闭\\n");
...@@ -64,18 +66,18 @@ public class MockConnectJsonData { ...@@ -64,18 +66,18 @@ public class MockConnectJsonData {
} }
private static void send(String message) { private static void send(String message) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, null, message); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, null, message);
producer.send(producerRecord); producer.send(producerRecord);
} }
public static void main(String[] args) { public static void main(String[] args) {
topic = "tdx3"; topic = "test";
brokerlist = "kafka-1:19092,kafka-2:19092,kafka-3:19092"; brokerlist = "kafka01:9092,kafka02:9092,kafka03:9092";
init(); init();
for (int i = 0; i <= 10000; i++) { for (int i = 0; i <= 10000; i++) {
String message = buildMsg(); String message = "" + i;
send(message); send(message);
} }
} }
......
...@@ -2,7 +2,6 @@ package com.zorkdta.tools.mock; ...@@ -2,7 +2,6 @@ package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zorkdta.tools.utils.*; import com.zorkdta.tools.utils.*;
import top.xiesen.mock.kafka.utils.*;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
......
...@@ -6,7 +6,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; ...@@ -6,7 +6,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import com.zorkdta.tools.avro.AvroSerializer; import com.zorkdta.tools.avro.AvroSerializer;
import com.zorkdta.tools.avro.AvroSerializerFactory; import com.zorkdta.tools.avro.AvroSerializerFactory;
import top.xiesen.mock.kafka.utils.*;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
......
...@@ -5,7 +5,6 @@ import com.zorkdta.tools.utils.CustomerProducer; ...@@ -5,7 +5,6 @@ import com.zorkdta.tools.utils.CustomerProducer;
import com.zorkdta.tools.utils.ProducerPool; import com.zorkdta.tools.utils.ProducerPool;
import com.zorkdta.tools.utils.PropertiesUtil; import com.zorkdta.tools.utils.PropertiesUtil;
import com.zorkdta.tools.utils.StringUtil; import com.zorkdta.tools.utils.StringUtil;
import top.xiesen.mock.kafka.utils.*;
import java.util.Properties; import java.util.Properties;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment