Commit df164b1f authored by DeleMing's avatar DeleMing

优化代码

parent 869f5bea
...@@ -20,19 +20,6 @@ ...@@ -20,19 +20,6 @@
<enabled>true</enabled> <enabled>true</enabled>
</snapshots> </snapshots>
</repository> </repository>
<!-- <repository>
<id>oss</id>
<name>oss</name>
<url>https://oss.sonatype.org/content/groups/public</url>
</repository>-->
<!-- <repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>-->
</repositories> </repositories>
<pluginRepositories> <pluginRepositories>
<pluginRepository> <pluginRepository>
...@@ -79,18 +66,6 @@ ...@@ -79,18 +66,6 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!--<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.2</version>
</dependency>-->
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId> <artifactId>kafka_2.11</artifactId>
......
package com.zorkdata.tools.load; package com.zorkdata.tools.load;
import com.zorkdata.tools.kafka.Config; import com.zorkdata.tools.mock.MockFilebeatData;
/** /**
* @author: LiaoMingtao * @author: LiaoMingtao
...@@ -9,6 +9,12 @@ import com.zorkdata.tools.kafka.Config; ...@@ -9,6 +9,12 @@ import com.zorkdata.tools.kafka.Config;
public class Test { public class Test {
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(Config.INSTANCE.getKafkaServers()); System.out.println(MockFilebeatData.buildMsg().getBytes().length);
StringBuilder a = new StringBuilder();
String strs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
for (int i =0; i < 345; i++) {
a.append(strs.charAt((int)(Math.random() * 26)));
}
System.out.println(a.toString());
} }
} }
package com.zorkdata.tools.mock; package com.zorkdata.tools.mock;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zorkdata.tools.kafka.CommonProducer;
import com.zorkdata.tools.kafka.CommonProducerPool;
import com.zorkdata.tools.oldkafka.Producer; import com.zorkdata.tools.oldkafka.Producer;
import com.zorkdata.tools.oldkafka.ProducerPool; import com.zorkdata.tools.oldkafka.ProducerPool;
...@@ -15,7 +17,7 @@ import static java.lang.System.currentTimeMillis; ...@@ -15,7 +17,7 @@ import static java.lang.System.currentTimeMillis;
*/ */
public class MockFilebeatData { public class MockFilebeatData {
private static String buildMsg() { public static String buildMsg() {
JSONObject filebeatJson = new JSONObject(); JSONObject filebeatJson = new JSONObject();
JSONObject metadataJson = new JSONObject(); JSONObject metadataJson = new JSONObject();
metadataJson.put("beat", "filebeat"); metadataJson.put("beat", "filebeat");
...@@ -37,15 +39,21 @@ public class MockFilebeatData { ...@@ -37,15 +39,21 @@ public class MockFilebeatData {
filebeatJson.put("@timestamp", Instant.now().toString()); filebeatJson.put("@timestamp", Instant.now().toString());
filebeatJson.put("source", "/var/log/nginx/access.log"); filebeatJson.put("source", "/var/log/nginx/access.log");
filebeatJson.put("offset", String.valueOf(currentTimeMillis())); filebeatJson.put("offset", String.valueOf(currentTimeMillis()));
filebeatJson.put("message", "10:06:41.335 功能请求 IP:182.140.129.3 MAC:F8A963586DFF 线程:00004364 通道ID:4 事务ID:16 请求:(0-98)集成客户校验(*) 营业部:(0001)国金证券集中交易(*)\\n66650109|************|XshR9/S5SDE=|8|0||12|7.37.0||||||||||0||0|182.140.129.3;PENGKANG;Administrator;83025;Intel(R)Core(TM)i7-4510UCPU@2.00GHz*4;bfebfbff00040651-GenuineIntel;Windows7 Service Pack 1 (Build 7601);182.140.129.3,0.0.0.0,0.0.0.0;F8A963586DFF,00FF8C535532,A0A8CD0D00B0;TF655AWJ16NG2L,143116404707;07/15/2014;8DC03929-0822-453C-A2D5-EFBE95E359BE;182.140.129.3;;NTFS;0C17-8FD7;C:;113G;HTS725050A7E630;GH2Z;TF655AWJ16NG2L;|||||2,Mar 1 2018,10:22:32|0|||GETLOGINPARAM||7.37,6.01,Mar 1 2018,10:37:07|8噝\\\\\\\\5\\\\\\\\3||||\\\\n10:06:41.491 调用失败 IP:182.140.129.3 MAC:F8A963586DFF 线程:00004364 通道ID:4 事务ID:16 请求:(0-98)集成客户校验(*) 营业部:(0001)国金证券集中交易(*) 耗时A:156 耗时B:0 排队:0\\\\n-4|资金账号或密码错误!|0|||\\\\n10:06:52.678 系统信息 开始关闭交易中心服务。\\\\n10:06:53.303 系统信息 (HS_TCP2.dll)连接守护线程退出!\\\\n10:06:53.335 系统信息 (HS_TCP2.dll)\\\\\\\"刷新约定购回标的证券信息\\\\\\\"线程成功退出!(记录总条数:3536)\\\\n10:06:54.413 系统信息 港股行情服务: 保存代码表(港股)缓存。\\\\n10:06:54.678 系统信息 深沪行情服务: 保存代码表缓存。\\\\n10:06:54.960 系统信息 交易中心服务已经成功关闭。\\\\n10:06:54.960 系统信息 系统正常关闭\\\\n");
StringBuilder message = new StringBuilder();
String strs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
for (int i =0; i < 345; i++) {
message.append(strs.charAt((int)(Math.random() * 26)));
}
filebeatJson.put("message", message);
Random random = new Random(); Random random = new Random();
int i = random.nextInt(10); int i = random.nextInt(10);
filebeatJson.put("appsystem", "test_appsystem" + i * 2); filebeatJson.put("appsystem", "test_appsystem" + i);
filebeatJson.put("appprogramname", "test_appprogramname" + i * 3); filebeatJson.put("appprogramname", "test_appprogramname" + i );
filebeatJson.put("logTypeName", "test_topic_log" + i * 4); filebeatJson.put("logTypeName", "test_topic_log" + i) ;
filebeatJson.put("servicename", "test_servicename" + i * 5); filebeatJson.put("servicename", "test_servicename" + i);
filebeatJson.put("servicecode", "test_cdde" + i * 6); filebeatJson.put("servicecode", "test_cdde" + i );
filebeatJson.put("collectorruleid", "1"); filebeatJson.put("collectorruleid", "1");
filebeatJson.put("@metadata", metadataJson); filebeatJson.put("@metadata", metadataJson);
...@@ -59,13 +67,14 @@ public class MockFilebeatData { ...@@ -59,13 +67,14 @@ public class MockFilebeatData {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
long size = 10000000L * 10; // long size = 10000000L * 10;
// long size = 1; String topicName = "test";
long size = 10;
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
String json = buildMsg(); String json = buildMsg();
Producer producer = ProducerPool.getInstance().getProducer(); CommonProducer producer = CommonProducerPool.getInstance().getProducer();
producer.sendLog("analysis1y", json); producer.sendJson(topicName, json);
} }
Thread.sleep(1000); Thread.sleep(1000);
} }
......
kafka.servers=kafka01:9092,kafka02:9092,kafka03:9092 kafka.servers=kafka01:9092
# ,kafka02:9092,kafka03:9092
kafka.batch.size=100000 kafka.batch.size=100000
kafka.topic.name=a kafka.topic.name=a
key.serializer=org.apache.kafka.common.serialization.StringSerializer key.serializer=org.apache.kafka.common.serialization.StringSerializer
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment