Commit 52558dee authored by DeleMing's avatar DeleMing

<dev>

1. mock
parent df164b1f
...@@ -68,10 +68,22 @@ ...@@ -68,10 +68,22 @@
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId> <artifactId>kafka-clients</artifactId>
<version>1.1.1</version> <version>0.8.2.2</version>
</dependency> </dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.kafka</groupId>-->
<!-- <artifactId>kafka_2.11</artifactId>-->
<!-- <version>0.8.2.1</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.kafka</groupId>-->
<!-- <artifactId>kafka_2.11</artifactId>-->
<!-- <version>1.1.1</version>-->
<!-- </dependency>-->
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>
...@@ -119,7 +131,7 @@ ...@@ -119,7 +131,7 @@
<!--生成的manifest中classpath的前缀,因为要把第三方jar放到lib目录下,所以classpath的前缀是lib/--> <!--生成的manifest中classpath的前缀,因为要把第三方jar放到lib目录下,所以classpath的前缀是lib/-->
<classpathPrefix>lib/</classpathPrefix> <classpathPrefix>lib/</classpathPrefix>
<!-- 应用的main class--> <!-- 应用的main class-->
<mainClass>com.zorkdata.tools.oldmock.MockStreamxLogAvro</mainClass> <mainClass>com.zorkdata.tools.mock.MockLogMergeData</mainClass>
</manifest> </manifest>
</archive> </archive>
<!-- 过滤掉不希望包含在jar中的文件--> <!-- 过滤掉不希望包含在jar中的文件-->
......
...@@ -2,7 +2,9 @@ package com.zorkdata.tools.kafka; ...@@ -2,7 +2,9 @@ package com.zorkdata.tools.kafka;
import com.zorkdata.tools.oldkafka.AvroSerializerFactory; import com.zorkdata.tools.oldkafka.AvroSerializerFactory;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -33,7 +35,6 @@ public class CommonProducer { ...@@ -33,7 +35,6 @@ public class CommonProducer {
initConfig(); initConfig();
Properties props = new Properties(); Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer); props.put("bootstrap.servers", kafkaServer);
// props.put("client.id", "webAPI4LogGather"); 不自定义client.id,使用自增 // props.put("client.id", "webAPI4LogGather"); 不自定义client.id,使用自增
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
......
...@@ -14,6 +14,7 @@ public class CommonProducerPool implements Closeable { ...@@ -14,6 +14,7 @@ public class CommonProducerPool implements Closeable {
private int threadNum = 15; private int threadNum = 15;
/** /**
* 轮循id * 轮循id
*/ */
...@@ -22,6 +23,7 @@ public class CommonProducerPool implements Closeable { ...@@ -22,6 +23,7 @@ public class CommonProducerPool implements Closeable {
private static CommonProducerPool producerInstance; private static CommonProducerPool producerInstance;
public static CommonProducerPool getInstance() { public static CommonProducerPool getInstance() {
if (producerInstance == null) { if (producerInstance == null) {
producerInstance = new CommonProducerPool(); producerInstance = new CommonProducerPool();
} }
......
...@@ -15,16 +15,16 @@ public class Config { ...@@ -15,16 +15,16 @@ public class Config {
private static final Logger logger = LoggerFactory.getLogger(Config.class); private static final Logger logger = LoggerFactory.getLogger(Config.class);
private String kafkaServers; public String kafkaServers;
protected int kafkaBathSize; public int kafkaBathSize;
protected String topicName; public String topicName;
public static final Config INSTANCE = new Config(); public static final Config INSTANCE = new Config();
/** /**
* 读取配置文件 * 读取配置文件
*/ */
private Config() { public Config() {
try { try {
Properties properties = PropertiesUtil.getProperties("/config.properties"); Properties properties = PropertiesUtil.getProperties("/config.properties");
kafkaServers = properties.getProperty("kafka.servers"); kafkaServers = properties.getProperty("kafka.servers");
......
package com.zorkdata.tools.mock;
import org.joda.time.DateTime;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author 谢森
* @Description 时间处理工具类
* @Email xiesen@zork.com.cn
*/
public class DateUtil {
private static DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
private static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy.MM.dd");
}
};
private static ThreadLocal<SimpleDateFormat> utcSdf = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
}
};
private static ThreadLocal<SimpleDateFormat> utcSdf1 = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
}
};
private static ThreadLocal<SimpleDateFormat> utcSdf2 = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
}
};
private static ThreadLocal<SimpleDateFormat> utcSdf3 = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
}
};
public static Long timestamp(String timestamp) {
return new DateTime(timestamp).toDate().getTime();
}
public static String format(String timestamp) throws ParseException {
return sdf.get().format(new DateTime(timestamp).toDate());
}
public static Long utcDate2Timestamp(String utcDateStr) throws ParseException {
return utcSdf1.get().parse(utcDateStr).getTime();
}
public static Long utcDate2Timestamp2(String utcDateStr) throws ParseException {
return utcSdf2.get().parse(utcDateStr).getTime();
}
public static Long utcDate2Timestamp3(String utcDateStr) throws ParseException {
return utcSdf3.get().parse(utcDateStr).getTime();
}
public static Long utcDate2Timestamp1(String utcDateStr) {
try {
return utcSdf.get().parse(utcDateStr).getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return 0L;
}
public static String getUTCTimeStr() {
return format.format(new Date()).toString();
}
public static String getUTCTimeStr(long interval) {
long currentTimeMillis = System.currentTimeMillis();
return format.format(new Date(currentTimeMillis + interval)).toString();
}
}
...@@ -3,8 +3,6 @@ package com.zorkdata.tools.mock; ...@@ -3,8 +3,6 @@ package com.zorkdata.tools.mock;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zorkdata.tools.kafka.CommonProducer; import com.zorkdata.tools.kafka.CommonProducer;
import com.zorkdata.tools.kafka.CommonProducerPool; import com.zorkdata.tools.kafka.CommonProducerPool;
import com.zorkdata.tools.oldkafka.Producer;
import com.zorkdata.tools.oldkafka.ProducerPool;
import java.time.Instant; import java.time.Instant;
import java.util.Random; import java.util.Random;
...@@ -72,9 +70,9 @@ public class MockFilebeatData { ...@@ -72,9 +70,9 @@ public class MockFilebeatData {
long size = 10; long size = 10;
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
String json = buildMsg(); String json = buildMsg();
System.out.println(json);
CommonProducer producer = CommonProducerPool.getInstance().getProducer(); // CommonProducer producer = CommonProducerPool.getInstance().getProducer();
producer.sendJson(topicName, json); // producer.sendJson(topicName, json);
} }
Thread.sleep(1000); Thread.sleep(1000);
} }
......
This diff is collapsed.
package com.zorkdata.tools.mock; package com.zorkdata.tools.mock;
import com.zorkdata.tools.kafka.CommonProducer;
import com.zorkdata.tools.kafka.CommonProducerPool;
import com.zorkdata.tools.utils.DateUtil; import com.zorkdata.tools.utils.DateUtil;
import org.mortbay.util.ajax.JSON;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
...@@ -59,9 +58,19 @@ public class MockStreamxLogAvro { ...@@ -59,9 +58,19 @@ public class MockStreamxLogAvro {
Map<String, String> dimensions = getRandomDimensions(); Map<String, String> dimensions = getRandomDimensions();
Map<String, Double> measures = new HashMap<>(1); Map<String, Double> measures = new HashMap<>(1);
Map<String, String> normalFields = getRandomNormalFields(); Map<String, String> normalFields = getRandomNormalFields();
CommonProducer producer = CommonProducerPool.getInstance().getProducer();
producer.sendLogAvro(topicName, logTypeName, timestamp, source, offset, dimensions, Map<String, Object> map = new HashMap<>();
measures, normalFields); map.put("logTypeName", logTypeName);
map.put("timestamp", timestamp);
map.put("source", source);
map.put("offset", offset);
map.put("dimensions", dimensions);
map.put("measures", measures);
map.put("normalFields", normalFields);
System.out.println(JSON.toString(map).getBytes().length);
// CommonProducer producer = CommonProducerPool.getInstance().getProducer();
// producer.sendLogAvro(topicName, logTypeName, timestamp, source, offset, dimensions,
// measures, normalFields);
} }
Thread.sleep(1000); Thread.sleep(1000);
} }
......
...@@ -65,7 +65,7 @@ public class CommonProducer { ...@@ -65,7 +65,7 @@ public class CommonProducer {
/** /**
* 当 producer 发送消息到 broker 时,broker 需要在规定的时间内返回结果,这个时间就是该参数控制的,默认是 30s * 当 producer 发送消息到 broker 时,broker 需要在规定的时间内返回结果,这个时间就是该参数控制的,默认是 30s
*/ */
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
/** /**
* 指定了producer 端用于缓存的缓存区大小,单位是字节,默认是 33554432, 即 32G * 指定了producer 端用于缓存的缓存区大小,单位是字节,默认是 33554432, 即 32G
......
package com.zorkdata.tools.utils; // package com.zorkdata.tools.utils;
//
import org.apache.kafka.clients.consumer.ConsumerConfig; // import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord; // import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; // import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; // import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition; // import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer; // import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger; // import org.slf4j.Logger;
import org.slf4j.LoggerFactory; // import org.slf4j.LoggerFactory;
//
import java.util.Arrays; // import java.util.Arrays;
import java.util.List; // import java.util.List;
import java.util.Properties; // import java.util.Properties;
import java.util.concurrent.ExecutorService; // import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; // import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; // import java.util.concurrent.TimeUnit;
//
//
/** // /**
* @author DeleMing // * @author DeleMing
*/ // */
public class JConsumerMutil { // public class JConsumerMutil {
private final static Logger log = LoggerFactory.getLogger(JConsumerMutil.class); // private final static Logger log = LoggerFactory.getLogger(JConsumerMutil.class);
private final KafkaConsumer<String, String> consumer; // private final KafkaConsumer<String, String> consumer;
private ExecutorService executorService; // private ExecutorService executorService;
//
public JConsumerMutil() { // public JConsumerMutil() {
Properties props = new Properties(); // Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop:9092"); // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ke1"); // props.put(ConsumerConfig.GROUP_ID_CONFIG, "ke1");
// 开启自动提交 // // 开启自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//
// 自动提交的间隔时间 // // 自动提交的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//
consumer = new KafkaConsumer<String, String>(props); // consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_kafka_game_x")); // consumer.subscribe(Arrays.asList("test_kafka_game_x"));
} // }
//
public void execute() { // public void execute() {
// 初始化线程池 // // 初始化线程池
executorService = Executors.newFixedThreadPool(1); // executorService = Executors.newFixedThreadPool(1);
while (true) { // while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); // ConsumerRecords<String, String> records = consumer.poll(100);
if (null != records) { // if (null != records) {
executorService.submit(new KafkaConsumerThread(records, consumer)); // executorService.submit(new KafkaConsumerThread(records, consumer));
} // }
} // }
} // }
//
public void shutdown() { // public void shutdown() {
//
try { // try {
if (consumer != null) { // if (consumer != null) {
consumer.close(); // consumer.close();
} // }
//
if (executorService != null) { // if (executorService != null) {
executorService.shutdown(); // executorService.shutdown();
} // }
//
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { // if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
log.error("shutdown kafka consumer thread timeout."); // log.error("shutdown kafka consumer thread timeout.");
} // }
} catch (InterruptedException e) { // } catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Thread.currentThread().interrupt();
} // }
} // }
//
//
/** // /**
* 消费者线程实例 // * 消费者线程实例
*/ // */
class KafkaConsumerThread implements Runnable { // class KafkaConsumerThread implements Runnable {
//
private ConsumerRecords<String, String> records; // private ConsumerRecords<String, String> records;
//
public KafkaConsumerThread(ConsumerRecords<String, String> records, // public KafkaConsumerThread(ConsumerRecords<String, String> records,
KafkaConsumer<String, String> consumer) { // KafkaConsumer<String, String> consumer) {
this.records = records; // this.records = records;
} // }
//
@Override // @Override
public void run() { // public void run() {
for (TopicPartition partition : records.partitions()) { // for (TopicPartition partition : records.partitions()) {
// 获取消费记录数据集 // // 获取消费记录数据集
List<ConsumerRecord<String, String>> partitionRecords = this.records.records(partition); // List<ConsumerRecord<String, String>> partitionRecords = this.records.records(partition);
log.info("Thread Id : " + Thread.currentThread().getId()); // log.info("Thread Id : " + Thread.currentThread().getId());
for (ConsumerRecord<String, String> record : partitionRecords) { // for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println("offset =" + record.offset() + ", key=" + record.key() + ", value=" + record.value()); // System.out.println("offset =" + record.offset() + ", key=" + record.key() + ", value=" + record.value());
} // }
} // }
} // }
} // }
//
//
public static void main(String[] args) { // public static void main(String[] args) {
JConsumerMutil consumer = new JConsumerMutil(); // JConsumerMutil consumer = new JConsumerMutil();
try { // try {
consumer.execute(); // consumer.execute();
} catch (Exception e) { // } catch (Exception e) {
log.error("mutil consumer from kafka has error , msg is " + e.getMessage()); // log.error("mutil consumer from kafka has error , msg is " + e.getMessage());
consumer.shutdown(); // consumer.shutdown();
} // }
} // }
} // }
//
kafka.servers=kafka01:9092 #kafka.servers=kafka1:9092,kafka2:9092,kafka3:9092
kafka.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
#kafka.servers=kafka-1:19092,kafka-2:19092,kafka-3:19092
# ,kafka02:9092,kafka03:9092 # ,kafka02:9092,kafka03:9092
kafka.batch.size=100000 kafka.batch.size=1
kafka.topic.name=a kafka.topic.name=gutoai
key.serializer=org.apache.kafka.common.serialization.StringSerializer key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
#value.serializer=org.apache.kafka.common.serialization.StringSerializer #value.serializer=org.apache.kafka.common.serialization.StringSerializer
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment