Commit 8f132cc9 authored by xuli

MySQL version

parent 84edf9ce
......@@ -45,6 +45,20 @@
<version>2.5.17.RELEASE</version>
</dependency>
<!-- Spring Boot JDBC Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- MySQL Driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<!-- Fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
......
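With spring-boot-starter-jdbc and the MySQL driver now on the classpath, Spring Boot will auto-configure a JdbcTemplate once a datasource is defined. The datasource settings themselves are not part of this diff; a minimal sketch of what the datasource profile might contain, where the host, schema name, and credentials are placeholders rather than values from this commit:

```yaml
# Hypothetical datasource block for the MySQL-backed dedup store;
# URL, schema, and credentials are assumptions, not from this commit.
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/influx_disorder?useSSL=false&serverTimezone=UTC
    username: dedup_user
    password: change-me
    driver-class-name: com.mysql.cj.jdbc.Driver
```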
......@@ -30,45 +30,53 @@ public class InfluxDisorderComponent {
private String tableKeywords;
@Value("${missing-s:15}")
private Integer missingS;
@Value("${time-window.query-interval-seconds:5}")
private Integer queryIntervalSeconds;
// Inject the optimized components
@Value("${time-window.data-delay-seconds:3}")
private Integer dataDelaySeconds;
@Value("${deduplication.enabled:true}")
private boolean deduplicationEnabled;
@Value("${deduplication.retention-days:7}")
private int retentionDays;
// Inject components
@Autowired
private MultiDbConnectionManager connectionManager;
@Autowired
private DataTimestampIncrementalTracker dataTimestampTracker;
private BatchKafkaSender batchSender;
@Autowired
private BatchKafkaSender batchSender;
private DatabaseDeduplicationService deduplicationService;
/**
* Optimized: query InfluxDB metric data and send it to Kafka (data-timestamp-based incremental query + batch sending)
* Time-window-based data collection (every 5 seconds, query the past 8 seconds of data and deduplicate via the database)
*/
@Scheduled(cron = "${schedules}")
public void optimizedExecute() {
public void timeWindowExecute() {
try {
log.info("开始执行InfluxDB数据采集任务...");
log.info("开始执行InfluxDB时间窗口数据采集任务...");
List<String> urlList = connectionManager.getAvailableUrls();
List<String> databaseList = connectionManager.getAvailableDatabases();
// Process each URL/database combination sequentially to keep timestamp tracking accurate
for (String url : urlList) {
for (String database : databaseList) {
try {
processUrlDatabase(url.trim(), database.trim());
processUrlDatabaseWithTimeWindow(url.trim(), database.trim());
} catch (Exception e) {
log.error("处理URL:{}, Database:{} 失败: {}", url, database, e.getMessage());
}
}
}
log.info("InfluxDB数据采集任务完成");
log.info("InfluxDB时间窗口数据采集任务完成");
} catch (Exception e) {
log.error("数据采集任务异常: {}", e.getMessage(), e);
......@@ -76,9 +84,9 @@ public class InfluxDisorderComponent {
}
/**
* Handle data collection for a single URL and database
* Time-window-based processing of a single URL and database (new method)
*/
private void processUrlDatabase(String url, String database) {
private void processUrlDatabaseWithTimeWindow(String url, String database) {
try {
// Get a reusable connection
InfluxDBService influxDBService = connectionManager.getConnection(url, database);
......@@ -93,10 +101,9 @@ public class InfluxDisorderComponent {
log.info("URL:{}, Database:{} 找到 {} 张表需要处理", url, database, tableNameList.size());
// Process the tables sequentially to keep timestamp updates atomic
List<NormalFieldsDTO> allData = new ArrayList<>();
for (String tableName : tableNameList) {
List<NormalFieldsDTO> tableData = processTable(influxDBService, url, database, tableName);
List<NormalFieldsDTO> tableData = processTableWithTimeWindow(influxDBService, url, database, tableName);
if (tableData != null && !tableData.isEmpty()) {
allData.addAll(tableData);
}
......@@ -114,12 +121,15 @@ public class InfluxDisorderComponent {
}
/**
* Process a single table's data (data-timestamp-based incremental query)
* Process a single table's data based on a time window
*/
private List<NormalFieldsDTO> processTable(InfluxDBService influxDBService, String url, String database, String tableName) {
private List<NormalFieldsDTO> processTableWithTimeWindow(InfluxDBService influxDBService, String url, String database, String tableName) {
try {
// Use the data-timestamp-based incremental query SQL
String sql = dataTimestampTracker.buildDataTimestampQuery(url, database, tableName);
// Build the time-window query SQL: fetch data from (queryIntervalSeconds + dataDelaySeconds) seconds ago up to now
// This produces overlapping data across runs, which is handled by deduplication
int totalSeconds = queryIntervalSeconds + dataDelaySeconds; // 8s with the default 5s + 3s values
String sql = String.format("SELECT * FROM %s WHERE time >= now() - %ds AND time <= now()",
tableName, totalSeconds);
log.debug("查询表 {} SQL: {}", tableName, sql);
QueryResult.Series series = getResult(influxDBService.query(sql));
......@@ -129,8 +139,8 @@ public class InfluxDisorderComponent {
return Collections.emptyList();
}
// Process the query result and update the max timestamp
return processTableDataWithTimestampUpdate(series, url, database, tableName);
// Process the query result, including dedup logic
return processTableDataWithDeduplication(series, url, database, tableName);
} catch (Exception e) {
log.error("处理表 {} 异常: {}", tableName, e.getMessage());
......@@ -139,96 +149,126 @@ public class InfluxDisorderComponent {
}
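To make the window overlap concrete: with the configuration shipped in this commit (query-interval-seconds: 3, data-delay-seconds: 2), each run queries a 5-second window while the scheduler fires every 3 seconds, so consecutive runs re-read 2 seconds of data. A small standalone sketch of that arithmetic:

```java
public class WindowOverlapDemo {
    public static void main(String[] args) {
        int queryIntervalSeconds = 3; // scheduler period (schedules: 0/3 * * * * ?)
        int dataDelaySeconds = 2;     // extra look-back for late-arriving points
        int windowSeconds = queryIntervalSeconds + dataDelaySeconds; // 5s per query

        // A run at time t covers [t-5s, t]; the next run at t+3s covers [t-2s, t+3s],
        // so the 2-second span [t-2s, t] is returned by both runs.
        int overlapSeconds = windowSeconds - queryIntervalSeconds;
        System.out.printf("window=%ds, overlap=%ds -> duplicates are expected "
                + "and must be filtered by the database dedup keys%n",
                windowSeconds, overlapSeconds);
    }
}
```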
/**
* Process table data: convert and update timestamps (no dedup needed; incremental queries naturally avoid duplicates)
* Process table data with batched database deduplication
*/
private List<NormalFieldsDTO> processTableDataWithTimestampUpdate(QueryResult.Series series, String url, String database, String tableName) {
private List<NormalFieldsDTO> processTableDataWithDeduplication(QueryResult.Series series, String url, String database, String tableName) {
// Build the column-name index map
List<String> columns = series.getColumns();
Map<String, Integer> columnIndexMap = buildColumnIndexMap(columns);
List<Atoota> atootaList = new ArrayList<>();
List<NormalFieldsDTO> resultList = new ArrayList<>();
List<Atoota> allAtootaList = new ArrayList<>();
List<DatabaseDeduplicationService.RecordKey> recordKeys = new ArrayList<>();
int totalCount = 0;
// Step 1: build all data objects and their dedup keys
for (List<Object> value : series.getValues()) {
try {
totalCount++;
// Build the Atoota object
Atoota atoota = buildAtootaFromRow(value, columnIndexMap, tableName, url);
if (atoota == null || atoota.getId() == null) {
continue;
}
atootaList.add(atoota);
allAtootaList.add(atoota);
// Convert to DTO
NormalFieldsDTO dto = convertToDTO(atoota);
resultList.add(dto);
// If deduplication is enabled, build the dedup key
if (deduplicationEnabled) {
long timeMillis = extractTimeMillis(atoota.getTime());
DatabaseDeduplicationService.RecordKey recordKey =
new DatabaseDeduplicationService.RecordKey(atoota.getId(), timeMillis, tableName, url, database);
recordKeys.add(recordKey);
}
} catch (Exception e) {
log.warn("处理表 {} 的某行数据失败: {}", tableName, e.getMessage());
}
}
// Update the max timestamp (important: only after the data has been processed successfully)
int processedCount = dataTimestampTracker.updateMaxTimestamp(url, database, tableName, atootaList);
// Step 2: batched dedup check (if enabled)
Set<String> processedKeys = new HashSet<>();
if (deduplicationEnabled && !recordKeys.isEmpty()) {
processedKeys = deduplicationService.getProcessedRecords(recordKeys);
}
// Step 3: filter for new data and collect the dedup records to insert
List<NormalFieldsDTO> resultList = new ArrayList<>();
List<DatabaseDeduplicationService.RecordKey> newRecordKeys = new ArrayList<>();
int processedCount = 0;
for (int i = 0; i < allAtootaList.size(); i++) {
Atoota atoota = allAtootaList.get(i);
// Check whether this record is a duplicate
if (deduplicationEnabled && i < recordKeys.size()) {
DatabaseDeduplicationService.RecordKey recordKey = recordKeys.get(i);
String keyString = recordKey.toKey();
if (processedKeys.contains(keyString)) {
log.trace("跳过重复记录: {}", keyString);
continue; // 跳过重复数据
}
newRecordKeys.add(recordKey);
}
// Convert to DTO and add to the results
NormalFieldsDTO dto = convertToDTO(atoota);
resultList.add(dto);
processedCount++;
}
// Step 4: batch-insert the new dedup records
if (deduplicationEnabled && !newRecordKeys.isEmpty()) {
int insertedCount = deduplicationService.batchInsertRecords(newRecordKeys);
log.debug("批量插入去重记录: 尝试={}, 成功={}", newRecordKeys.size(), insertedCount);
}
log.debug("表 {} 处理完成,有效数据: {} 条", tableName, processedCount);
log.debug("表 {} 处理完成,总数据: {}, 处理数据: {} 条", tableName, totalCount, processedCount);
return resultList;
}
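The diff references DatabaseDeduplicationService but does not include its body. A minimal sketch of how getProcessedRecords and batchInsertRecords could be backed by MySQL; the processed_records table, its columns, and the string key format are assumptions, not taken from this commit:

```java
import java.util.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

// Hypothetical sketch only; the real DatabaseDeduplicationService may differ.
// Assumed schema:
//   CREATE TABLE processed_records (
//     record_key VARCHAR(512) PRIMARY KEY,
//     created_at DATETIME NOT NULL
//   );
@Service
public class DatabaseDeduplicationSketch {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Returns the subset of candidate keys that already exist in MySQL.
    public Set<String> getProcessedRecords(List<String> candidateKeys) {
        if (candidateKeys.isEmpty()) {
            return Collections.emptySet();
        }
        String placeholders = String.join(",", Collections.nCopies(candidateKeys.size(), "?"));
        String sql = "SELECT record_key FROM processed_records WHERE record_key IN (" + placeholders + ")";
        return new HashSet<>(jdbcTemplate.queryForList(sql, String.class, candidateKeys.toArray()));
    }

    // INSERT IGNORE turns replays of an existing key into a no-op, so the
    // per-statement update counts reveal how many keys were genuinely new.
    public int batchInsertRecords(List<String> newKeys) {
        String sql = "INSERT IGNORE INTO processed_records (record_key, created_at) VALUES (?, NOW())";
        List<Object[]> args = new ArrayList<>();
        for (String key : newKeys) {
            args.add(new Object[]{key});
        }
        int inserted = 0;
        for (int count : jdbcTemplate.batchUpdate(sql, args)) {
            if (count > 0) {
                inserted++;
            }
        }
        return inserted;
    }
}
```

A primary key on record_key is what would make this safe under the overlapping windows: even if two runs race on the same record, only one INSERT succeeds.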
/**
* Get the time a few minutes in the past; to compensate for missing data, an extra missingS seconds is subtracted
*
* @param minutes minutes to subtract
* @return
*/
public Date lastFewMinutesMissingS(int minutes) {
Calendar beforeTime = Calendar.getInstance();
beforeTime.add(Calendar.MINUTE, -minutes);
beforeTime.add(Calendar.SECOND, -missingS);
return beforeTime.getTime();
}
/**
* Get the time a few seconds in the past; to compensate for missing data, an extra missingS seconds is subtracted
*
* @param intervalSecond seconds to subtract
* @return
* Extract a millisecond timestamp from an InfluxDB time string
*/
public Date lastFewSecondMissingS(int intervalSecond) {
Calendar beforeTime = Calendar.getInstance();
beforeTime.add(Calendar.SECOND, -intervalSecond);
beforeTime.add(Calendar.SECOND, -missingS);
return beforeTime.getTime();
private long extractTimeMillis(String timeStr) {
try {
if (timeStr == null) return 0;
// InfluxDB time format: "2023-12-01T10:30:45.123456789Z"
// Parse the full timestamp directly, keeping millisecond precision
return java.time.Instant.parse(timeStr).toEpochMilli();
} catch (Exception e) {
log.warn("解析时间戳失败: {}", timeStr);
return System.currentTimeMillis(); // 回退到当前时间
}
}
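java.time.Instant.parse accepts the RFC 3339 strings InfluxDB returns, including a nanosecond fraction, and toEpochMilli() truncates everything below the millisecond. A quick standalone check of the behavior extractTimeMillis relies on:

```java
import java.time.Instant;

public class InfluxTimeParseDemo {
    public static void main(String[] args) {
        // Nanosecond-precision InfluxDB timestamp: toEpochMilli() keeps the
        // .123 millisecond part and drops the trailing 456789 nanoseconds.
        long millis = Instant.parse("2023-12-01T10:30:45.123456789Z").toEpochMilli();
        System.out.println(millis); // 1701426645123
    }
}
```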
/**
* Get the current time minus the given number of seconds
*
* @param seconds seconds to subtract
* @return
* Periodically clean up expired dedup records (runs daily at 2 AM)
*/
public Date lastFewSecond(int seconds) {
Calendar beforeTime = Calendar.getInstance();
beforeTime.add(Calendar.SECOND, -seconds);
return beforeTime.getTime();
@Scheduled(cron = "0 0 ${deduplication.cleanup-hour:2} * * ?")
public void cleanupExpiredRecords() {
if (!deduplicationEnabled) {
return;
}
try {
int deletedCount = deduplicationService.cleanupOldRecords(retentionDays);
log.info("清理过期去重记录完成,删除 {} 条记录", deletedCount);
} catch (Exception e) {
log.error("清理过期去重记录失败: {}", e.getMessage());
}
}
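cleanupOldRecords is likewise not shown in the diff. Under the same assumed processed_records schema as the sketch above, it could be a single age-based delete; table and column names remain hypothetical:

```java
// Hypothetical sketch matching the assumed processed_records schema above.
public int cleanupOldRecords(int retentionDays) {
    // Delete dedup keys older than the retention window (default 7 days) so the
    // table stays small enough for the IN (...) lookups to remain fast.
    String sql = "DELETE FROM processed_records WHERE created_at < NOW() - INTERVAL ? DAY";
    return jdbcTemplate.update(sql, retentionDays);
}
```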
/**
* Manually reset all timestamps (used to restart synchronization)
*/
public void resetAllTimestamps() {
dataTimestampTracker.clearAllTimestamps();
log.info("已重置所有时间戳");
}
/**
* Get system status information
*/
public String getSystemStats() {
return String.format("系统状态 - %s, %s, %s",
return String.format("系统状态 - %s, %s",
connectionManager.getConnectionStats(),
dataTimestampTracker.getStats(),
dataTimestampTracker.getCleanupStats());
deduplicationService.getStats());
}
/**
......
server:
port: 8190
spring:
profiles:
include:
- datasource
- kafka
schedules: 0/5 * * * * ?
timestamp-file: ./data-timestamps.properties
timestamp:
cleanup:
days: 3
schedules: 0/3 * * * * ?
table-keywords: atoota
missing-s: 15
# New time-window query configuration
time-window:
query-interval-seconds: 3 # query interval (run a query every 3 seconds)
data-delay-seconds: 2 # data-delay compensation (look back an extra 2 seconds for late data)
# Deduplication configuration
deduplication:
enabled: true # enable deduplication
retention-days: 7 # days to retain dedup records
cleanup-hour: 2 # clean up expired records at 2 AM daily
batch:
size: 1000
timeout: 2000
\ No newline at end of file
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
#Generated by Maven
#Thu Sep 11 10:27:00 CST 2025
#Wed Sep 17 09:50:37 CST 2025
version=2.2-RELEASE
groupId=com.zork
artifactId=influx-disorder
......@@ -7,12 +7,15 @@ com/zork/disorder/kafka/Producer.class
com/zork/disorder/constant/AtootaConstant.class
com/zork/disorder/kafka/KafkaSendDTO.class
com/zork/disorder/component/CountComponent.class
com/zork/disorder/component/DatabaseDeduplicationService.class
com/zork/common/constant/InfluxDBConstant.class
com/zork/common/utils/DateUtil.class
com/zork/common/dto/NormalFieldsDTO$NormalFieldsDTOBuilder.class
com/zork/disorder/component/DatabaseDeduplicationService$RecordKey.class
com/zork/common/utils/FileUtil.class
com/zork/common/service/InfluxDBService.class
com/zork/disorder/component/BatchKafkaSender.class
com/zork/disorder/component/DatabaseDeduplicationService$1.class
com/zork/disorder/component/DataTimestampIncrementalTracker.class
com/zork/disorder/config/RestTemplateConfig.class
com/zork/disorder/InfluxDisorderApplication.class
......
......@@ -9,6 +9,7 @@
/Users/xuli/Documents/ZorkCodes/applets/zork-server/influx-disorder/src/main/java/com/zork/common/constant/InfluxDBConstant.java
/Users/xuli/Documents/ZorkCodes/applets/zork-server/influx-disorder/src/main/java/com/zork/disorder/kafka/Producer.java
/Users/xuli/Documents/ZorkCodes/applets/zork-server/influx-disorder/src/main/java/com/zork/disorder/config/RestTemplateConfig.java
/Users/xuli/Documents/ZorkCodes/applets/zork-server/influx-disorder/src/main/java/com/zork/disorder/component/DatabaseDeduplicationService.java
/Users/xuli/Documents/ZorkCodes/applets/zork-server/influx-disorder/src/main/java/com/zork/disorder/component/MultiDbConnectionManager.java
/Users/xuli/Documents/ZorkCodes/applets/zork-server/influx-disorder/src/main/java/com/zork/common/dto/NormalFieldsDTO.java
/Users/xuli/Documents/ZorkCodes/applets/zork-server/influx-disorder/src/main/java/com/zork/disorder/InfluxDisorderApplication.java
......