Commit 21ec0353 authored by DeleMing

<dev>

1. Initialize project
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zorkdata</groupId>
<artifactId>mock-data</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<!-- Using Local Nexus Maven Repository -->
<repository>
<id>user-release</id>
<name>Nexus Repository</name>
<url>http://nexus.zorkdata.com/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<!-- <repository>
<id>oss</id>
<name>oss</name>
<url>https://oss.sonatype.org/content/groups/public</url>
</repository>-->
<!-- <repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>-->
</repositories>
<pluginRepositories>
<pluginRepository>
<id>nexus</id>
<name>Nexus Repository</name>
<url>http://nexus.zorkdata.com/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<!-- Distribution repositories for the team's CI release server -->
<distributionManagement>
<repository>
<id>user-release</id>
<name>User Project Release</name>
<url>http://nexus.zorkdata.com/repository/releases</url>
</repository>
<snapshotRepository>
<id>user-snapshot</id>
<name>User Project Snapshot</name>
<url>http://nexus.zorkdata.com/repository/snapshots</url>
</snapshotRepository>
</distributionManagement>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.2</version>
</dependency>-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- MainClass in manifest makes an executable jar -->
<archive>
<manifest>
<mainClass>com.zorkdta.tools.mock.MockKafkaConnectAvro</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.zorkdta.tools;
import com.alibaba.fastjson.JSON;
import com.zorkdta.tools.avro.AvroSerializer;
import com.zorkdta.tools.avro.AvroSerializerFactory;
import com.zorkdta.tools.pojo.ZorkData;
import com.zorkdta.tools.utils.DateUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MockProduct {
private static final Logger LOGGER = LoggerFactory.getLogger(MockProduct.class);
private static String topic = "test1";
private static String brokerAddr = "zork-poc103:9092";
private static ProducerRecord<String, byte[]> producerRecord = null;
private static KafkaProducer<String, byte[]> producer = null;
public static void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddr);
props.put("client.id", "test");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", ByteArraySerializer.class.getName());
props.put("batch.size", 1);
producer = new KafkaProducer<String, byte[]>(props);
}
public static byte[] buildZorkDataResp() {
Map<String, Object> event = new HashMap<String, Object>();
ZorkData zorkData = new ZorkData();
String logTypeName = "tc50_biz_filebeat";
String timestamp = DateUtil.getUTCTimeStr();
String source = "d:\\tc50\\log\\20191231.log";
String offset = String.valueOf(6322587L);
zorkData.setLogTypeName(logTypeName);
zorkData.setOffset(offset);
zorkData.setSource(source);
zorkData.setTimestamp(timestamp);
Map<String, String> dimensions = new HashMap<>();
dimensions.put("hostname", "ZVTDX-TC50223");
dimensions.put("appprogramname", "ZVTDX-TC50223_770");
dimensions.put("appsystem", "TXJY");
Map<String, Double> measures = new HashMap<>();
measures.put("latence", 301.0);
Map<String, String> normalFields = new HashMap<>();
normalFields.put("message", "成功处理");
zorkData.setDimensions(dimensions);
zorkData.setMeasures(measures);
zorkData.setNormalFields(normalFields);
String msg = JSON.toJSONString(zorkData);
System.out.println(msg);
AvroSerializer avroSerializer = AvroSerializerFactory.getLogAvorSerializer();
byte[] bytes = avroSerializer.serializingLog(logTypeName, timestamp, source, offset, dimensions, measures, normalFields);
return bytes;
}
public static byte[] buildZorkDataReq() {
Map<String, Object> event = new HashMap<String, Object>();
ZorkData zorkData = new ZorkData();
String logTypeName = "tc50_biz_filebeat_req";
String timestamp = DateUtil.getUTCTimeStr();
String source = "d:\\tc50\\log\\20191231.log";
String offset = String.valueOf(6322587L);
zorkData.setLogTypeName(logTypeName);
zorkData.setOffset(offset);
zorkData.setSource(source);
zorkData.setTimestamp(timestamp);
Map<String, String> dimensions = new HashMap<>();
dimensions.put("hostname", "ZVTDX-TC50223");
dimensions.put("appprogramname", "ZVTDX-TC50223_770");
dimensions.put("appsystem", "TXJY");
Map<String, Double> measures = new HashMap<>();
measures.put("latence", 301.0);
Map<String, String> normalFields = new HashMap<>();
normalFields.put("message", "成功处理");
zorkData.setDimensions(dimensions);
zorkData.setMeasures(measures);
zorkData.setNormalFields(normalFields);
String msg = JSON.toJSONString(zorkData);
System.out.println(msg);
AvroSerializer avroSerializer = AvroSerializerFactory.getLogAvorSerializer();
byte[] bytes = avroSerializer.serializingLog(logTypeName, timestamp, source, offset, dimensions, measures, normalFields);
return bytes;
}
public static void send() throws ExecutionException, InterruptedException {
init();
byte[] req = buildZorkDataReq();
producerRecord = new ProducerRecord<String, byte[]>(
topic,
null,
req
);
producer.send(producerRecord).get();
Thread.sleep(200);
byte[] resp = buildZorkDataResp();
producerRecord = new ProducerRecord<String, byte[]>(
topic,
null,
resp
);
producer.send(producerRecord).get();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
for (int i = 0 ; i < 3; i++) {
send();
}
}
}
package com.zorkdta.tools.avro;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* todo
*
* @version V1.0
* @Author XieSen
* @Date 2019/4/3 11:00.
*/
public class AvroDeserializer {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);
public JSONObject jsonObject;
public JSONArray jsonArray;
public Schema schema;
public String[] keys;
public AvroDeserializer(String schema) {
getKeysFromjson(schema);
}
/**
* Extract the field names (keys) from the given Avro schema.
*
* @param schema the schema string used for Avro serialization
*/
void getKeysFromjson(String schema) {
this.jsonObject = JSONObject.parseObject(schema);
this.schema = new Schema.Parser().parse(schema);
this.jsonArray = this.jsonObject.getJSONArray("fields");
this.keys = new String[this.jsonArray.size()];
for (int i = 0; i < this.jsonArray.size(); i++) {
this.keys[i] = this.jsonArray.getJSONObject(i).get("name").toString();
}
}
/**
* Deserialize an Avro-encoded Kafka message body.
*
* @param body the raw Kafka message bytes
* @return the decoded record, or null if deserialization fails
*/
public GenericRecord deserializing(byte[] body) {
DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(this.schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(body, null);
GenericData.Record result = null;
try {
result = datumReader.read(null, decoder);
} catch (Exception e) {
LOGGER.error("Avro deserialization failed", e);
}
return result;
}
}
package com.zorkdta.tools.avro;
/**
* todo
*
* @version V1.0
* @Author XieSen
* @Date 2019/4/3 11:01.
*/
public class AvroDeserializerFactory {
private static AvroDeserializer logs = null;
private static AvroDeserializer metrics = null;
public static void init() {
logs = null;
metrics = null;
}
/**
* getLogsDeserializer
*
* @return
*/
public static AvroDeserializer getLogsDeserializer() {
if (logs == null) {
logs = new AvroDeserializer(LogAvroMacroDef.metadata);
}
return logs;
}
/**
* getMetricDeserializer
*
* @return
*/
public static AvroDeserializer getMetricDeserializer() {
if (metrics == null) {
metrics = new AvroDeserializer(MetricAvroMacroDef.metadata);
}
return metrics;
}
}
package com.zorkdta.tools.avro;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* todo
*
* @version V1.0
* @Author XieSen
* @Date 2019/4/3 11:01.
*/
public class AvroSerializer {
public JSONObject jsonObject;
public JSONArray jsonArray;
public Schema schema;
public List<String> filedsArrayList = new ArrayList<String>();
public AvroSerializer(String schema) {
getKeysFromjson(schema);
}
/**
* Extract the field names (keys) from the given Avro schema.
*
* @param schema the schema string used for Avro serialization
*/
void getKeysFromjson(String schema) {
this.jsonObject = JSONObject.parseObject(schema);
this.schema = new Schema.Parser().parse(schema);
this.jsonArray = this.jsonObject.getJSONArray("fields");
if (filedsArrayList != null && filedsArrayList.size() > 0) {
filedsArrayList.clear();
}
for (int i = 0; i < this.jsonArray.size(); i++) {
filedsArrayList.add(this.jsonArray.getJSONObject(i).get("name").toString());
}
}
/**
* Serialize the given field values with Avro.
*
* @param temtuple the field values, one per schema field, in schema order
* @return the Avro-encoded bytes
*/
private synchronized byte[] serializing(List<String> temtuple) {
byte[] returnstr = null;
GenericRecord datum = new GenericData.Record(this.schema);
// copy the values into the datum
for (int i = 0; i < filedsArrayList.size(); i++) {
datum.put(filedsArrayList.get(i), temtuple.get(i));
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter translates the data object into a form the Encoder can handle
DatumWriter<GenericRecord> write = new GenericDatumWriter<GenericRecord>(this.schema);
// the Encoder then writes it to the output stream
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
try {
write.write(datum, encoder);
encoder.flush();
} catch (IOException e) {
System.out.println("序列化失败 " + e );
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
System.out.println("序列化失败" + e);
}
}
}
try {
returnstr = out.toByteArray();
} catch (Exception e) {
System.out.println("序列化失败" + e);
}
return returnstr;
}
/**
* Serialize a JSON string whose keys match the schema fields.
*
* @param json the JSON payload
* @return the Avro-encoded bytes
*/
private synchronized byte[] serializing(String json) {
byte[] returnstr = null;
JSONObject jsonObject = (JSONObject) JSONObject.parse(json);
GenericRecord datum = new GenericData.Record(this.schema);
// copy the values into the datum
for (int i = 0; i < filedsArrayList.size(); i++) {
datum.put(filedsArrayList.get(i), new Utf8(String.valueOf(jsonObject.get(filedsArrayList.get(i)))));
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter translates the data object into a form the Encoder can handle
DatumWriter<GenericRecord> write = new GenericDatumWriter<GenericRecord>(this.schema);
// the Encoder then writes it to the output stream
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
try {
write.write(datum, encoder);
encoder.flush();
} catch (IOException e) {
System.out.println("序列化失败" + e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
System.out.println("序列化失败" + e);
}
}
}
try {
returnstr = out.toByteArray();
} catch (Exception e) {
System.out.println("序列化失败" + e);
}
return returnstr;
}
/**
* Serialize a JSON object whose keys match the schema fields.
*
* @param jsonObject the JSON payload
* @return the Avro-encoded bytes
*/
private synchronized byte[] serializing(JSONObject jsonObject) {
byte[] returnstr = null;
GenericRecord datum = new GenericData.Record(this.schema);
// copy the values into the datum
for (int i = 0; i < filedsArrayList.size(); i++) {
datum.put(filedsArrayList.get(i), jsonObject.get(filedsArrayList.get(i)));
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter translates the data object into a form the Encoder can handle
DatumWriter<GenericRecord> write = new GenericDatumWriter<GenericRecord>(this.schema);
// the Encoder then writes it to the output stream
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
try {
write.write(datum, encoder);
encoder.flush();
} catch (IOException e) {
System.out.println("序列化失败" + e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
System.out.println("序列化失败" + e);
}
}
}
try {
returnstr = out.toByteArray();
} catch (Exception e) {
System.out.println("序列化失败" + e);
}
return returnstr;
}
/**
* Serialize a prepared GenericRecord.
*/
public synchronized byte[] serializing(GenericRecord datum) {
byte[] returnstr = null;
ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter translates the data object into a form the Encoder can handle
DatumWriter<GenericRecord> write = new GenericDatumWriter<GenericRecord>(this.schema);
// the Encoder then writes it to the output stream
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
try {
write.write(datum, encoder);
encoder.flush();
} catch (IOException e) {
System.out.println("序列化失败" + e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
System.out.println("序列化失败" + e);
}
}
}
try {
returnstr = out.toByteArray();
} catch (Exception e) {
System.out.println("序列化失败" + e);
}
return returnstr;
}
/**
* Serialize one log record; the argument order must match the field order of the logs schema.
*/
public synchronized byte[] serializingLog(String logTypeName, String timestamp, String source, String offset, Map<String, String> dimensions, Map<String, Double> metrics,
Map<String, String> normalFields) {
GenericRecord datum = new GenericData.Record(this.schema);
// copy the values into the datum
datum.put(0, logTypeName);
datum.put(1, timestamp);
datum.put(2, source);
datum.put(3, offset);
datum.put(4, dimensions);
datum.put(5, metrics);
datum.put(6, normalFields);
return serializing(datum);
}
/**
* Serialize one metric record; the argument order must match the field order of the metrics schema.
*/
public synchronized byte[] serializingMetric(String metricSetName, String timestamp, Map<String, String> dimensions, Map<String, Double> metrics) {
GenericRecord datum = new GenericData.Record(this.schema);
// copy the values into the datum
datum.put(0, metricSetName);
datum.put(1, timestamp);
datum.put(2, dimensions);
datum.put(3, metrics);
return serializing(datum);
}
private synchronized byte[] serializing(GenericRecord genericRecord, String key[]) {
byte[] returnstr = null;
GenericRecord datum = new GenericData.Record(this.schema);
// copy the values into the datum
for (int i = 0; i < filedsArrayList.size(); i++) {
datum.put(filedsArrayList.get(i), new Utf8(String.valueOf(genericRecord.get(key[i]))));
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
// DatumWriter translates the data object into a form the Encoder can handle
DatumWriter<GenericRecord> write = new GenericDatumWriter<GenericRecord>(this.schema);
// the Encoder then writes it to the output stream
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
try {
write.write(datum, encoder);
encoder.flush();
} catch (IOException e) {
System.out.println("序列化失败" + e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
System.out.println("序列化失败" + e);
}
}
}
try {
returnstr = out.toByteArray();
} catch (Exception e) {
System.out.println("序列化失败" + e);
}
return returnstr;
}
}
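Note that serializingLog and serializingMetric fill the record by position (datum.put(0, ...)), so the argument order must match the field order declared in the schema string. A minimal check sketch, not part of this commit (the class name SchemaFieldOrderCheck is made up), that prints the positional order from the logs schema defined in LogAvroMacroDef below:

package com.zorkdta.tools.test;

import com.zorkdta.tools.avro.LogAvroMacroDef;
import org.apache.avro.Schema;

/** Prints the positional field order that serializingLog relies on (sketch only). */
public class SchemaFieldOrderCheck {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(LogAvroMacroDef.metadata);
        int pos = 0;
        for (Schema.Field field : schema.getFields()) {
            System.out.println(pos++ + " -> " + field.name());
        }
        // Expected: 0 logTypeName, 1 timestamp, 2 source, 3 offset,
        //           4 dimensions, 5 measures, 6 normalFields
    }
}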
package com.zorkdta.tools.avro;
/**
* todo
*
* @version V1.0
* @Author XieSen
* @Date 2019/4/3 11:05.
*/
public class AvroSerializerFactory {
private static AvroSerializer metricMetadata = null;
private static AvroSerializer logMetadata = null;
public static AvroSerializer getLogAvorSerializer() {
if (logMetadata == null) {
logMetadata = new AvroSerializer(LogAvroMacroDef.metadata);
}
return logMetadata;
}
public static AvroSerializer getMetricAvorSerializer() {
if (metricMetadata == null) {
metricMetadata = new AvroSerializer(MetricAvroMacroDef.metadata);
}
return metricMetadata;
}
}
package com.zorkdta.tools.avro;
/**
* todo
*
* @version V1.0
* @Author XieSen
* @Date 2019/4/3 10:59.
*/
public class LogAvroMacroDef {
public static String metadata = "{\n" +
" \"namespace\": \"com.zork.logs\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"logs\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"logTypeName\",\n" +
" \"type\": [\n" +
" \"string\",\n" +
" \"null\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"timestamp\",\n" +
" \"type\": [\n" +
" \"string\",\n" +
" \"null\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"source\",\n" +
" \"type\": [\n" +
" \"string\",\n" +
" \"null\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"offset\",\n" +
" \"type\": [\n" +
" \"string\",\n" +
" \"null\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"dimensions\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"map\",\n" +
" \"values\": \"string\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"measures\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"map\",\n" +
" \"values\": \"double\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"normalFields\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"map\",\n" +
" \"values\": \"string\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";
}
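For reference, a minimal round-trip sketch, not part of this commit (class name LogAvroRoundTrip and all values are placeholders), that serializes one log record against the logs schema above and reads it back with AvroDeserializerFactory:

package com.zorkdta.tools.test;

import com.zorkdta.tools.avro.AvroDeserializerFactory;
import com.zorkdta.tools.avro.AvroSerializerFactory;
import org.apache.avro.generic.GenericRecord;

import java.util.Collections;
import java.util.Map;

/** Serialize one log record with the logs schema and read it back (sketch only). */
public class LogAvroRoundTrip {
    public static void main(String[] args) {
        Map<String, String> dimensions = Collections.singletonMap("appsystem", "TXJY");
        Map<String, Double> measures = Collections.singletonMap("latence", 301.0);
        Map<String, String> normalFields = Collections.singletonMap("message", "ok");

        byte[] bytes = AvroSerializerFactory.getLogAvorSerializer().serializingLog(
                "tc50_biz_filebeat", "2019-12-31T10:00:00.000Z", "/opt/20191231.log", "6322587",
                dimensions, measures, normalFields);

        GenericRecord record = AvroDeserializerFactory.getLogsDeserializer().deserializing(bytes);
        // field names come from the "logs" schema in LogAvroMacroDef
        System.out.println(record.get("logTypeName") + " @ " + record.get("timestamp"));
    }
}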
package com.zorkdta.tools.avro;
/**
* todo
*
* @version V1.0
* @Author XieSen
* @Date 2019/4/3 11:00.
*/
public class MetricAvroMacroDef {
public static String metadata = "{\n" +
" \"namespace\": \"com.zork.metrics\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"metrics\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"metricsetname\",\n" +
" \"type\": [\n" +
" \"string\",\n" +
" \"null\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"timestamp\",\n" +
" \"type\": [\n" +
" \"string\",\n" +
" \"null\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"dimensions\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"map\",\n" +
" \"values\": \"string\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"metrics\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"map\",\n" +
" \"values\": \"double\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";
}
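The metric path mirrors the log path; a similar sketch (illustration only, MetricAvroRoundTrip and the values are placeholders) using serializingMetric and the metrics schema above:

package com.zorkdta.tools.test;

import com.zorkdta.tools.avro.AvroDeserializerFactory;
import com.zorkdta.tools.avro.AvroSerializerFactory;
import org.apache.avro.generic.GenericRecord;

import java.util.Collections;

/** Serialize one metric record with the metrics schema and read it back (sketch only). */
public class MetricAvroRoundTrip {
    public static void main(String[] args) {
        byte[] bytes = AvroSerializerFactory.getMetricAvorSerializer().serializingMetric(
                "influx_cpu", String.valueOf(System.currentTimeMillis()),
                Collections.singletonMap("hostname", "localhost"),
                Collections.singletonMap("cpu_usage", 0.42));

        GenericRecord record = AvroDeserializerFactory.getMetricDeserializer().deserializing(bytes);
        System.out.println(record.get("metricsetname") + " = " + record.get("metrics"));
    }
}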
package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
/**
* Produce mock data (verified working)
* @author zhangwei (<a href="mailto:zhangwei@zork.com.cn">zhangwei@zork.com.cn</a>)
* @date 2020-06-15 14:36
*/
public class MockConnectJsonData {
private static String topic;
private static String brokerlist;
private static KafkaProducer<String, String> producer;
private static void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerlist);
props.put("acks", "1");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", StringSerializer.class.getName());
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
producer = new KafkaProducer<String, String>(props);
}
/**
* Get the current date string (yyyy-MM-dd) used as the log date
*
* @return String
*/
private static String getLogTime() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
return sdf.format(new Date());
}
private static String getCollectTime() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
return sdf.format(new Date());
}
private static String buildMsg() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("logtypename", "tdx_filebeat");
jsonObject.put("hostname", "kafka-connect-2");
jsonObject.put("appprogram", "tdx");
jsonObject.put("offset", String.valueOf(System.currentTimeMillis()));
jsonObject.put("message", "10:06:41.335 功能请求 IP:182.140.129.3 MAC:F8A963586DFF 线程:00004364 通道ID:4 事务ID:16 请求:(0-98)集成客户校验(*) 营业部:(0001)国金证券集中交易(*)\\n66650109|************|XshR9/S5SDE=|8|0||12|7.37.0||||||||||0||0|182.140.129.3;PENGKANG;Administrator;83025;Intel(R)Core(TM)i7-4510UCPU@2.00GHz*4;bfebfbff00040651-GenuineIntel;Windows7 Service Pack 1 (Build 7601);182.140.129.3,0.0.0.0,0.0.0.0;F8A963586DFF,00FF8C535532,A0A8CD0D00B0;TF655AWJ16NG2L,143116404707;07/15/2014;8DC03929-0822-453C-A2D5-EFBE95E359BE;182.140.129.3;;NTFS;0C17-8FD7;C:;113G;HTS725050A7E630;GH2Z;TF655AWJ16NG2L;|||||2,Mar 1 2018,10:22:32|0|||GETLOGINPARAM||7.37,6.01,Mar 1 2018,10:37:07|8噝\\\\5\\\\3||||\\n10:06:41.491 调用失败 IP:182.140.129.3 MAC:F8A963586DFF 线程:00004364 通道ID:4 事务ID:16 请求:(0-98)集成客户校验(*) 营业部:(0001)国金证券集中交易(*) 耗时A:156 耗时B:0 排队:0\\n-4|资金账号或密码错误!|0|||\\n10:06:52.678 系统信息 开始关闭交易中心服务。\\n10:06:53.303 系统信息 (HS_TCP2.dll)连接守护线程退出!\\n10:06:53.335 系统信息 (HS_TCP2.dll)\\\"刷新约定购回标的证券信息\\\"线程成功退出!(记录总条数:3536)\\n10:06:54.413 系统信息 港股行情服务: 保存代码表(港股)缓存。\\n10:06:54.678 系统信息 深沪行情服务: 保存代码表缓存。\\n10:06:54.960 系统信息 交易中心服务已经成功关闭。\\n10:06:54.960 系统信息 系统正常关闭\\n");
jsonObject.put("logdate", getLogTime());
jsonObject.put("source", "/opt/log_TDX/20180320.log");
jsonObject.put("collecttime", getCollectTime());
jsonObject.put("appsystem", "tdx");
jsonObject.put("logtimeflag", "true");
System.out.println(jsonObject.toJSONString());
return jsonObject.toString();
}
private static void send(String message) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, null, message);
producer.send(producerRecord);
}
public static void main(String[] args) {
topic = "tdx3";
brokerlist = "kafka-1:19092,kafka-2:19092,kafka-3:19092";
init();
for (int i = 0; i <= 10000; i++) {
String message = buildMsg();
send(message);
}
}
}
package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/17 0017 10:57
*/
public class MockFlinkxJson {
private static String topic = "flinkx_json";
private static String brokerAddr = "zorkdata-95:9092";
private static ProducerRecord<String, String> producerRecord = null;
private static KafkaProducer<String, String> producer = null;
public static void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddr);
props.put("acks", "1");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", StringSerializer.class.getName());
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
producer = new KafkaProducer<String, String>(props);
}
public static void send(String topic, String msg) throws ExecutionException, InterruptedException {
init();
producerRecord = new ProducerRecord<String, String>(
topic,
null,
msg
);
producer.send(producerRecord);
}
public static void main(String[] args) throws Exception {
for (int i = 0; i <= 100; i++) {
// {"user_id":"59","name":"xs-59","id":"59","content":"xd"}
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", "" + i);
jsonObject.put("user_id", "" + i);
jsonObject.put("name", "jack" + i);
jsonObject.put("content", "xxxx");
String json = jsonObject.toJSONString();
send(topic, json);
}
}
}
package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import com.zorkdta.tools.avro.AvroSerializer;
import com.zorkdta.tools.avro.AvroSerializerFactory;
import com.zorkdta.tools.utils.DateUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/17 0017 10:57
*/
public class MockKafkaConnect {
// private static String topic = "test";
private static String topic = "zorkdata_log_test";
private static String brokerAddr = "zorkdata-95:9092";
private static ProducerRecord<String, byte[]> producerRecord = null;
private static KafkaProducer<String, byte[]> producer = null;
public static void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddr);
props.put("acks", "1");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", ByteArraySerializer.class.getName());
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
producer = new KafkaProducer<String, byte[]>(props);
}
public static byte[] buildKafkaConnect() {
String logTypeName = "tc50_biz_filebeat";
String timestamp = DateUtil.getUTCTimeStr();
String source = "/opt/20191231.log";
String offset = String.valueOf(6322587L);
Map<String, String> dimensions = new HashMap<>();
dimensions.put("hostname", "localhost");
dimensions.put("appprogramname", "tc50");
dimensions.put("appsystem", "TXJY");
Map<String, Double> measures = new HashMap<>();
measures.put("latence", 301.0);
Map<String, String> normalFields = new HashMap<>();
normalFields.put("message", "成功处理");
AvroSerializer avroSerializer = AvroSerializerFactory.getLogAvorSerializer();
byte[] bytes = avroSerializer.serializingLog(logTypeName, timestamp, source, offset, dimensions, measures, normalFields);
return bytes;
}
public static void send(String topic) throws ExecutionException, InterruptedException {
byte[] req = buildKafkaConnect();
send(topic, req);
}
public static void send(String topic, byte[] msg) throws ExecutionException, InterruptedException {
init();
producerRecord = new ProducerRecord<String, byte[]>(
topic,
null,
msg
);
producer.send(producerRecord);
}
public static void main(String[] args) throws Exception {
for (int i = 0; i <= 100; i++) {
String logTypeName = "tc50_biz_filebeat";
String timestamp = DateUtil.getUTCTimeStr();
String source = "/opt/20191231.log";
String offset = String.valueOf(6322587L);
Map<String, String> dimensions = new HashMap<>();
dimensions.put("hostname", "localhost");
dimensions.put("appprogramname", "tc50");
dimensions.put("appsystem", "TXJY");
Map<String, Double> measures = new HashMap<>();
measures.put("latence", 301.0);
Map<String, String> normalFields = new HashMap<>();
normalFields.put("message", "成功处理");
normalFields.put("id", String.valueOf(i));
JSONObject jsonObject = new JSONObject();
jsonObject.put("logTypeName", logTypeName);
jsonObject.put("timestamp", timestamp);
jsonObject.put("source", source);
jsonObject.put("offset", offset);
jsonObject.put("dimensions", dimensions);
jsonObject.put("measures", measures);
jsonObject.put("normalFields", normalFields);
System.out.println(jsonObject.toJSONString());
AvroSerializer avroSerializer = AvroSerializerFactory.getLogAvorSerializer();
byte[] bytes = avroSerializer.serializingLog(logTypeName, timestamp, source, offset, dimensions, measures, normalFields);
send(topic, bytes);
}
}
}
package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject;
import com.zorkdta.tools.utils.*;
import top.xiesen.mock.kafka.utils.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/17 0017 10:57
*/
public class MockKafkaConnectAvro {
private static long getSize(String propertiesName) throws Exception {
Properties properties = PropertiesUtil.getProperties(propertiesName);
long logSize = StringUtil.getLong(properties.getProperty("log.size", "5000").trim(), 1);
return logSize;
}
public static String sum(String logTypeName, String timestamp, String source, String offset,
Map<String, String> dimensions, Map<String, Double> metrics, Map<String, String> normalFields) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("logTypeName", logTypeName);
jsonObject.put("timestamp", timestamp);
jsonObject.put("source", source);
jsonObject.put("offset", offset);
jsonObject.put("dimensions", dimensions);
jsonObject.put("metrics", metrics);
jsonObject.put("normalFields", normalFields);
return jsonObject.toString();
}
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
if (args.length == 0) {
System.out.println("请指定配置文件");
System.exit(-1);
}
String propertiesName = args[0];
long size = getSize(propertiesName);
for (int i = 0; i < size; i++) {
String logTypeName = "tc50_biz_filebeat";
String timestamp = DateUtil.getUTCTimeStr();
String source = "/opt/20191231.log";
String offset = String.valueOf(6322587L);
Map<String, String> dimensions = new HashMap<>();
dimensions.put("hostname", "localhost");
dimensions.put("appprogramname", "tc50");
dimensions.put("appsystem", "TXJY");
Map<String, Double> measures = new HashMap<>();
measures.put("latence", 301.0);
Map<String, String> normalFields = new HashMap<>();
normalFields.put("message", "成功处理");
normalFields.put("id", String.valueOf(i));
System.out.println(sum(logTypeName, timestamp, source, offset, dimensions, measures, normalFields));
// System.out.println("--------------------- start ----------------------------");
// long l1 = System.currentTimeMillis();
CustomerProducer producer = ProducerPool.getInstance(propertiesName).getProducer();
// long l2 = System.currentTimeMillis();
// System.out.println("获取 producer 需要的时间: " + (l2 - l1) + "ms");
producer.sendLog(logTypeName, timestamp, source, offset, dimensions, measures, normalFields);
// long l3 = System.currentTimeMillis();
// System.out.println("发送数据执行的时间: " + (l3 - l2) + "ms");
// System.out.println("--------------------- end ----------------------------");
}
long end = System.currentTimeMillis();
Thread.sleep(5000);
System.out.println("写入 " + size + " 条数据,一共耗时 " + (end - start) + " ms");
}
}
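MockKafkaConnectAvro reads its settings from the properties file passed as args[0]. An illustrative configuration, assuming only the keys read by getSize above and by CustomerProducer.initConfig further below (the file name and all values are placeholders):

# mock.properties (example values only)
log.size=5000
log.topic=zorkdata_log_test
kafka.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
kafka.batch.size=5000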
package com.zorkdta.tools.mock;
import com.zorkdta.tools.utils.DateUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import com.zorkdta.tools.avro.AvroSerializer;
import com.zorkdta.tools.avro.AvroSerializerFactory;
import top.xiesen.mock.kafka.utils.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/17 0017 10:57
*/
public class MockKafkaConnectAvroTest {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
int size = 1000;
String topic = "info";
String brokerAddr = "kafka-1:9092,kafka-2:9092,kafka-3:9092";
if (args.length == 3) {
size = Integer.valueOf(args[0]);
topic = args[1];
brokerAddr = args[2];
} else {
System.out.println("请输出 topic 以及 kafka 地址");
}
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddr);
props.put("acks", "1");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", ByteArraySerializer.class.getName());
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
for (int i = 0; i <= size; i++) {
String logTypeName = "tc50_biz_filebeat";
String timestamp = DateUtil.getUTCTimeStr();
String source = "/opt/20191231.log";
String offset = String.valueOf(6322587L);
Map<String, String> dimensions = new HashMap<>();
dimensions.put("hostname", "localhost");
dimensions.put("appprogramname", "tc50");
dimensions.put("appsystem", "TXJY");
Map<String, Double> measures = new HashMap<>();
measures.put("latence", 301.0);
Map<String, String> normalFields = new HashMap<>();
normalFields.put("message", "成功处理");
normalFields.put("id", String.valueOf(i));
AvroSerializer avroSerializer = AvroSerializerFactory.getLogAvorSerializer();
byte[] bytes = avroSerializer.serializingLog(logTypeName, timestamp, source, offset, dimensions, measures, normalFields);
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<String, byte[]>(
topic,
null,
bytes
);
producer.send(producerRecord);
}
producer.close();
long end = System.currentTimeMillis();
System.out.println("写入 " + size + " 条数据,一共耗时 " + (end - start) + " ms");
}
}
package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject;
import com.zorkdta.tools.utils.CustomerProducer;
import com.zorkdta.tools.utils.ProducerPool;
import com.zorkdta.tools.utils.PropertiesUtil;
import com.zorkdta.tools.utils.StringUtil;
import top.xiesen.mock.kafka.utils.*;
import java.util.Properties;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/17 0017 10:57
*/
public class MockKafkaConnectJson {
private static long getSize(String propertiesName) throws Exception {
Properties properties = PropertiesUtil.getProperties(propertiesName);
long logSize = StringUtil.getLong(properties.getProperty("log.size", "5000").trim(), 1);
return logSize;
}
private static int dataSize() {
JSONObject jsonObject = new JSONObject();
String className = "MockKafkaConnectJson";
String message = "Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. This documentation is for Apache Flink version 1.10. These p";
jsonObject.put("className", className);
jsonObject.put("message", message);
int length = jsonObject.toJSONString().length();
System.out.println(length);
return length;
}
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
if (args.length == 0) {
System.out.println("请指定配置文件");
System.exit(-1);
}
String propertiesName = args[0];
long size = getSize(propertiesName);
for (int i = 0; i <= size; i++) {
JSONObject jsonObject = new JSONObject();
String className = "MockKafkaConnectJson";
String message = "Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. This documentation is for Apache Flink version 1.10. These p";
jsonObject.put("className", className);
jsonObject.put("message", message);
int length = jsonObject.toJSONString().length();
System.out.println(length);
CustomerProducer producer = ProducerPool.getInstance(propertiesName).getProducer();
producer.sendJsonLog(jsonObject.toJSONString());
}
long end = System.currentTimeMillis();
System.out.println("写入 " + size + " 条数据,一共耗时 " + (end - start) + " ms");
}
}
package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import com.zorkdta.tools.pojo.MetricEvent;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/16 0016 9:28
*/
public class MockMetricEvent {
private static String topic = "flink-metric";
private static String brokerAddr = "zorkdata-91:9092";
private static ProducerRecord<String, String> producerRecord = null;
private static KafkaProducer<String, String> producer = null;
public static void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddr);
props.put("acks", "1");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", StringSerializer.class.getName());
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
producer = new KafkaProducer(props);
}
public static String buildMetricEvent() {
String name = "metric";
Long timestamp = System.currentTimeMillis();
Map<String, Object> fields = new HashMap<>();
fields.put("cpu_used", 0.6);
fields.put("disk_used", 0.4);
Map<String, String> tags = new HashMap<>();
tags.put("hostname", "localhost");
MetricEvent metricEvent = new MetricEvent(name, timestamp, fields, tags);
return JSON.toJSONString(metricEvent);
}
public static void send() throws ExecutionException, InterruptedException {
init();
String req = buildMetricEvent();
System.out.println(req);
producerRecord = new ProducerRecord<String, String>(
topic,
null,
req
);
producer.send(producerRecord).get();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
for (int i = 0; i < 300; i++) {
send();
}
}
}
package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Date;
import java.util.Properties;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/17 0017 10:57
*/
public class MockStreamxJson {
private static String topic = "streamx_json_test";
private static String brokerAddr = "zorkdata-95:9092";
private static ProducerRecord<String, String> producerRecord = null;
private static KafkaProducer<String, String> producer = null;
public static void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddr);
props.put("acks", "1");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", StringSerializer.class.getName());
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
producer = new KafkaProducer<String, String>(props);
}
public static String buildMsg() {
JSONObject jsonObject = new JSONObject();
// 2020-03-08T12:35:02.659 [main] DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator
jsonObject.put("className", "org.apache.flink.streaming.api.graph.StreamGraphGenerator");
jsonObject.put("methodName", "main");
jsonObject.put("datetime", new Date().toString());
return jsonObject.toString();
}
public static void send(String topic) {
init();
String req = buildMsg();
producerRecord = new ProducerRecord<String, String>(
topic,
null,
req
);
producer.send(producerRecord);
}
public static void main(String[] args) {
for (int i = 0; i <= 100; i++) {
send(topic);
}
}
}
package com.zorkdta.tools.mock;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/17 0017 10:57
*/
public class MockStreamxJson1 {
private static String topic = "streamx_json";
private static String brokerAddr = "zorkdata-95:9092";
private static ProducerRecord<String, String> producerRecord = null;
private static KafkaProducer<String, String> producer = null;
public static void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddr);
props.put("acks", "1");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", StringSerializer.class.getName());
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
producer = new KafkaProducer<String, String>(props);
}
public static String buildMsg() {
JSONObject jsonObject = new JSONObject();
// {"name":"tom","obj":{"channel":"root"},"pv":4,"xctime":1572932485}
jsonObject.put("name", "tom");
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("channel", "root");
jsonObject.put("obj", jsonObject1);
jsonObject.put("pv", new Random().nextInt(100));
jsonObject.put("xctime", System.currentTimeMillis());
return jsonObject.toString();
}
public static void send(String topic) {
init();
String req = buildMsg();
producerRecord = new ProducerRecord<String, String>(
topic,
null,
req
);
producer.send(producerRecord);
}
public static void main(String[] args) {
for (int i = 0; i <= 100; i++) {
send(topic);
}
}
}
package com.zorkdta.tools.mock;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import com.zorkdta.tools.avro.AvroSerializer;
import com.zorkdta.tools.avro.AvroSerializerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
/**
* @Description
* @className top.xiesen.mock.kafka.mock.MockZorkMetric
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/3/15 18:15
*/
public class MockZorkMetric {
private static String topic = "zorkdata_metric";
private static String brokerAddr = "zorkdata-95:9092";
private static ProducerRecord<String, byte[]> producerRecord = null;
private static KafkaProducer<String, byte[]> producer = null;
public static void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddr);
props.put("acks", "1");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", ByteArraySerializer.class.getName());
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
producer = new KafkaProducer<String, byte[]>(props);
}
public static byte[] buildMetric() {
Random random = new Random();
String metricSetName = "influx_cpu";
String timestamp = String.valueOf(System.currentTimeMillis());
Map<String, String> dimensions = new HashMap<>();
dimensions.put("hostname", "localhost");
dimensions.put("appprogramname", "tc50");
dimensions.put("appsystem", "TXJY");
Map<String, Double> metrics = new HashMap<>();
metrics.put("cpu_usage", random.nextDouble());
AvroSerializer metricSerializer = AvroSerializerFactory.getMetricAvorSerializer();
byte[] bytes = metricSerializer.serializingMetric(metricSetName, timestamp, dimensions, metrics);
return bytes;
}
public static void send(String topic) {
init();
byte[] req = buildMetric();
producerRecord = new ProducerRecord<String, byte[]>(
topic,
null,
req
);
producer.send(producerRecord);
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i <= 100; i++) {
send(topic);
Thread.sleep(1000);
}
}
}
package com.zorkdta.tools.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/16 0016 9:27
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MetricEvent {
/**
* Metric name
*/
private String name;
/**
* Metric timestamp
*/
private Long timestamp;
/**
* Metric fields
*/
private Map<String, Object> fields;
/**
* Metric tags
*/
private Map<String, String> tags;
}
package com.zorkdta.tools.pojo;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/13 0013 10:19
*/
public class Rule implements Comparable<Rule> {
private String operator;
private String times;
private String level;
public String getOperator() {
return operator;
}
public void setOperator(String operator) {
this.operator = operator;
}
public String getTimes() {
return times;
}
public void setTimes(String times) {
this.times = times;
}
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
/**
* Sort from largest to smallest (descending by times).
*
* @param o the rule to compare against
* @return a negative value when this rule's times is larger, so larger thresholds sort first
*/
@Override
public int compareTo(Rule o) {
return Integer.parseInt(o.times) - Integer.parseInt(this.times);
}
}
package com.zorkdta.tools.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Map;
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class ZorkData {
private String logTypeName;
private String source;
private String timestamp;
private String offset;
private Map<String, Double> measures;
private Map<String, String> normalFields;
private Map<String, String> dimensions;
}
package com.zorkdta.tools.test;
import com.zorkdta.tools.pojo.Rule;
import java.util.Collections;
import java.util.List;
/**
* @description:
* @author: 谢森
* @Email xiesen@zork.com.cn
* @time: 2020/1/13 0013 9:24
*/
public class LogicRule {
public static String comp1(List<Rule> list, int times) {
// sorted descending by default (see Rule.compareTo)
Collections.sort(list);
List<Rule> newList = list;
for (Rule rule : newList) {
String operator = rule.getOperator();
// if the operator is <= or <, reverse the rule list so it is ascending
if ("<=".equals(operator) || "<".equals(operator)) {
Collections.reverse(newList);
}
if (">=".equals(operator)) {
for (Rule r1 : newList) {
if (times >= Integer.parseInt(r1.getTimes())) {
return r1.getLevel();
}
continue;
}
}
if (">".equals(operator)) {
for (Rule r1 : newList) {
if (times > Integer.parseInt(r1.getTimes())) {
return r1.getLevel();
}
continue;
}
}
if ("<=".equals(operator)) {
for (Rule r1 : newList) {
if (times <= Integer.parseInt(r1.getTimes())) {
return r1.getLevel();
}
continue;
}
}
if ("<".equals(operator)) {
for (Rule r1 : newList) {
if (times < Integer.parseInt(r1.getTimes())) {
return r1.getLevel();
}
continue;
}
}
break;
}
return null;
}
}
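A small usage sketch for comp1, illustration only (the class name LogicRuleDemo and the thresholds/levels are made up): rules are sorted descending inside comp1, so the highest satisfied threshold wins.

package com.zorkdta.tools.test;

import com.zorkdta.tools.pojo.Rule;

import java.util.ArrayList;
import java.util.List;

/** Usage sketch for LogicRule.comp1 with made-up thresholds. */
public class LogicRuleDemo {
    private static Rule rule(String operator, String times, String level) {
        Rule r = new Rule();
        r.setOperator(operator);
        r.setTimes(times);
        r.setLevel(level);
        return r;
    }

    public static void main(String[] args) {
        List<Rule> rules = new ArrayList<>();
        rules.add(rule(">=", "5", "warning"));
        rules.add(rule(">=", "10", "critical"));
        System.out.println(LogicRule.comp1(rules, 12)); // critical
        System.out.println(LogicRule.comp1(rules, 7));  // warning
        System.out.println(LogicRule.comp1(rules, 3));  // null (no rule matched)
    }
}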
package com.zorkdta.tools.test;
import com.zorkdta.tools.pojo.ZorkData;
import org.apache.commons.lang.StringUtils;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.*;
/**
* @Description
* @className top.xiesen.mock.kafka.test.ReflectionUtils
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/3/11 17:06
*/
public class ReflectionUtils {
/**
* Private constructor: utility class.
*/
private ReflectionUtils() {
}
/**
* Get the value of a property on the target object by field name, via its getter.
*
* @param fieldName the field name
* @param target the object to read from
* @return the getter's return value
* @throws Exception if the getter cannot be found or invoked
*/
public static Object getFieldValueByName(String fieldName, Object target) throws Exception {
String firstLetter = fieldName.substring(0, 1).toUpperCase();
String getter = "get" + firstLetter + fieldName.substring(1);
Method method = target.getClass().getMethod(getter, new Class[0]);
Object e = method.invoke(target, new Object[0]);
return e;
}
/**
* Get the names of all declared fields.
*
* @param target the object to inspect
* @return the field names
*/
public static String[] getFiledName(Object target) throws Exception {
Field[] fields = target.getClass().getDeclaredFields();
String[] fieldNames = new String[fields.length];
for (int i = 0; i < fields.length; ++i) {
fieldNames[i] = fields[i].getName();
}
return fieldNames;
}
/**
* Get the values of all declared fields.
*
* @param target the object to inspect
* @return the field values, in declaration order
* @throws Exception if a getter cannot be invoked
*/
public static Object[] getFiledValues(Object target) throws Exception {
String[] fieldNames = getFiledName(target);
Object[] value = new Object[fieldNames.length];
for (int i = 0; i < fieldNames.length; ++i) {
value[i] = getFieldValueByName(fieldNames[i], target);
}
return value;
}
public static String getMapKey(String s) {
int startindex = s.indexOf("[");
if (startindex < 0) {
return null;
}
String substr = s.substring(0, startindex);
return substr;
}
public static String getMapValue(String s) {
int startindex = s.indexOf("'");
if (startindex < 0) {
return null;
}
String substr = s.substring(startindex + 1);
int endindex = substr.indexOf("'");
if (endindex < 0) {
return null;
}
String ret = substr.substring(0, endindex);
return ret;
}
public static String getPartition(ZorkData zorkData, String partition) throws Exception {
StringBuilder builder = new StringBuilder();
if (StringUtils.isNotEmpty(partition)) {
List<String> list = Arrays.asList(partition.split(","));
for (int i = 0; i < list.size(); i++) {
String value = list.get(i);
if ("timestamp".equals(value)) {
// date formatting (left unimplemented)
}
if (value.contains("[")) {
String mapKey = getMapKey(value);
Map map = (Map) getFieldValueByName(mapKey, zorkData);
Object o = map.get(getMapValue(value));
builder.append("/" + o.toString());
} else {
Object o = getFieldValueByName(value, zorkData);
builder.append("/" + o.toString());
}
}
}
return builder.toString();
}
public static void main(String[] args) throws Exception {
ZorkData zorkData = new ZorkData();
zorkData.setLogTypeName("test");
zorkData.setTimestamp(new Date().toString());
Map<String, String> dimensions = new HashMap<>();
dimensions.put("appsystem", "tdx");
zorkData.setDimensions(dimensions);
Object logTypeName = getFieldValueByName("logTypeName", zorkData);
System.out.println("logTypeName = " + logTypeName);
Object dimensions1 = getFieldValueByName("dimensions", zorkData);
System.out.println(dimensions1);
Map map = (Map) dimensions1;
System.out.println(map.get("appsystem"));
String str = "logTypeName,timestamp,dimensions['appsystem']";
System.out.println(getMapKey("dimensions['appsystem']"));
System.out.println(getMapValue("dimensions['appsystem']"));
String partition = getPartition(zorkData, str);
System.out.println(partition);
}
}
package com.zorkdta.tools.test;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @Description
* @className top.xiesen.mock.kafka.test.RegularMatchTest
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/4/13 13:08
*/
public class RegularMatchTest {
public static void main(String[] args) {
String str = "{id=1, name=周军锋, password=1234, create_time=2020-04-02 17:04:04, message={\"id\":\"1\",\"name\":\"周军锋\",\"password\":\"1234\",\"create_time\":\"2020-04-02 17:04:04\"}}";
Map<String, Object> map = regularMatchMessage(str);
// iterate over the parsed entries
for (Map.Entry<String, Object> entry : map.entrySet()) {
System.out.println(entry.getKey() + " = " + entry.getValue());
}
Object id = map.get("id");
System.out.println("id: " + id);
System.out.println("name: " + map.get("name"));
System.out.println("password: " + map.get("password"));
System.out.println("create_time: " + map.get("create_time"));
}
private static Map<String, Object> regularMatchMessage(String data) {
Map<String, Object> map = new HashMap<>();
String regex = "(?<=message=\\{)[^}]*(?=\\})";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(data);
String formatData;
formatData = null;
while (matcher.find()) {
formatData = matcher.group();
}
if (null != formatData) {
String[] split = formatData.split(",");
for (int i = 0; i < split.length; i++) {
String tmp = split[i];
String[] split1 = tmp.split(":");
String key = split1[0];
StringBuilder str = new StringBuilder();
for (int j = 1; j <= split1.length - 1; j++) {
str.append(split1[j]);
str.append(":");
}
String value = str.toString();
map.put(key.substring(1, key.length() - 1), value.substring(1, value.length() - 2));
}
}
return map;
}
}
package com.zorkdta.tools.test;
import java.util.Properties;
/**
* @Description
* @className top.xiesen.mock.kafka.test.Test01
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/3/16 16:24
*/
public class Test01 {
public static void main(String[] args) {
String a = "CREATE TABLE myTable(\n" +
"\tname varchar,\n" +
"\tobj.channel varchar as channel,\n" +
"\tpv INT,\n" +
"\txctime bigint\n" +
")WITH(\n" +
"\ttype='kafka11',\n" +
"\tbootstrapServers='zorkdata-95:9092',\n" +
"\tzookeeperQuorum='zorkdata-91:2181/kafka111,zorkdata-92:2181/kafka111,zorkdata-95:2181/kafka111',\n" +
"\tkafka.key.deserializer='org.apache.kafka.common.serialization.StringDeserializer',\n" +
"\tkafka.value.deserializer='org.apache.kafka.common.serialization.StringDeserializer',\n" +
"\toffsetReset='earliest',\n" +
"\tgroupId='streamx_sql_01',\n" +
"\ttopic='streamx_json',\n" +
"\tsourceDataType='json',\n" +
"\tparallelism='2'\n" +
");\n" +
"\n" +
"CREATE TABLE MyResult(\n" +
" name varchar,\n" +
" pv INT\n" +
" )WITH(\n" +
" type ='elasticsearch',\n" +
" address ='192.168.70.25:9200',\n" +
" cluster='dev-es6',\n" +
" estype ='type1',\n" +
" index ='streamx_test',\n" +
" parallelism ='1',\n" +
"\tid='0,1'\n" +
" );\n" +
"\n" +
"insert into MyResult select name,pv from myTable;";
System.out.println(a.replaceAll("\n", "").replaceAll("\t", ""));
}
}
package com.zorkdta.tools.test;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.springframework.util.ReflectionUtils;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
public class TimeStampTest {
public static Date StrToDate(String str) {
SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmssSSS");
Date date = null;
try {
date = format.parse(str);
} catch (ParseException e) {
System.out.print("日期转换异常");
}
return date;
}
private static DateTimeFormatter dateFormat1 = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static void test() {
String mestime = String.valueOf("20190424092143333");
String datetime = mestime.substring(0, 15);
Date date = StrToDate(datetime);
String times = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date).toString();
DateTime dateTime2 = DateTime.parse(times, dateFormat1);
String timestamp = dateTime2.toString();
System.out.println(timestamp);
}
public static int daysBetween(Date smdate, Date bdate) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
smdate = sdf.parse(sdf.format(smdate));
bdate = sdf.parse(sdf.format(bdate));
Calendar cal = Calendar.getInstance();
cal.setTime(smdate);
long time1 = cal.getTimeInMillis();
cal.setTime(bdate);
long time2 = cal.getTimeInMillis();
long between_days = (time2 - time1) / (1000 * 3600 * 24);
return Integer.parseInt(String.valueOf(between_days));
}
/**
* Days between two dates given as yyyy-MM-dd strings.
*/
public static int daysBetween(String smdate, String bdate) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Calendar cal = Calendar.getInstance();
cal.setTime(sdf.parse(smdate));
long time1 = cal.getTimeInMillis();
cal.setTime(sdf.parse(bdate));
long time2 = cal.getTimeInMillis();
long between_days = (time2 - time1) / (1000 * 3600 * 24);
return Integer.parseInt(String.valueOf(between_days));
}
public static void main(String[] args) throws ParseException {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
String today = sdf.format(new Date());
System.out.println(daysBetween("2018-03-17", today));
}
}
package com.zorkdta.tools.utils;
import com.zorkdta.tools.avro.AvroSerializerFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* @Description
* @className top.xiesen.mock.kafka.utils.CustomerProducer
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/4/2 9:39
*/
@Slf4j
public class CustomerProducer {
static String servers = "kafka-1:9092,kafka-2:9092,kafka-3:9092";
static int batchSize = 1;
static CustomerProducer testProducer;
static String topics;
public static long logSize;
private static KafkaProducer<String, byte[]> producer;
private static KafkaProducer<String, String> noAvroProducer;
public static synchronized CustomerProducer getInstance(String propertiesName) {
if (testProducer == null) {
testProducer = new CustomerProducer(propertiesName);
}
return testProducer;
}
public CustomerProducer(String propertiesName) {
try {
initConfig(propertiesName);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
/**
* batch.size controls how many bytes of records bound for the same partition are batched
* together before a send; reaching this size triggers a send, but records can still be sent
* before the batch fills up. The default is 16384 bytes (16 KB).
*/
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
// Number of retries when a send fails
props.put(ProducerConfig.RETRIES_CONFIG, 5);
/**
* Whether a batch is sent is not governed by batch.size alone; it is a throughput/latency
* trade-off. linger.ms controls how long the producer waits before sending; the default is 0,
* meaning records are sent immediately.
*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
/**
* max.request.size caps the size of a single producer request; the default is 1048576 bytes
* (1 MB). It is raised to 10 MB here.
*/
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
/**
* request.timeout.ms: when the producer sends a request, the broker must respond within this
* time; the default is 30s.
*/
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
/**
* buffer.memory: size of the producer-side record buffer in bytes; the default is 33554432
* (32 MB).
*/
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
/**
* acks controls producer durability and accepts three values:
* 0: the producer does not wait for any acknowledgement from the broker
* all: the leader acknowledges only after the message has been replicated to all in-sync replicas
* 1: the leader acknowledges once it has written the message, without waiting for replicas
*/
props.put(ProducerConfig.ACKS_CONFIG, "1");
producer = new KafkaProducer<String, byte[]>(props);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
noAvroProducer = new KafkaProducer<String, String>(props);
} catch (Exception ex) {
log.error("初始化Kafka失败,系统自动退出! ", ex);
System.exit(1);
}
}
public void initConfig(String propertiesName) throws Exception {
Properties properties = PropertiesUtil.getProperties(propertiesName);
topics = properties.getProperty("log.topic");
servers = properties.getProperty("kafka.servers", "zorkdata-151:9092").trim();
batchSize = StringUtil.getInt(properties.getProperty("kafka.batch.size", "5000").trim(), 1);
logSize = StringUtil.getLong(properties.getProperty("log.size", "5000").trim(), 1);
}
public void sendLog(String logTypeName, String timestamp, String source, String offset,
Map<String, String> dimensions, Map<String, Double> metrics, Map<String, String> normalFields) throws ExecutionException, InterruptedException {
try {
long l1 = System.currentTimeMillis();
byte[] bytes = AvroSerializerFactory.getLogAvorSerializer().serializingLog(logTypeName, timestamp, source,
offset, dimensions, metrics, normalFields);
long l2 = System.currentTimeMillis();
// System.out.println("Avro serialization took: " + (l2 - l1) + "ms");
producer.send(new ProducerRecord<String, byte[]>(topics, null, bytes));
long l3 = System.currentTimeMillis();
// System.out.println("producer.send took: " + (l3 - l2) + "ms");
} catch (Exception e) {
log.error("sendLog-插入Kafka失败", e);
}
}
public void sendJsonLog(String logJson) {
try {
noAvroProducer.send(new ProducerRecord<String, String>(topics, null, logJson));
} catch (Exception e) {
log.error("send json Log-插入Kafka失败", e);
}
}
}
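CustomerProducer is only defined above and not exercised elsewhere in this commit. A minimal usage sketch, assuming a hypothetical mock.properties file on the local filesystem (with the kafka.servers / log.topic / kafka.batch.size / log.size keys read by initConfig) and a hypothetical log type name:
// Hedged sketch, not part of this commit; assumed to live in the same package as CustomerProducer.
import java.util.HashMap;
import java.util.Map;
public class CustomerProducerDemo {
public static void main(String[] args) throws Exception {
// "mock.properties" is an assumed path, not a file shipped with this commit
CustomerProducer producer = CustomerProducer.getInstance("mock.properties");
Map<String, String> dimensions = new HashMap<>();
dimensions.put("hostname", "zorkdata-95");
Map<String, Double> metrics = new HashMap<>();
Map<String, String> normalFields = new HashMap<>();
normalFields.put("message", "hello from mock-data");
// "demo_log" is a hypothetical logTypeName; the Avro schema comes from AvroSerializerFactory
producer.sendLog("demo_log", DateUtil.getUTCTimeStr(), "/var/log/demo.log", "0",
dimensions, metrics, normalFields);
}
}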
package com.zorkdta.tools.utils;
import org.joda.time.DateTime;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateUtil {
private static DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
private static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy.MM.dd");
}
};
private static ThreadLocal<SimpleDateFormat> utcSdf = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
}
};
public static Long timestamp(String timestamp) {
return new DateTime(timestamp).toDate().getTime();
}
public static String format(String timestamp) throws ParseException {
return sdf.get().format(new DateTime(timestamp).toDate());
}
public static Long utcDate2Timestamp(String utcDateStr) throws ParseException {
return utcSdf.get().parse(utcDateStr).getTime();
}
public static String getUTCTimeStr() {
return format.format(new Date()).toString();
}
public static String getUTCTimeStr(long interval) {
long currentTimeMillis = System.currentTimeMillis();
return format.format(new Date(currentTimeMillis + interval)).toString();
}
public static void main(String[] args) {
String timeStr = getUTCTimeStr();
Date date = new DateTime(timeStr).toDate();
System.out.println(sdf.get().format(date));
}
}
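DateUtil.main only exercises getUTCTimeStr and the yyyy.MM.dd formatter; a small sketch of the remaining helpers (assumed to live in the same package, with Joda-Time on the classpath as above):
// Hedged sketch, not part of this commit.
public class DateUtilDemo {
public static void main(String[] args) throws Exception {
String iso = DateUtil.getUTCTimeStr(); // e.g. 2020-04-02T09:39:00.123+08:00
System.out.println(DateUtil.timestamp(iso)); // epoch milliseconds via Joda DateTime
System.out.println(DateUtil.format(iso)); // yyyy.MM.dd, e.g. 2020.04.02
System.out.println(DateUtil.utcDate2Timestamp("2020-04-02T09:39:00")); // parses yyyy-MM-dd'T'HH:mm:ss
}
}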
package com.zorkdta.tools.utils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Description
* @className top.xiesen.mock.kafka.utils.JConsumerMutil
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/4/19 23:05
*/
public class JConsumerMutil {
private final static Logger log = LoggerFactory.getLogger(JConsumerMutil.class);
private final KafkaConsumer<String, String> consumer;
private ExecutorService executorService;
public JConsumerMutil() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ke1");
// Enable automatic offset commits
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Auto-commit interval in milliseconds
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_kafka_game_x"));
}
public void execute() {
// Initialize a single-thread pool that processes each polled batch
executorService = Executors.newFixedThreadPool(1);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
// poll never returns null; skip empty batches instead of submitting them to the pool
if (!records.isEmpty()) {
executorService.submit(new KafkaConsumerThread(records, consumer));
}
}
}
public void shutdown() {
try {
if (consumer != null) {
consumer.close();
}
if (executorService != null) {
executorService.shutdown();
// awaitTermination stays inside the null check to avoid an NPE when execute() was never called
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
log.error("shutdown kafka consumer thread timeout.");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* Consumer worker: processes one polled batch of records
*/
class KafkaConsumerThread implements Runnable {
private ConsumerRecords<String, String> records;
public KafkaConsumerThread(ConsumerRecords<String, String> records,
KafkaConsumer<String, String> consumer) {
this.records = records;
}
@Override
public void run() {
for (TopicPartition partition : records.partitions()) {
// Records belonging to this partition in the polled batch
List<ConsumerRecord<String, String>> partitionRecords = this.records.records(partition);
log.info("Thread Id : " + Thread.currentThread().getId());
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println("offset =" + record.offset() + ", key=" + record.key() + ", value=" + record.value());
}
}
}
}
public static void main(String[] args) {
JConsumerMutil consumer = new JConsumerMutil();
try {
consumer.execute();
} catch (Exception e) {
log.error("mutil consumer from kafka has error , msg is " + e.getMessage());
consumer.shutdown();
}
}
}
package com.zorkdta.tools.utils;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Properties;
/**
* @Description Single-threaded Kafka producer client
* @className top.xiesen.mock.kafka.utils.JProducer
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/4/19 21:22
*/
public class JProducer extends Thread {
private final Logger log = LoggerFactory.getLogger(JProducer.class);
/**
* Configure Kafka connection properties.
*
* @return producer configuration
*/
public Properties configure() {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop:9092");
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
@Override
public void run() {
KafkaProducer<String, String> producer = new KafkaProducer<>(configure());
for (int i = 0; i < 100; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("ip", "192.168.0." + i);
json.put("date", new Date().toString());
String k = "key" + i;
producer.send(new ProducerRecord<String, String>("test_kafka_game_x", k, json.toJSONString()), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (null != e) {
log.error("send error,msg is " + e.getMessage());
} else {
log.info("the offset of the record we just send is: " + recordMetadata.offset());
}
}
});
}
try {
sleep(3000);
} catch (InterruptedException e) {
log.error("Interrupted thread error, msg is " + e.getMessage());
}
producer.close();
}
public static void main(String[] args) {
JProducer producer = new JProducer();
producer.start();
}
}
package com.zorkdta.tools.utils;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description Multi-threaded Kafka producer client
* @className top.xiesen.mock.kafka.utils.JProducerThread
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/4/19 22:50
*/
public class JProducerThread extends Thread {
private final Logger log = LoggerFactory.getLogger(JProducerThread.class);
private final static int MAX_THREAD_SIZE = 6;
public Properties configure() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop:9092");
/**
* batch.size controls how many bytes of records bound for the same partition are batched
* together before a send; reaching this size triggers a send, but records can still be sent
* before the batch fills up. The default is 16384 bytes (16 KB).
*/
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
/**
* Whether a batch is sent is not governed by batch.size alone; it is a throughput/latency
* trade-off. linger.ms controls how long the producer waits before sending; the default is 0,
* meaning records are sent immediately.
*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
/**
* buffer.memory: size of the producer-side record buffer in bytes; the default is 33554432
* (32 MB).
*/
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
/**
* acks controls producer durability and accepts three values:
* 0: the producer does not wait for any acknowledgement from the broker
* all: the leader acknowledges only after the message has been replicated to all in-sync replicas
* 1: the leader acknowledges once it has written the message, without waiting for replicas
*/
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
@Override
public void run() {
KafkaProducer<String, String> producer = new KafkaProducer<>(configure());
for (int i = 0; i < 100; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("ip", "192.168.0." + i);
json.put("date", new Date().toString());
String k = "key" + i;
producer.send(new ProducerRecord<String, String>("test_kafka_game_x", k, json.toJSONString()), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (null != e) {
log.error("send error,msg is " + e.getMessage());
} else {
log.info("the offset of the record we just send is: " + recordMetadata.offset());
}
}
});
}
try {
sleep(3000);
} catch (InterruptedException e) {
log.error("Interrupted thread error, msg is " + e.getMessage());
}
producer.close();
}
public static void main(String[] args) {
// Create a fixed-size thread pool (MAX_THREAD_SIZE threads; only one task is submitted here)
ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_SIZE);
// Submit the producer task
executorService.submit(new JProducerThread());
// Initiate an orderly shutdown; the submitted task still runs to completion
executorService.shutdown();
}
}
package com.zorkdta.tools.utils;
import java.io.Closeable;
import java.io.IOException;
/**
* @Description
* @className top.xiesen.mock.kafka.utils.ProducerPool
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/4/2 9:39
*/
public class ProducerPool implements Closeable {
private CustomerProducer[] pool;
private int threadNum = 15;
// round-robin index
private int index = 0;
private static ProducerPool producerInstance = null;
public static ProducerPool getInstance(String propertiesName) {
if (producerInstance == null) {
producerInstance = new ProducerPool(propertiesName);
}
return ProducerPool.producerInstance;
}
private ProducerPool(String propertiesName) {
init(propertiesName);
}
public void init(String propertiesName) {
pool = new CustomerProducer[threadNum];
for (int i = 0; i < threadNum; i++) {
pool[i] = new CustomerProducer(propertiesName);
}
}
public CustomerProducer getProducer() {
if (index > 65535) {
index = 0;
}
return pool[index++ % threadNum];
}
@Override
public void close() throws IOException {
}
}
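ProducerPool is a plain round-robin wrapper; a hedged usage sketch (again assuming a hypothetical mock.properties path) showing how getProducer() rotates through the 15 underlying CustomerProducer instances:
// Hedged sketch, not part of this commit; assumed to live in the same package as ProducerPool.
public class ProducerPoolDemo {
public static void main(String[] args) {
ProducerPool pool = ProducerPool.getInstance("mock.properties");
for (int i = 0; i < 30; i++) {
// each call picks pool[index++ % threadNum], so messages are spread across producers
pool.getProducer().sendJsonLog("{\"id\":" + i + "}");
}
}
}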
package com.zorkdta.tools.utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
/**
* @Description
* @className top.xiesen.mock.kafka.utils.PropertiesUtil
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/4/2 9:41
*/
public class PropertiesUtil {
/**
* Load a Properties object from the given file name (a filesystem path).
*
* @param propertieFileName path to the properties file
* @return the loaded Properties
*/
public static Properties getProperties(String propertieFileName) throws Exception {
Properties properties = new Properties();
InputStream inputStream = null;
InputStreamReader inputStreamReader = null;
try {
// inputStream = PropertiesUtil.class.getResourceAsStream(propertieFileName);
inputStream = new FileInputStream(new File(propertieFileName));
inputStreamReader = new InputStreamReader(inputStream, "UTF-8");
properties.load(inputStreamReader);
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (Exception ex) {
// ignore failures while closing the stream
}
}
if (inputStreamReader != null) {
try {
inputStreamReader.close();
} catch (Exception ex) {
// ignore failures while closing the reader
}
}
}
return properties;
}
}
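getProperties reads from a filesystem path (the classpath variant is commented out above). A minimal sketch, assuming a hypothetical ./config/mock.properties file with the same keys as the config shown at the end of this commit:
// Hedged sketch, not part of this commit.
import java.util.Properties;
public class PropertiesUtilDemo {
public static void main(String[] args) throws Exception {
Properties props = PropertiesUtil.getProperties("./config/mock.properties");
System.out.println(props.getProperty("kafka.servers", "zorkdata-95:9092"));
System.out.println(props.getProperty("log.topic", "flinkx"));
}
}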
package com.zorkdta.tools.utils;
import org.joda.time.DateTime;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @Description
* @className top.xiesen.mock.kafka.utils.StringUtil
* @Author 谢森
* @Email xiesen@zork.com.cn
* @Date 2020/4/2 9:50
*/
public class StringUtil {
public static void main(String[] args) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long a = Long.parseLong("1487810258000");
System.out.println(df.format(new Date(a)));
System.out.println(getMSTime("2017-02-23T00:37:38Z"));
}
public static String getISOTime(String str) {
if (str == null) {
return str;
}
str = str.trim();
try {
DateTime datetime;
if (str.length() == 13 && str.startsWith("1")) { // millisecond epoch; a leading '1' covers dates up to 2033
long t = Long.parseLong(str);
datetime = new DateTime(t);
} else if (str.length() == 10 && str.startsWith("1")) { // second epoch; a leading '1' covers dates up to 2033
long t = Long.parseLong(str) * 1000;
datetime = new DateTime(t);
} else {
datetime = new DateTime(str);
// datetime = new DateTime("2033-02-13T00:37:38.778Z");
// datetime = new DateTime("2017-02-23T00:37:38.778+08:00");
}
return datetime.toDateTimeISO().toString();// "2017-02-23T00:37:38.778+08:00"
} catch (Exception ex) {
return null;
}
}
public static long getMSTime(String str) {
if (str == null) {
return -1;
}
str = str.trim();
try {
if (str.length() == 13 && str.startsWith("1")) { // millisecond epoch; a leading '1' covers dates up to 2033
return Long.parseLong(str);
} else if (str.length() == 10 && str.startsWith("1")) { // second epoch; a leading '1' covers dates up to 2033
return Long.parseLong(str) * 1000;
} else {
// datetime = new DateTime("2033-02-13T00:37:38.778Z");
// datetime = new DateTime("2017-02-23T00:37:38.778+08:00");
return new DateTime(str).getMillis();
}
} catch (Exception ex) {
return -1;
}
}
// After metrics are written to Kafka, Spark cannot handle these special characters in metric keys, so strip or replace them
public static String replaceChar4MetricKey(String str) {
str = str.replaceAll("\"", "").replaceAll(",", "_").replaceAll("\\[", "").replaceAll("]", "").replaceAll("\\\\", "").replaceAll(" ", "_").replaceAll("=", "").replaceAll(":", "")
.replaceAll("\\.", "_");
return str;
}
public static List<String> numbers = new ArrayList<String>() {
{
add("0");
add("1");
add("2");
add("3");
add("4");
add("5");
add("6");
add("7");
add("8");
add("9");
}
};
private static Pattern pattern = Pattern.compile("^(-?\\d+)(\\.\\d+)?$");
public static boolean isNumeric(String str) {
Matcher isNum = pattern.matcher(str);
return isNum.matches();
}
public static boolean isNull(String str) {
if (str == null) {
return true;
}
str = str.trim();
return str.equals("") || str.equalsIgnoreCase("NULL");
}
public static boolean isDouble(String str) {
if (str == null) {
return false;
}
str = str.trim();
try {
Double.parseDouble(str);
return true;
} catch (Exception ex) {
return false;
}
}
public static Double getDouble(String str) {
if (str == null) {
return null;
}
str = str.trim();
try {
return Double.valueOf(str);
} catch (Exception Ex) {
return null;
}
}
public static double getDouble(String str, double defaultValue) {
Double d = getDouble(str);
return d == null ? defaultValue : d;
}
public static long getLong(String str, long defaultValue) {
if (str == null) {
return defaultValue;
}
str = str.trim();
try {
return Long.valueOf(str);
} catch (Exception Ex) {
return defaultValue;
}
}
public static int getInt(String str, int defaultValue) {
if (str == null) {
return defaultValue;
}
str = str.trim();
try {
return Integer.valueOf(str);
} catch (Exception Ex) {
return defaultValue;
}
}
}
#kafka.servers = kafka-1:19092,kafka-2:19092,kafka-3:19092
kafka.servers = zorkdata-95:9092
kafka.batch.size = 1
log.size = 10000
log.topic=flinkx