Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
Mock-Data
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
屈庆涛
Mock-Data
Commits
be982b36
Commit
be982b36
authored
Apr 09, 2021
by
quqingtao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
<dev>
1. something
parent
ab269bc9
Changes
34
Show whitespace changes
Inline
Side-by-side
Showing
34 changed files
with
2867 additions
and
27 deletions
+2867
-27
src/main/java/com/zorkdata/tools/avro/AvroSerializer.java
src/main/java/com/zorkdata/tools/avro/AvroSerializer.java
+13
-0
src/main/java/com/zorkdata/tools/kafka/AvroDeserializer.java
src/main/java/com/zorkdata/tools/kafka/AvroDeserializer.java
+66
-0
src/main/java/com/zorkdata/tools/kafka/AvroDeserializerFactory.java
...ava/com/zorkdata/tools/kafka/AvroDeserializerFactory.java
+25
-0
src/main/java/com/zorkdata/tools/kafka/AvroMacroDef.java
src/main/java/com/zorkdata/tools/kafka/AvroMacroDef.java
+51
-0
src/main/java/com/zorkdata/tools/kafka/AvroSerializer.java
src/main/java/com/zorkdata/tools/kafka/AvroSerializer.java
+281
-0
src/main/java/com/zorkdata/tools/kafka/AvroSerializerFactory.java
.../java/com/zorkdata/tools/kafka/AvroSerializerFactory.java
+26
-0
src/main/java/com/zorkdata/tools/kafka/LogAvroMacroDef.java
src/main/java/com/zorkdata/tools/kafka/LogAvroMacroDef.java
+75
-0
src/main/java/com/zorkdata/tools/kafka/Producer.java
src/main/java/com/zorkdata/tools/kafka/Producer.java
+105
-0
src/main/java/com/zorkdata/tools/kafka/ProducerPool.java
src/main/java/com/zorkdata/tools/kafka/ProducerPool.java
+66
-0
src/main/java/com/zorkdata/tools/mock/LogAvroProducer.java
src/main/java/com/zorkdata/tools/mock/LogAvroProducer.java
+92
-0
src/main/java/com/zorkdata/tools/mock/MockFilebeatDataToKafka.java
...java/com/zorkdata/tools/mock/MockFilebeatDataToKafka.java
+2
-2
src/main/java/com/zorkdata/tools/mock/MockMetricEvent.java
src/main/java/com/zorkdata/tools/mock/MockMetricEvent.java
+20
-12
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvro.java
...main/java/com/zorkdata/tools/mock/MockStreamxLogAvro.java
+131
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroQQT.java
...n/java/com/zorkdata/tools/mock/MockStreamxLogAvroQQT.java
+104
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroTest1.java
...java/com/zorkdata/tools/mock/MockStreamxLogAvroTest1.java
+161
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroTest2.java
...java/com/zorkdata/tools/mock/MockStreamxLogAvroTest2.java
+130
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroTest20201218.java
...m/zorkdata/tools/mock/MockStreamxLogAvroTest20201218.java
+119
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroTestRandom.java
...com/zorkdata/tools/mock/MockStreamxLogAvroTestRandom.java
+140
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvro.java
...n/java/com/zorkdata/tools/mock/MockStreamxMetricAvro.java
+133
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvro0402.java
...va/com/zorkdata/tools/mock/MockStreamxMetricAvro0402.java
+90
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvroFlink.java
...a/com/zorkdata/tools/mock/MockStreamxMetricAvroFlink.java
+88
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvroQQT.java
...ava/com/zorkdata/tools/mock/MockStreamxMetricAvroQQT.java
+152
-0
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvroTestRandom.java
.../zorkdata/tools/mock/MockStreamxMetricAvroTestRandom.java
+89
-0
src/main/java/com/zorkdata/tools/mock/MockZorkMetric.java
src/main/java/com/zorkdata/tools/mock/MockZorkMetric.java
+15
-11
src/main/java/com/zorkdata/tools/mock/MockZorkMetricAfter15Seconds.java
...com/zorkdata/tools/mock/MockZorkMetricAfter15Seconds.java
+106
-0
src/main/java/com/zorkdata/tools/mock/MockZorkMetricAfterTime.java
...java/com/zorkdata/tools/mock/MockZorkMetricAfterTime.java
+96
-0
src/main/java/com/zorkdata/tools/mock/QQTMockZorkMetricHostBeatSimpleCycleAppSystem.java
...s/mock/QQTMockZorkMetricHostBeatSimpleCycleAppSystem.java
+87
-0
src/main/java/com/zorkdata/tools/mock/QQTMockZorkMetricHostBeatSimpleCyle.java
...kdata/tools/mock/QQTMockZorkMetricHostBeatSimpleCyle.java
+87
-0
src/main/java/com/zorkdata/tools/mock/QQTMockZorkMetricSimpleCyle.java
.../com/zorkdata/tools/mock/QQTMockZorkMetricSimpleCyle.java
+96
-0
src/main/java/com/zorkdata/tools/mock/QQTMockZorkMetricSimpleCyle2.java
...com/zorkdata/tools/mock/QQTMockZorkMetricSimpleCyle2.java
+94
-0
src/main/java/com/zorkdata/tools/mock/QQTTest.java
src/main/java/com/zorkdata/tools/mock/QQTTest.java
+24
-0
src/main/java/com/zorkdata/tools/mock/ScheduledExecutorServiceTest.java
...com/zorkdata/tools/mock/ScheduledExecutorServiceTest.java
+89
-0
src/main/java/com/zorkdata/tools/mock/TaskService.java
src/main/java/com/zorkdata/tools/mock/TaskService.java
+12
-0
src/main/resources/config.properties
src/main/resources/config.properties
+2
-2
No files found.
src/main/java/com/zorkdata/tools/avro/AvroSerializer.java
View file @
be982b36
...
...
@@ -245,6 +245,19 @@ public class AvroSerializer {
return
serializing
(
datum
);
}
/**
* 序列化对象
*/
public
synchronized
byte
[]
serializingMetricInt
(
String
metricSetName
,
String
timestamp
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Integer
>
metrics
)
{
GenericRecord
datum
=
new
GenericData
.
Record
(
this
.
schema
);
// 将数据加到datum中
datum
.
put
(
0
,
metricSetName
);
datum
.
put
(
1
,
timestamp
);
datum
.
put
(
2
,
dimensions
);
datum
.
put
(
3
,
metrics
);
return
serializing
(
datum
);
}
private
synchronized
byte
[]
serializing
(
GenericRecord
genericRecord
,
String
key
[])
{
byte
[]
returnstr
=
null
;
GenericRecord
datum
=
new
GenericData
.
Record
(
this
.
schema
);
...
...
src/main/java/com/zorkdata/tools/kafka/AvroDeserializer.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.kafka
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
org.apache.avro.Schema
;
import
org.apache.avro.generic.GenericData
;
import
org.apache.avro.generic.GenericDatumReader
;
import
org.apache.avro.generic.GenericRecord
;
import
org.apache.avro.io.DatumReader
;
import
org.apache.avro.io.Decoder
;
import
org.apache.avro.io.DecoderFactory
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* Created by zhuzhigang on 16/2/29.
*/
public
class
AvroDeserializer
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
AvroDeserializer
.
class
);
public
JSONObject
jsonObject
;
public
JSONArray
jsonArray
;
public
Schema
schema
;
public
String
[]
keys
;
public
AvroDeserializer
(
String
schema
)
{
getKeysFromjson
(
schema
);
}
/**
* @param schema:Avro序列化所使用的schema
* @return void 返回类型
* @throws
* @Title: getKeysFromjson
* @Description:用于获取Avro的keys
*/
void
getKeysFromjson
(
String
schema
)
{
this
.
jsonObject
=
JSONObject
.
parseObject
(
schema
);
this
.
schema
=
new
Schema
.
Parser
().
parse
(
schema
);
this
.
jsonArray
=
this
.
jsonObject
.
getJSONArray
(
"fields"
);
this
.
keys
=
new
String
[
this
.
jsonArray
.
size
()];
for
(
int
i
=
0
;
i
<
this
.
jsonArray
.
size
();
i
++)
{
this
.
keys
[
i
]
=
this
.
jsonArray
.
getJSONObject
(
i
).
get
(
"name"
).
toString
();
}
}
/**
* @param body 参数:byte[] body:kafka消息。
* @param @return 设定文件
* @return String 返回类型
* @throws
* @Title: deserializing
* @Description: 用于Avro的反序列化。
*/
public
GenericRecord
deserializing
(
byte
[]
body
)
{
DatumReader
<
GenericData
.
Record
>
datumReader
=
new
GenericDatumReader
<
GenericData
.
Record
>(
this
.
schema
);
Decoder
decoder
=
DecoderFactory
.
get
().
binaryDecoder
(
body
,
null
);
GenericData
.
Record
result
=
null
;
try
{
result
=
datumReader
.
read
(
null
,
decoder
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
String
.
format
(
"error Avro反序列化"
),
e
);
}
return
result
;
}
}
src/main/java/com/zorkdata/tools/kafka/AvroDeserializerFactory.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.kafka
;
/**
* Created by zhuzhigang on 16/2/29.
*/
public
class
AvroDeserializerFactory
{
private
static
AvroDeserializer
bappLauch
=
null
;
public
static
void
init
()
{
bappLauch
=
null
;
}
/**
* Topicmetadata
*
* @return
*/
public
static
AvroDeserializer
getTopicmetadataDeserializer
()
{
if
(
bappLauch
==
null
)
{
bappLauch
=
new
AvroDeserializer
(
LogAvroMacroDef
.
metadata
);
}
return
bappLauch
;
}
}
src/main/java/com/zorkdata/tools/kafka/AvroMacroDef.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.kafka
;
/**
* @author zhuzhigang
* @since 1.0
*/
public
class
AvroMacroDef
{
public
static
String
metadata
=
"{\n"
+
" \"namespace\": \"com.zork.metrics\",\n"
+
" \"type\": \"record\",\n"
+
" \"name\": \"metrics\",\n"
+
" \"fields\": [\n"
+
" {\n"
+
" \"name\": \"metricsetname\",\n"
+
" \"type\": [\n"
+
" \"string\",\n"
+
" \"null\"\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"timestamp\",\n"
+
" \"type\": [\n"
+
" \"string\",\n"
+
" \"null\"\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"dimensions\",\n"
+
" \"type\": [\n"
+
" \"null\",\n"
+
" {\n"
+
" \"type\": \"map\",\n"
+
" \"values\": \"string\"\n"
+
" }\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"metrics\",\n"
+
" \"type\": [\n"
+
" \"null\",\n"
+
" {\n"
+
" \"type\": \"map\",\n"
+
" \"values\": \"double\"\n"
+
" }\n"
+
" ]\n"
+
" }\n"
+
" ]\n"
+
"}"
;
}
src/main/java/com/zorkdata/tools/kafka/AvroSerializer.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.kafka
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
org.apache.avro.Schema
;
import
org.apache.avro.generic.GenericData
;
import
org.apache.avro.generic.GenericDatumWriter
;
import
org.apache.avro.generic.GenericRecord
;
import
org.apache.avro.io.DatumWriter
;
import
org.apache.avro.io.Encoder
;
import
org.apache.avro.io.EncoderFactory
;
import
org.apache.avro.util.Utf8
;
import
java.io.ByteArrayOutputStream
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
/**
* Created by zhuzhigang on 16/3/1.
*/
public
class
AvroSerializer
{
public
JSONObject
jsonObject
;
public
JSONArray
jsonArray
;
public
Schema
schema
;
public
List
<
String
>
filedsArrayList
=
new
ArrayList
<
String
>();
public
AvroSerializer
(
String
schema
)
{
getKeysFromjson
(
schema
);
}
/**
* @param schema
* :Avro序列化所使用的schema
* @return void 返回类型
* @throws
* @Title: getKeysFromjson
* @Description:用于获取Avro的keys
*/
void
getKeysFromjson
(
String
schema
)
{
this
.
jsonObject
=
JSONObject
.
parseObject
(
schema
);
this
.
schema
=
new
Schema
.
Parser
().
parse
(
schema
);
this
.
jsonArray
=
this
.
jsonObject
.
getJSONArray
(
"fields"
);
if
(
filedsArrayList
!=
null
&&
filedsArrayList
.
size
()
>
0
)
{
filedsArrayList
.
clear
();
}
for
(
int
i
=
0
;
i
<
this
.
jsonArray
.
size
();
i
++)
{
filedsArrayList
.
add
(
this
.
jsonArray
.
getJSONObject
(
i
).
get
(
"name"
).
toString
());
}
}
/**
* @param
* @param @return 设定文件
* @return String 返回类型
* @throws
* @Title: serializing
* @Description: 用于Avro的序列化。
*/
private
synchronized
byte
[]
serializing
(
List
<
String
>
temtuple
)
{
byte
[]
returnstr
=
null
;
GenericRecord
datum
=
new
GenericData
.
Record
(
this
.
schema
);
// 将数据加到datum中
for
(
int
i
=
0
;
i
<
filedsArrayList
.
size
();
i
++)
{
datum
.
put
(
filedsArrayList
.
get
(
i
),
temtuple
.
get
(
i
));
}
ByteArrayOutputStream
out
=
new
ByteArrayOutputStream
();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
DatumWriter
<
GenericRecord
>
write
=
new
GenericDatumWriter
<
GenericRecord
>(
this
.
schema
);
// 然后由Encoder写到数据流。
Encoder
encoder
=
EncoderFactory
.
get
().
binaryEncoder
(
out
,
null
);
try
{
write
.
write
(
datum
,
encoder
);
encoder
.
flush
();
}
catch
(
IOException
e
)
{
}
finally
{
if
(
out
!=
null
)
{
try
{
out
.
close
();
}
catch
(
IOException
e
)
{
}
}
}
try
{
returnstr
=
out
.
toByteArray
();
}
catch
(
Exception
e
)
{
}
return
returnstr
;
}
/**
* 序列化json串
*
* @param json
* @return
*/
private
synchronized
byte
[]
serializing
(
String
json
)
{
byte
[]
returnstr
=
null
;
JSONObject
jsonObject
=
(
JSONObject
)
JSONObject
.
parse
(
json
);
// new TypeReference<Object>() {}
GenericRecord
datum
=
new
GenericData
.
Record
(
this
.
schema
);
// 将数据加到datum中
for
(
int
i
=
0
;
i
<
filedsArrayList
.
size
();
i
++)
{
datum
.
put
(
filedsArrayList
.
get
(
i
),
new
Utf8
(
String
.
valueOf
(
jsonObject
.
get
(
filedsArrayList
.
get
(
i
)))));
}
ByteArrayOutputStream
out
=
new
ByteArrayOutputStream
();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
DatumWriter
<
GenericRecord
>
write
=
new
GenericDatumWriter
<
GenericRecord
>(
this
.
schema
);
// 然后由Encoder写到数据流。
Encoder
encoder
=
EncoderFactory
.
get
().
binaryEncoder
(
out
,
null
);
try
{
write
.
write
(
datum
,
encoder
);
encoder
.
flush
();
}
catch
(
IOException
e
)
{
}
finally
{
if
(
out
!=
null
)
{
try
{
out
.
close
();
}
catch
(
IOException
e
)
{
}
}
}
try
{
returnstr
=
out
.
toByteArray
();
}
catch
(
Exception
e
)
{
}
return
returnstr
;
}
/**
* 序列化json对象
*
* @param jsonObject
* @return
*/
private
synchronized
byte
[]
serializing
(
JSONObject
jsonObject
)
{
byte
[]
returnstr
=
null
;
GenericRecord
datum
=
new
GenericData
.
Record
(
this
.
schema
);
// 将数据加到datum中
for
(
int
i
=
0
;
i
<
filedsArrayList
.
size
();
i
++)
{
datum
.
put
(
filedsArrayList
.
get
(
i
),
jsonObject
.
get
(
filedsArrayList
.
get
(
i
)));
}
ByteArrayOutputStream
out
=
new
ByteArrayOutputStream
();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
DatumWriter
<
GenericRecord
>
write
=
new
GenericDatumWriter
<
GenericRecord
>(
this
.
schema
);
// 然后由Encoder写到数据流。
Encoder
encoder
=
EncoderFactory
.
get
().
binaryEncoder
(
out
,
null
);
try
{
write
.
write
(
datum
,
encoder
);
encoder
.
flush
();
}
catch
(
IOException
e
)
{
}
finally
{
if
(
out
!=
null
)
{
try
{
out
.
close
();
}
catch
(
IOException
e
)
{
}
}
}
try
{
returnstr
=
out
.
toByteArray
();
}
catch
(
Exception
e
)
{
}
return
returnstr
;
}
/**
* 序列化对象
*/
public
synchronized
byte
[]
serializing
(
GenericRecord
datum
)
{
byte
[]
returnstr
=
null
;
ByteArrayOutputStream
out
=
new
ByteArrayOutputStream
();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
DatumWriter
<
GenericRecord
>
write
=
new
GenericDatumWriter
<
GenericRecord
>(
this
.
schema
);
// 然后由Encoder写到数据流。
Encoder
encoder
=
EncoderFactory
.
get
().
binaryEncoder
(
out
,
null
);
try
{
write
.
write
(
datum
,
encoder
);
encoder
.
flush
();
}
catch
(
IOException
e
)
{
}
finally
{
if
(
out
!=
null
)
{
try
{
out
.
close
();
}
catch
(
IOException
e
)
{
}
}
}
try
{
returnstr
=
out
.
toByteArray
();
}
catch
(
Exception
e
)
{
}
// GenericRecord s = AvroDeserializerFactory.getTopicmetadataDeserializer().deserializing(returnstr);
return
returnstr
;
}
/**
* 序列化对象
*/
public
synchronized
byte
[]
serializingLog
(
String
logTypeName
,
String
timestamp
,
String
source
,
String
offset
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
,
Map
<
String
,
String
>
normalFields
)
{
GenericRecord
datum
=
new
GenericData
.
Record
(
this
.
schema
);
// 将数据加到datum中
datum
.
put
(
0
,
logTypeName
);
datum
.
put
(
1
,
timestamp
);
datum
.
put
(
2
,
source
);
datum
.
put
(
3
,
offset
);
datum
.
put
(
4
,
dimensions
);
datum
.
put
(
5
,
metrics
);
datum
.
put
(
6
,
normalFields
);
return
serializing
(
datum
);
}
/**
* 序列化对象
*/
public
synchronized
byte
[]
serializingMetric
(
String
metricSetName
,
String
timestamp
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
)
{
GenericRecord
datum
=
new
GenericData
.
Record
(
this
.
schema
);
// 将数据加到datum中
datum
.
put
(
0
,
metricSetName
);
datum
.
put
(
1
,
timestamp
);
datum
.
put
(
2
,
dimensions
);
datum
.
put
(
3
,
metrics
);
return
serializing
(
datum
);
}
private
synchronized
byte
[]
serializing
(
GenericRecord
genericRecord
,
String
key
[])
{
byte
[]
returnstr
=
null
;
GenericRecord
datum
=
new
GenericData
.
Record
(
this
.
schema
);
// 将数据加到datum中
for
(
int
i
=
0
;
i
<
filedsArrayList
.
size
();
i
++)
{
datum
.
put
(
filedsArrayList
.
get
(
i
),
new
Utf8
(
String
.
valueOf
(
genericRecord
.
get
(
key
[
i
]))));
}
ByteArrayOutputStream
out
=
new
ByteArrayOutputStream
();
// DatumWriter 将数据对象翻译成Encoder对象可以理解的类型
DatumWriter
<
GenericRecord
>
write
=
new
GenericDatumWriter
<
GenericRecord
>(
this
.
schema
);
// 然后由Encoder写到数据流。
Encoder
encoder
=
EncoderFactory
.
get
().
binaryEncoder
(
out
,
null
);
try
{
write
.
write
(
datum
,
encoder
);
encoder
.
flush
();
}
catch
(
IOException
e
)
{
}
finally
{
if
(
out
!=
null
)
{
try
{
out
.
close
();
}
catch
(
IOException
e
)
{
}
}
}
try
{
returnstr
=
out
.
toByteArray
();
}
catch
(
Exception
e
)
{
}
return
returnstr
;
}
}
src/main/java/com/zorkdata/tools/kafka/AvroSerializerFactory.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.kafka
;
/**
* @author zhuzhigang
* @since 1.0
*/
public
class
AvroSerializerFactory
{
private
static
AvroSerializer
metricMetadata
=
null
;
private
static
AvroSerializer
logMetadata
=
null
;
public
static
AvroSerializer
getLogAvorSerializer
()
{
if
(
logMetadata
==
null
)
{
logMetadata
=
new
AvroSerializer
(
LogAvroMacroDef
.
metadata
);
}
return
logMetadata
;
}
public
static
AvroSerializer
getMetricAvorSerializer
()
{
if
(
metricMetadata
==
null
)
{
metricMetadata
=
new
AvroSerializer
(
AvroMacroDef
.
metadata
);
}
return
metricMetadata
;
}
}
src/main/java/com/zorkdata/tools/kafka/LogAvroMacroDef.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.kafka
;
/**
* @author zhuzhigang
* @since 1.0
*/
public
class
LogAvroMacroDef
{
public
static
String
metadata
=
"{\n"
+
" \"namespace\": \"com.zork.logs\",\n"
+
" \"type\": \"record\",\n"
+
" \"name\": \"logs\",\n"
+
" \"fields\": [\n"
+
" {\n"
+
" \"name\": \"logTypeName\",\n"
+
" \"type\": [\n"
+
" \"string\",\n"
+
" \"null\"\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"timestamp\",\n"
+
" \"type\": [\n"
+
" \"string\",\n"
+
" \"null\"\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"source\",\n"
+
" \"type\": [\n"
+
" \"string\",\n"
+
" \"null\"\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"offset\",\n"
+
" \"type\": [\n"
+
" \"string\",\n"
+
" \"null\"\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"dimensions\",\n"
+
" \"type\": [\n"
+
" \"null\",\n"
+
" {\n"
+
" \"type\": \"map\",\n"
+
" \"values\": \"string\"\n"
+
" }\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"measures\",\n"
+
" \"type\": [\n"
+
" \"null\",\n"
+
" {\n"
+
" \"type\": \"map\",\n"
+
" \"values\": \"double\"\n"
+
" }\n"
+
" ]\n"
+
" },\n"
+
" {\n"
+
" \"name\": \"normalFields\",\n"
+
" \"type\": [\n"
+
" \"null\",\n"
+
" {\n"
+
" \"type\": \"map\",\n"
+
" \"values\": \"string\"\n"
+
" }\n"
+
" ]\n"
+
" }\n"
+
" ]\n"
+
"}"
;
}
src/main/java/com/zorkdata/tools/kafka/Producer.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.kafka
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
javax.security.auth.login.Configuration
;
import
java.util.*
;
public
class
Producer
{
// static String servers = "yf122:9092,yf121:9092,yf120:9092";
static
String
servers
=
"node1:9092,node2:9092,node3:9092"
;
// static String servers = "kafka-1:19092,kafka-2:19092,kafka-3:19092";
static
int
batchsize
=
1
;
static
Producer
testProducer
;
static
String
metricTopic
;
static
String
errorLogTopic
;
static
String
errorMetricTopic
;
static
String
alarmTopic
;
static
String
defaultLogTopic
;
static
List
<
String
>
cep_change_event_logtypename
=
new
ArrayList
<
String
>();
static
String
cep_change_event_topic
;
public
static
Map
<
String
,
String
>
APPSYSTEM_TOPIC_MAP
=
new
HashMap
<
String
,
String
>();
public
static
Map
<
String
,
Map
<
String
,
String
>>
APPSYSTEM_SERVICE_TOPIC_MAP
=
new
HashMap
<
String
,
Map
<
String
,
String
>>();
static
int
partition
=
6
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
;
private
static
KafkaProducer
<
String
,
String
>
noAvroProducer
;
public
Producer
()
{
try
{
initConfig
();
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
servers
);
// props.put("client.id", "webAPI4LogGather");
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
"org.apache.kafka.common.serialization.ByteArraySerializer"
);
props
.
put
(
"batch.size"
,
batchsize
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
props
.
put
(
"value.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
noAvroProducer
=
new
KafkaProducer
<
String
,
String
>(
props
);
}
catch
(
Exception
ex
)
{
ex
.
printStackTrace
();
}
}
public
void
initConfig
()
throws
Exception
{
servers
=
"node1:9092,node2:9092,node3:9092"
;
// servers = "kafka-1:19092,kafka-2:19092,kafka-3:19092";
batchsize
=
100000
;
}
public
void
sendLog
(
String
topic
,
String
logTypeName
,
String
timestamp
,
String
source
,
String
offset
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
,
Map
<
String
,
String
>
normalFields
)
{
try
{
byte
[]
bytes
=
AvroSerializerFactory
.
getLogAvorSerializer
().
serializingLog
(
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
metrics
,
normalFields
);
producer
.
send
(
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
""
,
bytes
));
System
.
out
.
println
(
new
String
(
bytes
));
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
public
void
sendErrorLog
(
String
logJson
)
{
try
{
noAvroProducer
.
send
(
new
ProducerRecord
<
String
,
String
>(
errorLogTopic
,
null
,
logJson
));
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
public
void
sendErrorMetric
(
String
logJson
)
{
try
{
noAvroProducer
.
send
(
new
ProducerRecord
<
String
,
String
>(
errorMetricTopic
,
null
,
logJson
));
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
public
void
sendAlarm
(
String
alarmJson
)
{
try
{
noAvroProducer
.
send
(
new
ProducerRecord
<
String
,
String
>(
alarmTopic
,
null
,
alarmJson
));
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
public
void
sendMetric
(
String
metricSetName
,
String
timestamp
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
)
{
try
{
byte
[]
bytes
=
AvroSerializerFactory
.
getMetricAvorSerializer
().
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
producer
.
send
(
new
ProducerRecord
<
String
,
byte
[]>(
metricTopic
,
""
,
bytes
));
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
src/main/java/com/zorkdata/tools/kafka/ProducerPool.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.kafka
;
import
java.io.Closeable
;
import
java.io.IOException
;
/**
* ClassName: ProducerPool
* Email: zhuzhigang@zork.com.cn
* Date: 2018\12\13 0013
*
* @author: zhuzhigang
**/
public
class
ProducerPool
implements
Closeable
{
private
Producer
[]
pool
;
private
int
threadNum
=
30
;
// 轮循id
private
int
index
=
0
;
private
static
ProducerPool
_interance
=
null
;
public
static
ProducerPool
getInstance
()
{
if
(
_interance
==
null
)
{
_interance
=
new
ProducerPool
();
}
return
ProducerPool
.
_interance
;
}
private
ProducerPool
()
{
init
();
}
public
void
init
()
{
pool
=
new
Producer
[
threadNum
];
for
(
int
i
=
0
;
i
<
threadNum
;
i
++)
{
pool
[
i
]
=
new
Producer
();
}
}
public
Producer
getProducer
()
{
if
(
index
>
65535
)
{
index
=
0
;
}
return
pool
[
index
++
%
threadNum
];
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*
* <p> As noted in {@link AutoCloseable#close()}, cases where the
* close may fail require careful attention. It is strongly advised
* to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
* the {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public
void
close
()
throws
IOException
{
}
}
\ No newline at end of file
src/main/java/com/zorkdata/tools/mock/LogAvroProducer.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.TypeReference
;
import
com.zorkdata.tools.kafka.Producer
;
import
com.zorkdata.tools.kafka.ProducerPool
;
import
com.zorkdata.tools.utils.StringUtil
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.Map
;
public
class
LogAvroProducer
{
public
static
final
String
MEASURES
=
"measures"
;
public
static
final
String
DIMENSIONS
=
"dimensions"
;
public
static
final
String
TIMESTAMP
=
"timestamp"
;
public
static
final
String
LOGTYPENAME
=
"logTypeName"
;
public
static
final
String
NORMALFIELDS
=
"normalFields"
;
public
static
final
String
SOURCE
=
"source"
;
public
static
final
String
OFFSET
=
"offset"
;
public
static
final
String
APPSYSTEM
=
"appsystem"
;
public
static
final
String
NULLSTR
=
""
;
public
static
void
main
(
String
[]
args
)
{
int
total
=
10
*
1000
*
1000
*
100
;
int
i
=
0
;
Long
begin
=
System
.
currentTimeMillis
();
while
(
i
<
total
)
{
Date
d
=
new
Date
();
JSONObject
logType
=
new
JSONObject
();
logType
.
put
(
LOGTYPENAME
,
"streamx_log_avro"
);
logType
.
put
(
TIMESTAMP
,
d
.
getTime
());
logType
.
put
(
SOURCE
,
"0"
);
logType
.
put
(
OFFSET
,
"0"
);
JSONObject
dimensions
=
new
JSONObject
();
dimensions
.
put
(
"hostname"
,
"zorkdata"
+
i
);
dimensions
.
put
(
"appprogramname"
,
"zorkdata"
+
i
);
dimensions
.
put
(
"ip"
,
"192.168.1.1"
);
dimensions
.
put
(
"appsystem"
,
"zorkdata"
+
i
);
JSONObject
normalFields
=
new
JSONObject
();
normalFields
.
put
(
"countryCode"
,
"SZ"
+
i
);
normalFields
.
put
(
"message"
,
"ABCD"
+
i
);
JSONObject
measures
=
new
JSONObject
();
logType
.
put
(
DIMENSIONS
,
dimensions
);
logType
.
put
(
NORMALFIELDS
,
normalFields
);
logType
.
put
(
MEASURES
,
measures
);
sendWhileData
(
"bigdata1"
,
logType
);
i
++;
}
Long
end
=
System
.
currentTimeMillis
();
System
.
out
.
println
(
"总耗时:"
+
(
end
-
begin
));
}
public
static
String
sendWhileData
(
String
topic
,
JSONObject
log
)
{
try
{
String
logTypeName
=
log
.
getString
(
LOGTYPENAME
);
String
_timestamp
=
log
.
getString
(
TIMESTAMP
);
String
timestamp
=
StringUtil
.
getISOTime
(
_timestamp
);
String
source
=
log
.
getString
(
SOURCE
);
String
offset
=
log
.
getString
(
OFFSET
);
if
(
source
==
null
)
{
source
=
NULLSTR
;
}
if
(
offset
==
null
)
{
offset
=
NULLSTR
;
}
JSONObject
dimensions
=
log
.
getJSONObject
(
DIMENSIONS
);
JSONObject
normalFields
=
log
.
getJSONObject
(
NORMALFIELDS
);
JSONObject
measures
=
log
.
getJSONObject
(
MEASURES
);
String
appSystem
=
dimensions
.
getString
(
APPSYSTEM
);
Map
<
String
,
String
>
dimensionsMap
=
dimensions
!=
null
?
JSONObject
.
parseObject
(
dimensions
.
toJSONString
(),
new
TypeReference
<
Map
<
String
,
String
>>()
{
})
:
new
HashMap
<>();
Map
<
String
,
String
>
normalFieldsMap
=
normalFields
!=
null
?
JSONObject
.
parseObject
(
normalFields
.
toJSONString
(),
new
TypeReference
<
Map
<
String
,
String
>>()
{
})
:
new
HashMap
<>();
Map
<
String
,
Double
>
measuresMap
=
measures
!=
null
?
JSONObject
.
parseObject
(
measures
.
toJSONString
(),
new
TypeReference
<
Map
<
String
,
Double
>>()
{
})
:
new
HashMap
<>();
Producer
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
topic
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensionsMap
,
measuresMap
,
normalFieldsMap
);
// System.out.println("white list send [" + logTypeName + "] success");
return
"ok"
;
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
"Please define the log set. The data you sent through the whitelist is in an incorrect format"
);
return
"faild"
;
}
}
}
src/main/java/com/zorkdata/tools/mock/MockFilebeatDataToKafka.java
View file @
be982b36
...
...
@@ -122,7 +122,7 @@ public class MockFilebeatDataToKafka {
filebeatJson
.
put
(
"logTypeName"
,
"test_topic_log"
);
filebeatJson
.
put
(
"servicename"
,
"test_servicename"
);
filebeatJson
.
put
(
"servicecode"
,
"test_cdde"
);
filebeatJson
.
put
(
"collector
_rule_
id"
,
"1"
);
filebeatJson
.
put
(
"collector
rule
id"
,
"1"
);
filebeatJson
.
put
(
"@metadata"
,
metadataJson
);
filebeatJson
.
put
(
"input"
,
inputJson
);
...
...
@@ -163,7 +163,7 @@ public class MockFilebeatDataToKafka {
filebeatJson
.
put
(
"logTypeName"
,
"test_topic_log"
);
filebeatJson
.
put
(
"servicename"
,
"test_servicename"
);
filebeatJson
.
put
(
"servicecode"
,
"test_cdde"
);
filebeatJson
.
put
(
"collector
_rule_
id"
,
"1"
);
filebeatJson
.
put
(
"collector
rule
id"
,
"1"
);
filebeatJson
.
put
(
"@metadata"
,
metadataJson
);
filebeatJson
.
put
(
"input"
,
inputJson
);
...
...
src/main/java/com/zorkdata/tools/mock/MockMetricEvent.java
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.StringSerializer
;
...
...
@@ -15,8 +16,8 @@ import java.util.concurrent.ExecutionException;
* @author DeleMing
*/
public
class
MockMetricEvent
{
private
static
String
topic
=
"
flink-metric
"
;
private
static
String
brokerAddr
=
"
zorkdata-91:
9092"
;
private
static
String
topic
=
"
alert_test_source
"
;
private
static
String
brokerAddr
=
"
kafka-1:19092,kafka-2:19092,kafka-3:1
9092"
;
private
static
ProducerRecord
<
String
,
String
>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
String
>
producer
=
null
;
...
...
@@ -35,15 +36,22 @@ public class MockMetricEvent {
}
public
static
String
buildMetricEvent
()
{
String
name
=
"metric"
;
Long
timestamp
=
System
.
currentTimeMillis
();
Map
<
String
,
Object
>
fields
=
new
HashMap
<>();
fields
.
put
(
"cpu_used"
,
0.6
);
fields
.
put
(
"disk_used"
,
0.4
);
Map
<
String
,
String
>
tags
=
new
HashMap
<>();
tags
.
put
(
"hostname"
,
"localhost"
);
MetricEvent
metricEvent
=
new
MetricEvent
(
name
,
timestamp
,
fields
,
tags
);
return
JSON
.
toJSONString
(
metricEvent
);
// String name = "cpu_system_mb";
// Long timestamp = System.currentTimeMillis();
// Map<String, Object> fields = new HashMap<>();
// fields.put("used_pct", 0.5);
// Map<String, String> tags = new HashMap<>();
// tags.put("hostname", "yf170");
// MetricEvent metricEvent = new MetricEvent(name, timestamp, fields, tags);
JSONObject
jsonObject
=
new
JSONObject
();
jsonObject
.
put
(
"id"
,
"1"
);
jsonObject
.
put
(
"alarmTime"
,
"1608775050000"
);
jsonObject
.
put
(
"state"
,
"normal"
);
return
JSON
.
toJSONString
(
jsonObject
);
}
...
...
@@ -61,7 +69,7 @@ public class MockMetricEvent {
public
static
void
main
(
String
[]
args
)
throws
ExecutionException
,
InterruptedException
{
for
(
int
i
=
0
;
i
<
3
00
;
i
++)
{
for
(
int
i
=
0
;
i
<
3
;
i
++)
{
send
();
}
}
...
...
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvro.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zorkdata.tools.kafka.Producer
;
import
com.zorkdata.tools.kafka.ProducerPool
;
import
com.zorkdata.tools.utils.DateUtil
;
import
com.zorkdata.tools.utils.PropertiesUtil
;
import
com.zorkdata.tools.utils.StringUtil
;
import
org.joda.time.DateTime
;
import
java.util.*
;
/**
* @author zhuzhigang
*/
public
class
MockStreamxLogAvro
{
private
static
long
getSize
(
String
propertiesName
)
throws
Exception
{
Properties
properties
=
PropertiesUtil
.
getProperties
(
propertiesName
);
long
logSize
=
StringUtil
.
getLong
(
properties
.
getProperty
(
"log.size"
,
"5000"
).
trim
(),
1
);
return
logSize
;
}
public
static
String
printData
(
String
logTypeName
,
String
timestamp
,
String
source
,
String
offset
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
,
Map
<
String
,
String
>
normalFields
)
{
JSONObject
jsonObject
=
new
JSONObject
();
jsonObject
.
put
(
"logTypeName"
,
logTypeName
);
jsonObject
.
put
(
"timestamp"
,
timestamp
);
jsonObject
.
put
(
"source"
,
source
);
jsonObject
.
put
(
"offset"
,
offset
);
jsonObject
.
put
(
"dimensions"
,
dimensions
);
jsonObject
.
put
(
"measures"
,
metrics
);
jsonObject
.
put
(
"normalFields"
,
normalFields
);
return
jsonObject
.
toString
();
}
private
static
String
getRandomOffset
()
{
Random
random
=
new
Random
();
long
l
=
random
.
nextInt
(
10000
);
return
String
.
valueOf
(
l
);
}
private
static
Map
<
String
,
String
>
getRandomDimensions
()
{
Random
random
=
new
Random
();
int
i
=
random
.
nextInt
(
10
);
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"shandong2"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.220"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"ShanDong"
);
dimensions
.
put
(
"servicename"
,
"servicecode"
);
dimensions
.
put
(
"servicecode"
,
"servicecode"
);
return
dimensions
;
}
private
static
String
[]
codes
=
{
"AO"
,
"AF"
,
"AL"
,
"DZ"
,
"AD"
,
"AI"
,
"AG"
,
"AR"
,
"AM"
,
"AU"
,
"AT"
,
"AZ"
,
"BS"
,
"BH"
,
"BD"
,
"BB"
,
"BY"
,
"BE"
,
"BZ"
,
"BJ"
};
private
static
String
getRandomCountryCode
()
{
Random
random
=
new
Random
(
codes
.
length
);
return
codes
[
new
Random
(
codes
.
length
).
nextInt
(
codes
.
length
)];
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsError
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update error"
);
// normalFields.put("countryCode", getRandomCountryCode());
// normalFields.put("collecttime", "testCollecttime");
return
normalFields
;
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsSuccess
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update success"
);
// normalFields.put("countryCode", getRandomCountryCode());
// normalFields.put("collecttime", "testCollecttime");
return
normalFields
;
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
long
size
=
30000
;
int
cycle
=
0
;
DateTime
timeNow
=
new
DateTime
();
int
minute
=
timeNow
.
getSecondOfMinute
()/
15
*
15
;
DateTime
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
// DateTime timeStart = new DateTime(2020,12,02,9, 00,00,000);
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
if
(
i
!=
0
)
{
timeStart
=
timeStart
.
plusSeconds
(
20
);
Thread
.
sleep
(
20000
);
}
DateTime
time
=
new
DateTime
(
new
Date
());
String
logTypeName
=
"default_analysis_template"
;
// String timestamp = DateUtil.getUTCTimeStr();
String
timestamp
=
timeStart
.
toString
();
String
source
=
"/var/log/test.log"
;
String
offset
=
getRandomOffset
();
Map
<
String
,
String
>
dimensions
=
getRandomDimensions
();
Map
<
String
,
Double
>
measures
=
new
HashMap
<>();
Map
<
String
,
String
>
normalFields
=
null
;
// if (cycle==0) {
// if (cycle==0||cycle==2||cycle==8||cycle==13) {
normalFields
=
getRandomNormalFieldsError
();
// cycle++;
// }
// else if (cycle>=8&&cycle<=10) {
// cycle++;
// normalFields = getRandomNormalFieldsSuccess();
// }
// else {
// if (cycle == 15) {
// cycle = 0;
// }else {
// cycle++;
// }
// continue;
// }
//System.out.println(printData(logTypeName, timestamp, source, offset, dimensions, measures, normalFields));
Producer
producer
=
ProducerPool
.
getInstance
().
getProducer
();
// producer.sendLog("alert_test_source", logTypeName, timestamp, source, offset, dimensions, measures, normalFields);
producer
.
sendLog
(
"dwd_default_log"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
}
Thread
.
sleep
(
1000
);
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroQQT.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zorkdata.tools.kafka.Producer
;
import
com.zorkdata.tools.kafka.ProducerPool
;
import
com.zorkdata.tools.utils.DateUtil
;
import
com.zorkdata.tools.utils.PropertiesUtil
;
import
com.zorkdata.tools.utils.StringUtil
;
import
org.joda.time.DateTime
;
import
java.util.*
;
/**
* @author zhuzhigang
*/
public
class
MockStreamxLogAvroQQT
{
private
static
long
getSize
(
String
propertiesName
)
throws
Exception
{
Properties
properties
=
PropertiesUtil
.
getProperties
(
propertiesName
);
long
logSize
=
StringUtil
.
getLong
(
properties
.
getProperty
(
"log.size"
,
"5000"
).
trim
(),
1
);
return
logSize
;
}
public
static
String
printData
(
String
logTypeName
,
String
timestamp
,
String
source
,
String
offset
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
,
Map
<
String
,
String
>
normalFields
)
{
JSONObject
jsonObject
=
new
JSONObject
();
jsonObject
.
put
(
"logTypeName"
,
logTypeName
);
jsonObject
.
put
(
"timestamp"
,
timestamp
);
jsonObject
.
put
(
"source"
,
source
);
jsonObject
.
put
(
"offset"
,
offset
);
jsonObject
.
put
(
"dimensions"
,
dimensions
);
jsonObject
.
put
(
"measures"
,
metrics
);
jsonObject
.
put
(
"normalFields"
,
normalFields
);
return
jsonObject
.
toString
();
}
private
static
String
getRandomOffset
()
{
Random
random
=
new
Random
();
long
l
=
random
.
nextInt
(
10000
);
return
String
.
valueOf
(
l
);
}
private
static
Map
<
String
,
String
>
getRandomDimensions
()
{
Random
random
=
new
Random
();
int
i
=
random
.
nextInt
(
10
);
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
// dimensions.put("hostname", "shandong2");//"zorkdata" + i);
// dimensions.put("ip", "192.168.70.220");
// dimensions.put("appsystem", "dev_test");
// dimensions.put("cluster", "基础监控");
// dimensions.put("module", "ShanDong");
dimensions
.
put
(
"servicename"
,
"linux模块"
);
dimensions
.
put
(
"servicecode"
,
"linux模块"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"基础监控"
);
dimensions
.
put
(
"appprogramname"
,
"linux模块"
);
dimensions
.
put
(
"hostname"
,
"host-11"
);
dimensions
.
put
(
"ip"
,
"192.168.13.11"
);
return
dimensions
;
}
private
static
String
[]
codes
=
{
"AO"
,
"AF"
,
"AL"
,
"DZ"
,
"AD"
,
"AI"
,
"AG"
,
"AR"
,
"AM"
,
"AU"
,
"AT"
,
"AZ"
,
"BS"
,
"BH"
,
"BD"
,
"BB"
,
"BY"
,
"BE"
,
"BZ"
,
"BJ"
};
private
static
String
getRandomCountryCode
()
{
Random
random
=
new
Random
(
codes
.
length
);
return
codes
[
new
Random
(
codes
.
length
).
nextInt
(
codes
.
length
)];
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsError
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"aaaaaaaaaaaaaaaaaaaa"
);
return
normalFields
;
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsSuccess
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update success"
);
return
normalFields
;
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
long
size
=
30000
;
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
if
(
i
!=
0
)
{
Thread
.
sleep
(
10000
);
}
String
logTypeName
=
"default_analysis_template"
;
String
timestamp
=
DateUtil
.
getUTCTimeStr
();
String
source
=
"/var/log/test.log"
;
String
offset
=
getRandomOffset
();
Map
<
String
,
String
>
dimensions
=
getRandomDimensions
();
Map
<
String
,
Double
>
measures
=
new
HashMap
<>();
Map
<
String
,
String
>
normalFields
=
null
;
normalFields
=
getRandomNormalFieldsError
();
Producer
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"dwd_default_log"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
}
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroTest1.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zorkdata.tools.kafka.Producer
;
import
com.zorkdata.tools.kafka.ProducerPool
;
import
com.zorkdata.tools.utils.PropertiesUtil
;
import
com.zorkdata.tools.utils.StringUtil
;
import
org.joda.time.DateTime
;
import
java.util.*
;
/**
* @author zhuzhigang
*/
public
class
MockStreamxLogAvroTest1
{
private
static
long
getSize
(
String
propertiesName
)
throws
Exception
{
Properties
properties
=
PropertiesUtil
.
getProperties
(
propertiesName
);
long
logSize
=
StringUtil
.
getLong
(
properties
.
getProperty
(
"log.size"
,
"5000"
).
trim
(),
1
);
return
logSize
;
}
public
static
String
printData
(
String
logTypeName
,
String
timestamp
,
String
source
,
String
offset
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
,
Map
<
String
,
String
>
normalFields
)
{
JSONObject
jsonObject
=
new
JSONObject
();
jsonObject
.
put
(
"logTypeName"
,
logTypeName
);
jsonObject
.
put
(
"timestamp"
,
timestamp
);
jsonObject
.
put
(
"source"
,
source
);
jsonObject
.
put
(
"offset"
,
offset
);
jsonObject
.
put
(
"dimensions"
,
dimensions
);
jsonObject
.
put
(
"measures"
,
metrics
);
jsonObject
.
put
(
"normalFields"
,
normalFields
);
return
jsonObject
.
toString
();
}
private
static
String
getRandomOffset
()
{
Random
random
=
new
Random
();
long
l
=
random
.
nextInt
(
10000
);
return
String
.
valueOf
(
l
);
}
private
static
Map
<
String
,
String
>
getRandomDimensions
()
{
Random
random
=
new
Random
();
int
i
=
random
.
nextInt
(
10
);
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf170"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.170"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"基础监控"
);
dimensions
.
put
(
"appprogramname"
,
"servicecode"
);
dimensions
.
put
(
"servicename"
,
"servicecode"
);
dimensions
.
put
(
"servicecode"
,
"servicecode"
);
return
dimensions
;
}
private
static
Map
<
String
,
String
>
getRandomDimensions2
()
{
Random
random
=
new
Random
();
int
i
=
random
.
nextInt
(
10
);
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf171"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.171"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"基础监控"
);
dimensions
.
put
(
"appprogramname"
,
"servicecode"
);
dimensions
.
put
(
"servicename"
,
"servicecode"
);
dimensions
.
put
(
"servicecode"
,
"servicecode"
);
return
dimensions
;
}
private
static
String
[]
codes
=
{
"AO"
,
"AF"
,
"AL"
,
"DZ"
,
"AD"
,
"AI"
,
"AG"
,
"AR"
,
"AM"
,
"AU"
,
"AT"
,
"AZ"
,
"BS"
,
"BH"
,
"BD"
,
"BB"
,
"BY"
,
"BE"
,
"BZ"
,
"BJ"
};
private
static
String
getRandomCountryCode
()
{
Random
random
=
new
Random
(
codes
.
length
);
return
codes
[
new
Random
(
codes
.
length
).
nextInt
(
codes
.
length
)];
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsError
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update error"
);
// normalFields.put("countryCode", getRandomCountryCode());
normalFields
.
put
(
"collecttime"
,
"testCollecttime"
);
return
normalFields
;
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsSuccess
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update success"
);
// normalFields.put("countryCode", getRandomCountryCode());
normalFields
.
put
(
"collecttime"
,
"testCollecttime"
);
return
normalFields
;
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
long
size
=
5000
;
// int cycle = 0;
DateTime
timeNow
=
new
DateTime
();
int
minute
=
timeNow
.
getSecondOfMinute
()/
15
*
15
;
DateTime
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
// DateTime timeStart = new DateTime(2020,12,02,9, 00,00,000);
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
if
(
i
!=
0
)
{
timeStart
=
timeStart
.
plusSeconds
(
15
);
Thread
.
sleep
(
15000
);
}
DateTime
time
=
new
DateTime
(
new
Date
());
String
logTypeName
=
"default_analysis_template"
;
String
timestamp
=
timeStart
.
toString
();
String
source
=
"/var/log/test.log"
;
String
offset
=
getRandomOffset
();
Map
<
String
,
String
>
dimensions
=
getRandomDimensions
();
Map
<
String
,
String
>
dimensions2
=
getRandomDimensions2
();
Map
<
String
,
Double
>
measures
=
new
HashMap
<>();
Map
<
String
,
String
>
normalFieldsError
=
null
;
Map
<
String
,
String
>
normalFieldsSucess
=
null
;
int
cycle
=
i
%
16
;
// if (cycle==0||cycle==5||cycle==11) {
if
(
cycle
==
0
||
cycle
==
2
||
cycle
==
8
||
cycle
==
13
)
{
// if (cycle>=0&&cycle<=2) {
normalFieldsError
=
getRandomNormalFieldsError
();
normalFieldsSucess
=
getRandomNormalFieldsSuccess
();
cycle
++;
}
// else if (cycle>=8&&cycle<=10) {
// cycle++;
// normalFields = getRandomNormalFieldsSuccess();
// }
else
{
if
(
cycle
==
15
)
{
cycle
=
0
;
}
else
{
cycle
++;
}
continue
;
}
Producer
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFieldsError
);
// Thread.sleep(1000);
// producer.sendLog("alert_test_source", logTypeName, timestamp, source, offset, dimensions2,
// measures, normalFieldsError);
// for (int j = 0; j < 9999; j++) {
// producer.sendLog("alert_test_source", logTypeName, timestamp, source, offset, dimensions,
// measures, normalFieldsSucess);
// }
// producer.sendLog("alert_test_source", logTypeName, timestamp, source, offset, dimensions2,
// measures, normalFields);
}
Thread
.
sleep
(
1000
);
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroTest2.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zorkdata.tools.kafka.Producer
;
import
com.zorkdata.tools.kafka.ProducerPool
;
import
com.zorkdata.tools.utils.PropertiesUtil
;
import
com.zorkdata.tools.utils.StringUtil
;
import
org.joda.time.DateTime
;
import
java.util.*
;
/**
* @author zhuzhigang
*/
public
class
MockStreamxLogAvroTest2
{
private
static
long
getSize
(
String
propertiesName
)
throws
Exception
{
Properties
properties
=
PropertiesUtil
.
getProperties
(
propertiesName
);
long
logSize
=
StringUtil
.
getLong
(
properties
.
getProperty
(
"log.size"
,
"5000"
).
trim
(),
1
);
return
logSize
;
}
public
static
String
printData
(
String
logTypeName
,
String
timestamp
,
String
source
,
String
offset
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
,
Map
<
String
,
String
>
normalFields
)
{
JSONObject
jsonObject
=
new
JSONObject
();
jsonObject
.
put
(
"logTypeName"
,
logTypeName
);
jsonObject
.
put
(
"timestamp"
,
timestamp
);
jsonObject
.
put
(
"source"
,
source
);
jsonObject
.
put
(
"offset"
,
offset
);
jsonObject
.
put
(
"dimensions"
,
dimensions
);
jsonObject
.
put
(
"measures"
,
metrics
);
jsonObject
.
put
(
"normalFields"
,
normalFields
);
return
jsonObject
.
toString
();
}
private
static
String
getRandomOffset
()
{
Random
random
=
new
Random
();
long
l
=
random
.
nextInt
(
10000
);
return
String
.
valueOf
(
l
);
}
private
static
Map
<
String
,
String
>
getRandomDimensions
()
{
Random
random
=
new
Random
();
int
i
=
random
.
nextInt
(
10
);
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf170"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.170"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"基础监控"
);
dimensions
.
put
(
"appprogramname"
,
"servicecode"
);
dimensions
.
put
(
"servicename"
,
"servicecode"
);
dimensions
.
put
(
"servicecode"
,
"servicecode"
);
return
dimensions
;
}
private
static
String
[]
codes
=
{
"AO"
,
"AF"
,
"AL"
,
"DZ"
,
"AD"
,
"AI"
,
"AG"
,
"AR"
,
"AM"
,
"AU"
,
"AT"
,
"AZ"
,
"BS"
,
"BH"
,
"BD"
,
"BB"
,
"BY"
,
"BE"
,
"BZ"
,
"BJ"
};
private
static
String
getRandomCountryCode
()
{
Random
random
=
new
Random
(
codes
.
length
);
return
codes
[
new
Random
(
codes
.
length
).
nextInt
(
codes
.
length
)];
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsError
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update error"
);
// normalFields.put("countryCode", getRandomCountryCode());
normalFields
.
put
(
"collecttime"
,
"testCollecttime"
);
return
normalFields
;
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsSuccess
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update success"
);
// normalFields.put("countryCode", getRandomCountryCode());
normalFields
.
put
(
"collecttime"
,
"testCollecttime"
);
return
normalFields
;
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
long
size
=
1600
;
// int cycle = 0;
DateTime
timeNow
=
new
DateTime
();
int
minute
=
timeNow
.
getSecondOfMinute
()/
15
*
15
;
DateTime
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
// DateTime timeStart = new DateTime(2020,12,02,9, 00,00,000);
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
if
(
i
!=
0
)
{
timeStart
=
timeStart
.
plusSeconds
(
15
);
Thread
.
sleep
(
15000
);
}
DateTime
time
=
new
DateTime
(
new
Date
());
String
logTypeName
=
"default_analysis_template"
;
String
timestamp
=
timeStart
.
toString
();
String
source
=
"/var/log/test.log"
;
String
offset
=
getRandomOffset
();
Map
<
String
,
String
>
dimensions
=
getRandomDimensions
();
Map
<
String
,
Double
>
measures
=
new
HashMap
<>();
Map
<
String
,
String
>
normalFields
=
null
;
int
cycle
=
i
%
16
;
if
(
cycle
==
0
||
cycle
==
5
||
cycle
==
15
)
{
// if (cycle>=0&&cycle<=2) {
normalFields
=
getRandomNormalFieldsError
();
cycle
++;
}
// else if (cycle>=8&&cycle<=10) {
// cycle++;
// normalFields = getRandomNormalFieldsSuccess();
// }
else
{
if
(
cycle
==
15
)
{
cycle
=
0
;
}
else
{
cycle
++;
}
continue
;
}
Producer
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
}
Thread
.
sleep
(
1000
);
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroTest20201218.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zorkdata.tools.kafka.Producer
;
import
com.zorkdata.tools.kafka.ProducerPool
;
import
com.zorkdata.tools.utils.PropertiesUtil
;
import
com.zorkdata.tools.utils.StringUtil
;
import
org.joda.time.DateTime
;
import
java.util.*
;
/**
* @author zhuzhigang
*/
public
class
MockStreamxLogAvroTest20201218
{
private
static
String
getRandomOffset
()
{
Random
random
=
new
Random
();
long
l
=
random
.
nextInt
(
10000
);
return
String
.
valueOf
(
l
);
}
private
static
Map
<
String
,
String
>
getRandomDimensions
()
{
Random
random
=
new
Random
();
int
i
=
random
.
nextInt
(
10
);
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf170"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.170"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"基础监控"
);
dimensions
.
put
(
"appprogramname"
,
"servicecode"
);
dimensions
.
put
(
"servicename"
,
"servicecode"
);
dimensions
.
put
(
"servicecode"
,
"servicecode"
);
return
dimensions
;
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsError
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update error"
);
normalFields
.
put
(
"collecttime"
,
"testCollecttime"
);
return
normalFields
;
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsSuccess
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update success"
);
normalFields
.
put
(
"collecttime"
,
"testCollecttime"
);
return
normalFields
;
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
DateTime
timeStart
=
new
DateTime
(
2020
,
12
,
18
,
21
,
54
,
07
,
172
);
String
logTypeName
=
"default_analysis_template"
;
String
timestamp
=
timeStart
.
toString
();
String
source
=
"/var/log/test.log"
;
String
offset
=
getRandomOffset
();
Map
<
String
,
String
>
dimensions
=
getRandomDimensions
();
Map
<
String
,
Double
>
measures
=
new
HashMap
<>();
Map
<
String
,
String
>
normalFields
=
getRandomNormalFieldsError
();
Producer
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
timeStart
=
new
DateTime
(
2020
,
12
,
18
,
21
,
57
,
52
,
195
);
timestamp
=
timeStart
.
toString
();
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
timeStart
=
new
DateTime
(
2020
,
12
,
18
,
21
,
57
,
52
,
196
);
timestamp
=
timeStart
.
toString
();
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
timeStart
=
new
DateTime
(
2020
,
12
,
18
,
21
,
59
,
07
,
205
);
timestamp
=
timeStart
.
toString
();
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
Thread
.
sleep
(
1000L
);
timeStart
=
new
DateTime
(
2020
,
12
,
18
,
21
,
59
,
07
,
206
);
timestamp
=
timeStart
.
toString
();
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxLogAvroTestRandom.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zorkdata.tools.kafka.Producer
;
import
com.zorkdata.tools.kafka.ProducerPool
;
import
com.zorkdata.tools.utils.PropertiesUtil
;
import
com.zorkdata.tools.utils.StringUtil
;
import
org.joda.time.DateTime
;
import
java.util.*
;
/**
* @author zhuzhigang
*/
public
class
MockStreamxLogAvroTestRandom
{
private
static
long
getSize
(
String
propertiesName
)
throws
Exception
{
Properties
properties
=
PropertiesUtil
.
getProperties
(
propertiesName
);
long
logSize
=
StringUtil
.
getLong
(
properties
.
getProperty
(
"log.size"
,
"5000"
).
trim
(),
1
);
return
logSize
;
}
public
static
String
printData
(
String
logTypeName
,
String
timestamp
,
String
source
,
String
offset
,
Map
<
String
,
String
>
dimensions
,
Map
<
String
,
Double
>
metrics
,
Map
<
String
,
String
>
normalFields
)
{
JSONObject
jsonObject
=
new
JSONObject
();
jsonObject
.
put
(
"logTypeName"
,
logTypeName
);
jsonObject
.
put
(
"timestamp"
,
timestamp
);
jsonObject
.
put
(
"source"
,
source
);
jsonObject
.
put
(
"offset"
,
offset
);
jsonObject
.
put
(
"dimensions"
,
dimensions
);
jsonObject
.
put
(
"measures"
,
metrics
);
jsonObject
.
put
(
"normalFields"
,
normalFields
);
return
jsonObject
.
toString
();
}
private
static
String
getRandomOffset
()
{
Random
random
=
new
Random
();
long
l
=
random
.
nextInt
(
10000
);
return
String
.
valueOf
(
l
);
}
private
static
Map
<
String
,
String
>
getRandomDimensions
()
{
Random
random
=
new
Random
();
int
i
=
random
.
nextInt
(
10
);
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf170"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.170"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"基础监控"
);
dimensions
.
put
(
"appprogramname"
,
"servicecode"
);
dimensions
.
put
(
"servicename"
,
"servicecode"
);
dimensions
.
put
(
"servicecode"
,
"servicecode"
);
return
dimensions
;
}
private
static
String
[]
codes
=
{
"AO"
,
"AF"
,
"AL"
,
"DZ"
,
"AD"
,
"AI"
,
"AG"
,
"AR"
,
"AM"
,
"AU"
,
"AT"
,
"AZ"
,
"BS"
,
"BH"
,
"BD"
,
"BB"
,
"BY"
,
"BE"
,
"BZ"
,
"BJ"
};
private
static
String
getRandomCountryCode
()
{
Random
random
=
new
Random
(
codes
.
length
);
return
codes
[
new
Random
(
codes
.
length
).
nextInt
(
codes
.
length
)];
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsError
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update error"
);
// normalFields.put("countryCode", getRandomCountryCode());
normalFields
.
put
(
"collecttime"
,
"testCollecttime"
);
return
normalFields
;
}
private
static
Map
<
String
,
String
>
getRandomNormalFieldsSuccess
()
{
Map
<
String
,
String
>
normalFields
=
new
HashMap
<>();
normalFields
.
put
(
"message"
,
"data update success"
);
// normalFields.put("countryCode", getRandomCountryCode());
normalFields
.
put
(
"collecttime"
,
"testCollecttime"
);
return
normalFields
;
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
long
size
=
200
;
// int cycle = 0;
DateTime
timeNow
=
new
DateTime
();
int
minute
=
timeNow
.
getSecondOfMinute
()/
15
*
15
;
DateTime
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
// DateTime timeStart = new DateTime(2020,12,02,9, 00,00,000);
Random
r
=
new
Random
();
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
if
(
i
!=
0
)
{
int
randomNum
=
r
.
nextInt
(
100
-
20
+
1
)+
20
;
timeStart
=
timeStart
.
plusSeconds
(
randomNum
);
// Thread.sleep(randomNum*1000);
// timeStart = timeStart.plusSeconds(15);
// Thread.sleep(15000);
}
DateTime
time
=
new
DateTime
(
new
Date
());
String
logTypeName
=
"default_analysis_template"
;
String
timestamp
=
timeStart
.
toString
();
String
source
=
"/var/log/test.log"
;
String
offset
=
getRandomOffset
();
Map
<
String
,
String
>
dimensions
=
getRandomDimensions
();
Map
<
String
,
Double
>
measures
=
new
HashMap
<>();
Map
<
String
,
String
>
normalFields
=
null
;
normalFields
=
getRandomNormalFieldsError
();
// int cycle = i%16;
// if (cycle==0||cycle==2||cycle==8||cycle==13) {
//// if (cycle>=0&&cycle<=2) {
// normalFields = getRandomNormalFieldsError();
// cycle++;
// }
//// else if (cycle>=8&&cycle<=10) {
//// cycle++;
//// normalFields = getRandomNormalFieldsSuccess();
//// }
// else {
// if (cycle == 15) {
// cycle = 0;
// }else {
// cycle++;
// }
// continue;
// }
Producer
producer
=
ProducerPool
.
getInstance
().
getProducer
();
producer
.
sendLog
(
"alert_test_source"
,
logTypeName
,
timestamp
,
source
,
offset
,
dimensions
,
measures
,
normalFields
);
}
Thread
.
sleep
(
1000
);
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvro.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
org.joda.time.DateTime
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Random
;
/**
* @author DeleMing
*/
@Slf4j
public
class
MockStreamxMetricAvro
{
private
static
String
topic
=
"alert_test_source_001"
;
// private static String brokerAddr = "kafka-1:19092,kafka-2:19092,kafka-3:19092";
// private static String brokerAddr = "yf122:9092,yf121:9092,yf120:9092";
private
static
String
brokerAddr
=
"autotest-3:9092,autotest-2:9092,autotest-1:9092"
;
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
private
static
DateTime
timeStart
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
byte
[]
buildMetric
(
double
userPct
)
{
String
metricSetName
=
"cpu_system_mb"
;
// String timestamp = timeStart.toString();
String
timestamp
=
String
.
valueOf
(
timeStart
.
getMillis
());
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf120"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.120"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"lmt模块"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
userPct
);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
return
bytes
;
}
public
static
byte
[]
buildMetric2
(
double
userPct
)
{
String
metricSetName
=
"cpu_system_mb"
;
String
timestamp
=
String
.
valueOf
(
timeStart
.
getMillis
());
// String timestamp = timeStart.toString();
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf121"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.121"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"lmt模块"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
userPct
);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
return
bytes
;
}
public
static
void
send
(
String
topic
,
double
userPct
)
{
byte
[]
req
=
buildMetric
(
userPct
);
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
""
,
req
);
producer
.
send
(
producerRecord
);
}
public
static
void
send2
(
String
topic
,
double
userPct
)
{
byte
[]
req
=
buildMetric2
(
userPct
);
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
""
,
req
);
producer
.
send
(
producerRecord
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
DateTime
timeNow
=
new
DateTime
();
//时间往前对齐,15s。比如当前时间是21:22:17, 则时间取值为21:22:15
int
minute
=
timeNow
.
getSecondOfMinute
()/
15
*
15
;
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
for
(
int
i
=
0
;
i
<=
30000
;
i
++)
{
if
(
i
!=
0
)
{
//15s发一次数据
Thread
.
sleep
(
15000
);
}
//当时间为0, 2, 8, 13发送,具体的看一下那个测试用例Excel
// int cycle = i%16;
// if (cycle==0||cycle==13) {
// send(topic,1);
// send2(topic,1);
// log.info(timeStart+"=>1");
// }else if (cycle==2||cycle==8) {
// send(topic,0.3);
// send2(topic,0.3);
// log.info(timeStart+"=>0.3");
// }
int
cycle
=
i
%
20
;
if
(
cycle
<=
11
)
{
send
(
topic
,
0.5
);
}
timeStart
=
timeStart
.
plusSeconds
(
15
);
}
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvro0402.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
com.zorkdata.tools.utils.DateUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
org.joda.time.DateTime
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
/**
* @author DeleMing
*/
@Slf4j
public
class
MockStreamxMetricAvro0402
{
private
static
String
topic
=
"dwd_all_metric"
;
private
static
String
brokerAddr
=
"node1:9092,node2:9092,node3:9092"
;
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
private
static
DateTime
timeStart
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
byte
[]
buildMetric
(
double
userPct
)
{
String
metricSetName
=
"cpu_system_mb"
;
String
timestamp
=
String
.
valueOf
(
timeStart
.
getMillis
());
// String timestamp = DateUtil.getUTCTimeStr();
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"node1"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.212"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"ShanDong"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
userPct
);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
return
bytes
;
}
public
static
void
send
(
String
topic
,
double
userPct
)
{
byte
[]
req
=
buildMetric
(
userPct
);
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
""
,
req
);
producer
.
send
(
producerRecord
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
for
(
int
i
=
0
;
i
<
1000
;
i
++){
// int result = fun1(i%4);
send
(
topic
,
0.2
);
Thread
.
sleep
(
10000
);
}
}
public
static
double
fun1
(
int
i
){
double
tmp
=
0
;
if
(
i
==
0
){
tmp
=
0.05
;
}
if
(
i
==
1
){
tmp
=
0.2
;
}
if
(
i
==
2
){
tmp
=
0.2
;
}
return
tmp
;
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvroFlink.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
org.joda.time.DateTime
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
/**
* @author DeleMing
*/
@Slf4j
public
class
MockStreamxMetricAvroFlink
{
private
static
String
topic
=
"alert_test_source_001"
;
private
static
String
brokerAddr
=
"autotest-3:9092,autotest-2:9092,autotest-1:9092"
;
private
static
ProducerRecord
<
String
,
String
>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
String
>
producer
=
null
;
private
static
DateTime
timeStart
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
String
>(
props
);
}
public
static
byte
[]
buildMetric
(
double
userPct
)
{
String
metricSetName
=
"cpu_system_mb"
;
String
timestamp
=
String
.
valueOf
(
timeStart
.
getMillis
());
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf120"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.120"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"lmt模块"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
userPct
);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
return
bytes
;
}
public
static
void
send
(
String
topic
,
double
userPct
)
{
byte
[]
req
=
buildMetric
(
userPct
);
producerRecord
=
new
ProducerRecord
<
String
,
String
>(
topic
,
""
,
"a b c d a b d e f"
);
producer
.
send
(
producerRecord
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
DateTime
timeNow
=
new
DateTime
();
int
minute
=
timeNow
.
getSecondOfMinute
()/
15
*
15
;
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
for
(
int
i
=
0
;
i
<=
30000
;
i
++)
{
if
(
i
!=
0
)
{
Thread
.
sleep
(
15000
);
}
int
cycle
=
i
%
20
;
if
(
cycle
<=
11
)
{
send
(
topic
,
0.5
);
}
timeStart
=
timeStart
.
plusSeconds
(
15
);
}
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvroQQT.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
org.joda.time.DateTime
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
/**
* @author DeleMing
*/
@Slf4j
public
class
MockStreamxMetricAvroQQT
{
private
static
String
topic
=
"qqt_dwd_all_metric"
;
// private static String brokerAddr = "kafka-1:19092,kafka-2:19092,kafka-3:19092";
// private static String brokerAddr = "yf122:9092,yf121:9092,yf120:9092";
// private static String brokerAddr = "autotest-3:9092,autotest-2:9092,autotest-1:9092";
private
static
String
brokerAddr
=
"node1:9092,node2:9092,node3:9092"
;
// private static String brokerAddr = "yf170:9092,yf171:9092,yf172:9092";
// private static String brokerAddr = "127.0.0.1:9092";
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
private
static
DateTime
timeStart
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
byte
[]
buildMetric
(
double
userPct
)
{
String
metricSetName
=
"cpu_system_mb"
;
// String timestamp = timeStart.toString();
String
timestamp
=
String
.
valueOf
(
timeStart
.
getMillis
());
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"shandong2"
);
//"zorkdata" + i);
// dimensions.put("ip", "192.168.70.120");
dimensions
.
put
(
"ip"
,
"192.168.70.220"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"lmt模块"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
userPct
);
// metrics.put("system_pct", userPct);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
//qqt
System
.
out
.
println
(
timestamp
+
","
+
userPct
);
return
bytes
;
}
public
static
void
send
(
String
topic
,
double
userPct
)
{
byte
[]
req
=
buildMetric
(
userPct
);
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
""
,
req
);
producer
.
send
(
producerRecord
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
DateTime
timeNow
=
new
DateTime
();
//时间往前对齐,15s。比如当前时间是21:22:17, 则时间取值为21:22:15
int
minute
=
timeNow
.
getSecondOfMinute
()
/
15
*
15
;
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
System
.
out
.
println
(
timeStart
);
//sin
// for (int i = 0; i <= 100 ; i++) {
// for (double y = 0; y < 360; y += 18) {
// double radians = Math.toRadians(y);
// send(topic, Math.sin(radians));
// Thread.sleep(15000);
// timeStart = timeStart.plusSeconds(15);
// }
// }
// for (int i = 0; i < 1000; i++){
// int result = fun1(i%4);
// send(topic,result);
// Thread.sleep(15000);
// timeStart = timeStart.plusSeconds(15);
// }
// // qqt折现式sin
for
(
int
i
=
0
;
i
<
100
;
i
++){
int
result
=
fun1
(
i
%
4
);
send
(
topic
,
result
);
Thread
.
sleep
(
15000
);
timeStart
=
timeStart
.
plusSeconds
(
15
);
}
// for (int i = 0; i <= 30000; i++) {
// if (i != 0) {
// //15s发一次数据
// Thread.sleep(15000);
// }
// //当时间为0, 2, 8, 13发送,具体的看一下那个测试用例Excel
// int cycle = i%16;
// if (cycle==0||cycle==13) {
// send(topic,1);
// log.info(timeStart+"=>1");
// }else if (cycle==2||cycle==8) {
// send(topic,0.3);
// log.info(timeStart+"=>0.3");
// }
// int cycle = i % 20;
// if (cycle <= 11) {
// send(topic, 0.5);
// }
//
// timeStart = timeStart.plusSeconds(15);
// }
}
public
static
int
fun1
(
int
i
){
int
tmp
=
0
;
if
(
tmp
==
0
){
tmp
=
0
;
}
if
(
i
==
1
){
tmp
=
-
1
;
}
if
(
i
==
2
){
tmp
=
0
;
}
if
(
i
==
3
){
tmp
=
1
;
}
return
tmp
;
}
}
src/main/java/com/zorkdata/tools/mock/MockStreamxMetricAvroTestRandom.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
com.zorkdata.tools.kafka.Producer
;
import
com.zorkdata.tools.kafka.ProducerPool
;
import
com.zorkdata.tools.utils.PropertiesUtil
;
import
com.zorkdata.tools.utils.StringUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
org.joda.time.DateTime
;
import
java.util.*
;
/**
* @author zhuzhigang
*/
@Slf4j
public
class
MockStreamxMetricAvroTestRandom
{
private
static
String
topic
=
"alert_test_source"
;
private
static
String
brokerAddr
=
"yf122:9092,yf121:9092,yf120:9092"
;
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
byte
[]
buildMetric
()
{
Random
random
=
new
Random
();
String
metricSetName
=
"cpu_system_mb"
;
DateTime
timeNow
=
new
DateTime
();
int
minute
=
timeNow
.
getSecondOfMinute
()/
15
*
15
;
DateTime
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
log
.
info
(
timeStart
+
"=>0.3"
);
// String timestamp = timeStart.toString();
String
timestamp
=
String
.
valueOf
(
timeStart
.
getMillis
());
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"yf120"
);
//"zorkdata" + i);
dimensions
.
put
(
"ip"
,
"192.168.70.120"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"lmt模块"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
0.5
);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
return
bytes
;
}
public
static
void
send
(
String
topic
)
{
byte
[]
req
=
buildMetric
();
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
""
,
req
);
producer
.
send
(
producerRecord
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
Random
r
=
new
Random
();
for
(
int
i
=
0
;
i
<=
300
;
i
++)
{
if
(
i
!=
0
)
{
int
randomNum
=
r
.
nextInt
(
100
-
20
+
1
)+
20
;
Thread
.
sleep
(
randomNum
*
1000
);
}
send
(
topic
);
}
}
}
\ No newline at end of file
src/main/java/com/zorkdata/tools/mock/MockZorkMetric.java
View file @
be982b36
...
...
@@ -5,6 +5,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
org.joda.time.DateTime
;
import
java.util.HashMap
;
import
java.util.Map
;
...
...
@@ -15,8 +16,8 @@ import java.util.Random;
* @author DeleMing
*/
public
class
MockZorkMetric
{
private
static
String
topic
=
"
zorkdata
_metric"
;
private
static
String
brokerAddr
=
"
zorkdata-95
:9092"
;
private
static
String
topic
=
"
dwd_all
_metric"
;
private
static
String
brokerAddr
=
"
node1:9092,node2:9092,node3
:9092"
;
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
...
...
@@ -35,15 +36,18 @@ public class MockZorkMetric {
public
static
byte
[]
buildMetric
()
{
Random
random
=
new
Random
();
String
metricSetName
=
"influx_cpu"
;
String
timestamp
=
String
.
valueOf
(
System
.
currentTimeMillis
());
String
metricSetName
=
"cpu_system_mb"
;
DateTime
timeNow
=
new
DateTime
();
int
minute
=
timeNow
.
getSecondOfMinute
()/
15
*
15
;
DateTime
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
timeNow
.
getMinuteOfHour
(),
minute
,
000
);
String
timestamp
=
timeStart
.
toString
();
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"hostname"
,
"localhost"
);
dimensions
.
put
(
"appprogramname"
,
"tc50"
);
dimensions
.
put
(
"appsystem"
,
"TXJY"
);
dimensions
.
put
(
"hostname"
,
"node1"
);
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"
cpu_usage"
,
random
.
nextDouble
()
);
metrics
.
put
(
"
used_pct"
,
0.5
);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
...
...
@@ -55,7 +59,7 @@ public class MockZorkMetric {
byte
[]
req
=
buildMetric
();
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
null
,
""
,
req
);
producer
.
send
(
producerRecord
);
...
...
@@ -63,9 +67,9 @@ public class MockZorkMetric {
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
for
(
int
i
=
0
;
i
<=
1
00
;
i
++)
{
for
(
int
i
=
0
;
i
<=
3
00
;
i
++)
{
send
(
topic
);
Thread
.
sleep
(
1000
);
Thread
.
sleep
(
1
5
000
);
}
}
}
src/main/java/com/zorkdata/tools/mock/MockZorkMetricAfter15Seconds.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
org.joda.time.DateTime
;
import
java.util.*
;
/**
* @author DeleMing
*/
public
class
MockZorkMetricAfter15Seconds
{
// private static String topic = "alert_test_source";
private
static
String
topic
=
"dwd_all_metric"
;
private
static
String
brokerAddr
=
"node1:9092,node2:9092,node3:9092"
;
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
byte
[]
buildMetric
()
{
Random
random
=
new
Random
();
String
metricSetName
=
"cpu_system_mb"
;
//往前对齐15s
// DateTime timeNow = new DateTime();
// int minute = timeNow.getSecondOfMinute()/15*15;
// DateTime timeStart = new DateTime(timeNow.getYear(),timeNow.getMonthOfYear(),timeNow.getDayOfMonth(),timeNow.getHourOfDay(), timeNow.getMinuteOfHour(),minute,000);
// long millis = timeStart.getMillis();
// String timestamp = String.valueOf(millis);
//往后对齐15s === start
// DateTime timeNow = new DateTime();
// System.out.println("true time: "+timeNow);
// int seconds = timeNow.getSecondOfMinute();
// int minute = timeNow.getMinuteOfHour();
// int round = seconds/15;
// //往后对齐15s
// if (round < 3){
// seconds = round * 15+15;
// }else if (round == 3){
// minute += 1;
// seconds = 0;
// }
// DateTime timeStart = new DateTime(timeNow.getYear(),timeNow.getMonthOfYear(),timeNow.getDayOfMonth(),timeNow.getHourOfDay(), minute, seconds,000);
//
// System.out.println("modified time: "+timeStart);
// long millis = timeStart.getMillis();
// String timestamp = String.valueOf(millis);
//往后对齐15s === start
DateTime
timeNow
=
new
DateTime
();
long
millis
=
timeNow
.
getMillis
();
String
timestamp
=
String
.
valueOf
(
millis
);
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"ShanDong"
);
dimensions
.
put
(
"hostname"
,
"node1"
);
dimensions
.
put
(
"ip"
,
"192.168.70.212"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
0.7
);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
return
bytes
;
}
public
static
void
send
(
String
topic
)
{
init
();
byte
[]
req
=
buildMetric
();
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
null
,
req
);
producer
.
send
(
producerRecord
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
for
(
int
i
=
0
;
i
<=
30000
;
i
++)
{
send
(
topic
);
Thread
.
sleep
(
10000
);
}
}
}
src/main/java/com/zorkdata/tools/mock/MockZorkMetricAfterTime.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
org.joda.time.DateTime
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Random
;
/**
* @author DeleMing
*/
public
class
MockZorkMetricAfterTime
{
private
static
String
topic
=
"dwd_all_metric"
;
private
static
String
brokerAddr
=
"node1:9092,node2:9092,node3:9092"
;
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
byte
[]
buildMetric
()
{
Random
random
=
new
Random
();
String
metricSetName
=
"cpu_system_mb"
;
// 往前对齐15s
// DateTime timeNow = new DateTime();
// int minute = timeNow.getSecondOfMinute()/15*15;
// DateTime timeStart = new DateTime(timeNow.getYear(),timeNow.getMonthOfYear(),timeNow.getDayOfMonth(),timeNow.getHourOfDay(), timeNow.getMinuteOfHour(),minute,000);
// String timestamp = timeStart.toString();
// =====往后对齐15s start
DateTime
timeNow
=
new
DateTime
();
int
seconds
=
timeNow
.
getSecondOfMinute
();
int
minute
=
timeNow
.
getMinuteOfHour
();
int
round
=
seconds
/
15
;
//往后对齐15s
if
(
round
<
3
){
seconds
=
round
*
15
+
15
;
}
else
if
(
round
==
3
){
minute
+=
1
;
seconds
=
0
;
}
DateTime
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
minute
,
seconds
,
000
);
String
timestamp
=
timeStart
.
toString
();
// =====往后对齐 15s end
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"cluster"
,
"基础监控"
);
dimensions
.
put
(
"module"
,
"ShanDong"
);
dimensions
.
put
(
"hostname"
,
"node1"
);
dimensions
.
put
(
"ip"
,
"192.168.70.212"
);
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"used_pct"
,
0.5
);
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestamp
,
dimensions
,
metrics
);
return
bytes
;
}
public
static
void
send
(
String
topic
)
{
init
();
byte
[]
req
=
buildMetric
();
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
""
,
req
);
producer
.
send
(
producerRecord
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
for
(
int
i
=
0
;
i
<=
300
;
i
++)
{
send
(
topic
);
Thread
.
sleep
(
15000
);
}
}
}
src/main/java/com/zorkdata/tools/mock/QQTMockZorkMetricHostBeatSimpleCycleAppSystem.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
/**
* @author DeleMing
*/
public
class
QQTMockZorkMetricHostBeatSimpleCycleAppSystem
{
private
static
String
topic
=
"dwd_all_metric"
;
private
static
String
brokerAddr
=
"node1:9092,node2:9092,node3:9092"
;
// private static String brokerAddr = "yf170:9092,yf171:9092,yf172:9092";
// private static String brokerAddr = "localhost:9092";
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
//MetricSet
String
metricSetName
=
"original_agent_eb"
;
//Dimensions
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
// dimensions.put("clustername", "jichujiankong");
// dimensions.put("appprogramname", "linuxmokuai");
dimensions
.
put
(
"hostname"
,
"host-11"
);
dimensions
.
put
(
"ip"
,
"192.168.13.11"
);
for
(
int
i
=
0
;
i
<=
30000
;
i
++)
{
//MetricItem
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"status"
,
0
d
);
//timestamp
long
timestamp
=
System
.
currentTimeMillis
();
String
timestampString
=
String
.
valueOf
(
timestamp
);
System
.
out
.
println
(
"时间:"
+
timestampString
);
//AvroSerializer
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestampString
,
dimensions
,
metrics
);
//send
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
null
,
bytes
);
producer
.
send
(
producerRecord
);
Thread
.
sleep
(
15000
);
}
}
public
static
double
fun1
(
int
i
){
double
tmp
=
0
;
if
(
i
==
0
){
tmp
=
0.05
;
}
if
(
i
==
1
){
tmp
=
0.2
;
}
if
(
i
==
2
){
tmp
=
0.2
;
}
return
tmp
;
}
}
src/main/java/com/zorkdata/tools/mock/QQTMockZorkMetricHostBeatSimpleCyle.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
/**
* @author DeleMing
*/
public
class
QQTMockZorkMetricHostBeatSimpleCyle
{
private
static
String
topic
=
"dwd_all_metric"
;
private
static
String
brokerAddr
=
"node1:9092,node2:9092,node3:9092"
;
// private static String brokerAddr = "yf170:9092,yf171:9092,yf172:9092";
// private static String brokerAddr = "localhost:9092";
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
//MetricSet
String
metricSetName
=
"original_agent_eb"
;
//Dimensions
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"jichujiankong"
);
dimensions
.
put
(
"appprogramname"
,
"linuxmokuai"
);
dimensions
.
put
(
"hostname"
,
"host-11"
);
dimensions
.
put
(
"ip"
,
"192.168.13.11"
);
for
(
int
i
=
0
;
i
<=
30000
;
i
++)
{
//MetricItem
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"status"
,
0
d
);
//timestamp
long
timestamp
=
System
.
currentTimeMillis
();
String
timestampString
=
String
.
valueOf
(
timestamp
);
System
.
out
.
println
(
"时间:"
+
timestampString
);
//AvroSerializer
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestampString
,
dimensions
,
metrics
);
//send
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
null
,
bytes
);
producer
.
send
(
producerRecord
);
Thread
.
sleep
(
15000
);
}
}
public
static
double
fun1
(
int
i
){
double
tmp
=
0
;
if
(
i
==
0
){
tmp
=
0.05
;
}
if
(
i
==
1
){
tmp
=
0.2
;
}
if
(
i
==
2
){
tmp
=
0.2
;
}
return
tmp
;
}
}
src/main/java/com/zorkdata/tools/mock/QQTMockZorkMetricSimpleCyle.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
org.joda.time.DateTime
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Random
;
/**
* @author DeleMing
*/
public
class
QQTMockZorkMetricSimpleCyle
{
private
static
String
topic
=
"dwd_all_metric"
;
private
static
String
brokerAddr
=
"node1:9092,node2:9092,node3:9092"
;
// private static String brokerAddr = "autotest-1:9092,autotest-2:9092,autotest-3:9092";
// private static String brokerAddr = "yf170:9092,yf171:9092,yf172:9092";
// private static String brokerAddr = "localhost:9092";
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
//MetricSet
String
metricSetName
=
"cpu_system_mb"
;
//Dimensions
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
// dimensions.put("appsystem", "dev_test");
// dimensions.put("clustername", "基础监控");
// dimensions.put("appprogramname", "ShanDong");
// dimensions.put("hostname", "shandong2");
// dimensions.put("ip", "192.168.70.220");
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"jichujiankong"
);
dimensions
.
put
(
"appprogramname"
,
"linuxmokuai"
);
dimensions
.
put
(
"hostname"
,
"host-11"
);
dimensions
.
put
(
"ip"
,
"192.168.13.11"
);
for
(
int
i
=
0
;
i
<=
30000
;
i
++)
{
//MetricItem
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
0.1
);
//timestamp
long
timestamp
=
System
.
currentTimeMillis
();
String
timestampString
=
String
.
valueOf
(
timestamp
);
System
.
out
.
println
(
"时间:"
+
timestampString
);
//AvroSerializer
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestampString
,
dimensions
,
metrics
);
//send
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
null
,
bytes
);
producer
.
send
(
producerRecord
);
Thread
.
sleep
(
10000
);
}
}
public
static
double
fun1
(
int
i
){
double
tmp
=
0
;
if
(
i
==
0
){
tmp
=
0.05
;
}
if
(
i
==
1
){
tmp
=
0.2
;
}
if
(
i
==
2
){
tmp
=
0.2
;
}
return
tmp
;
}
}
src/main/java/com/zorkdata/tools/mock/QQTMockZorkMetricSimpleCyle2.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
com.zorkdata.tools.avro.AvroSerializer
;
import
com.zorkdata.tools.avro.AvroSerializerFactory
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.ByteArraySerializer
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
/**
* @author DeleMing
*/
public
class
QQTMockZorkMetricSimpleCyle2
{
private
static
String
topic
=
"dwd_all_metric"
;
private
static
String
brokerAddr
=
"node1:9092,node2:9092,node3:9092"
;
// private static String brokerAddr = "autotest-1:9092,autotest-2:9092,autotest-3:9092";
// private static String brokerAddr = "yf170:9092,yf171:9092,yf172:9092";
// private static String brokerAddr = "localhost:9092";
private
static
ProducerRecord
<
String
,
byte
[]>
producerRecord
=
null
;
private
static
KafkaProducer
<
String
,
byte
[]>
producer
=
null
;
public
static
void
init
()
{
Properties
props
=
new
Properties
();
props
.
put
(
"bootstrap.servers"
,
brokerAddr
);
props
.
put
(
"acks"
,
"1"
);
props
.
put
(
"retries"
,
0
);
props
.
put
(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props
.
put
(
"value.serializer"
,
ByteArraySerializer
.
class
.
getName
());
props
.
put
(
"batch.size"
,
16384
);
props
.
put
(
"linger.ms"
,
1
);
props
.
put
(
"buffer.memory"
,
33554432
);
producer
=
new
KafkaProducer
<
String
,
byte
[]>(
props
);
}
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
{
init
();
//MetricSet
String
metricSetName
=
"cpu_system_mb"
;
//Dimensions
Map
<
String
,
String
>
dimensions
=
new
HashMap
<>();
// dimensions.put("appsystem", "dev_test");
// dimensions.put("clustername", "基础监控");
// dimensions.put("appprogramname", "ShanDong");
// dimensions.put("hostname", "shandong2");
// dimensions.put("ip", "192.168.70.220");
dimensions
.
put
(
"appsystem"
,
"dev_test"
);
dimensions
.
put
(
"clustername"
,
"jichujiankong"
);
dimensions
.
put
(
"appprogramname"
,
"linuxmokuai"
);
dimensions
.
put
(
"hostname"
,
"node1"
);
dimensions
.
put
(
"ip"
,
"192.168.70.85"
);
for
(
int
i
=
0
;
i
<=
30000
;
i
++)
{
//MetricItem
Map
<
String
,
Double
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"user_pct"
,
0.3
);
//timestamp
long
timestamp
=
System
.
currentTimeMillis
();
String
timestampString
=
String
.
valueOf
(
timestamp
);
System
.
out
.
println
(
"时间:"
+
timestampString
);
//AvroSerializer
AvroSerializer
metricSerializer
=
AvroSerializerFactory
.
getMetricAvroSerializer
();
byte
[]
bytes
=
metricSerializer
.
serializingMetric
(
metricSetName
,
timestampString
,
dimensions
,
metrics
);
//send
producerRecord
=
new
ProducerRecord
<
String
,
byte
[]>(
topic
,
null
,
bytes
);
producer
.
send
(
producerRecord
);
Thread
.
sleep
(
10000
);
}
}
public
static
double
fun1
(
int
i
){
double
tmp
=
0
;
if
(
i
==
0
){
tmp
=
0.05
;
}
if
(
i
==
1
){
tmp
=
0.2
;
}
if
(
i
==
2
){
tmp
=
0.2
;
}
return
tmp
;
}
}
src/main/java/com/zorkdata/tools/mock/QQTTest.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
org.joda.time.DateTime
;
public
class
QQTTest
{
public
static
void
main
(
String
[]
args
)
{
DateTime
timeNow
=
new
DateTime
();
int
seconds
=
timeNow
.
getSecondOfMinute
();
int
minute
=
timeNow
.
getMinuteOfHour
();
int
round
=
seconds
/
15
;
//往后对齐15s
if
(
round
<
3
){
seconds
=
round
*
15
+
15
;
}
else
if
(
round
==
3
){
minute
+=
1
;
seconds
=
0
;
}
DateTime
timeStart
=
new
DateTime
(
timeNow
.
getYear
(),
timeNow
.
getMonthOfYear
(),
timeNow
.
getDayOfMonth
(),
timeNow
.
getHourOfDay
(),
minute
,
seconds
,
000
);
String
timestamp
=
timeStart
.
toString
();
System
.
out
.
println
(
timestamp
);
}
}
src/main/java/com/zorkdata/tools/mock/ScheduledExecutorServiceTest.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
org.joda.time.DateTime
;
import
org.springframework.boot.ApplicationArguments
;
import
org.springframework.boot.ApplicationRunner
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.Calendar
;
import
java.util.Date
;
import
java.util.TimeZone
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
/**
* @author zhuzhigang
*/
public
class
ScheduledExecutorServiceTest
implements
ApplicationRunner
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// String text = "2019-01-03T08:26:15.503162206Z";
// text = "2021-01-23T08:26:15+08:00";
// String localTime = "2021-01-23 15:06:20";
//// Date date = utcToLocal(text);
// Date utcdate = localToUTC(localTime);
// System.out.println(utcdate);
String
utcString
=
new
DateTime
(
1611385785000L
).
toString
();
System
.
out
.
println
(
utcString
);
ScheduledExecutorService
service
=
Executors
.
newScheduledThreadPool
(
10
);
long
initialDelay
=
30000
-(
DateTime
.
now
().
getMillis
()%
30000
);
long
period
=
30000
;
// 从现在开始5秒钟之后,每隔30秒钟执行一次job1
service
.
scheduleAtFixedRate
(
new
TaskService
(),
initialDelay
,
period
,
TimeUnit
.
MILLISECONDS
);
}
@Override
public
void
run
(
ApplicationArguments
args
)
throws
Exception
{
}
public
static
Date
localToUTC
(
String
localTime
)
{
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
Date
localDate
=
null
;
try
{
localDate
=
sdf
.
parse
(
localTime
);
}
catch
(
ParseException
e
)
{
e
.
printStackTrace
();
}
long
localTimeInMillis
=
localDate
.
getTime
();
/** long时间转换成Calendar */
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTimeInMillis
(
localTimeInMillis
);
/** 取得时间偏移量 */
int
zoneOffset
=
calendar
.
get
(
java
.
util
.
Calendar
.
ZONE_OFFSET
);
/** 取得夏令时差 */
int
dstOffset
=
calendar
.
get
(
java
.
util
.
Calendar
.
DST_OFFSET
);
/** 从本地时间里扣除这些差量,即可以取得UTC时间*/
calendar
.
add
(
java
.
util
.
Calendar
.
MILLISECOND
,
-(
zoneOffset
+
dstOffset
));
/** 取得的时间就是UTC标准时间 */
Date
utcDate
=
new
Date
(
calendar
.
getTimeInMillis
());
return
utcDate
;
}
public
static
Date
utcToLocal
(
String
utcTime
){
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
sdf
.
setTimeZone
(
TimeZone
.
getTimeZone
(
"UTC"
));
Date
utcDate
=
null
;
try
{
utcDate
=
sdf
.
parse
(
utcTime
);
}
catch
(
ParseException
e
)
{
e
.
printStackTrace
();
}
sdf
.
setTimeZone
(
TimeZone
.
getDefault
());
Date
locatlDate
=
null
;
String
localTime
=
sdf
.
format
(
utcDate
.
getTime
());
try
{
locatlDate
=
sdf
.
parse
(
localTime
);
}
catch
(
ParseException
e
)
{
e
.
printStackTrace
();
}
return
locatlDate
;
}
}
src/main/java/com/zorkdata/tools/mock/TaskService.java
0 → 100644
View file @
be982b36
package
com.zorkdata.tools.mock
;
import
org.joda.time.DateTime
;
public
class
TaskService
implements
Runnable
{
@Override
public
void
run
()
{
// 定时任务业务逻辑
System
.
out
.
println
(
DateTime
.
now
()+
" --start"
);
}
}
\ No newline at end of file
src/main/resources/config.properties
View file @
be982b36
#
kafka.servers = kafka-1:19092,kafka-2:19092,kafka-3:19092
kafka.servers
=
zorkdata-95:9092
kafka.servers
=
kafka-1:19092,kafka-2:19092,kafka-3:19092
#
kafka.servers = zorkdata-95:9092
kafka.batch.size
=
1
log.size
=
10000
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment