From 3e622538068cd4d0b1082a37aa6bed49e9e0e0e8 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Fri, 12 Dec 2025 09:26:18 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=8E=BB=E9=99=A4dashboard=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/DataProcessServiceImpl.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java index 7afc119..fb08ab1 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java +++ b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java @@ -622,17 +622,17 @@ public class DataProcessServiceImpl implements IDataProcessService { } baseTransDataEntity.setHashId(UUID.randomUUID()); - try { - handleDashboardAlert(baseTransDataEntity); - } catch (Exception e) { - log.error("dashboard alert error", e); - } - - try { - minuteLevelStorage(baseTransDataEntity); - } catch (Exception e) { - log.error("minuteLevelStorage error", e); - } +// try { +// handleDashboardAlert(baseTransDataEntity); +// } catch (Exception e) { +// log.error("dashboard alert error", e); +// } +// +// try { +// minuteLevelStorage(baseTransDataEntity); +// } catch (Exception e) { +// log.error("minuteLevelStorage error", e); +// } // try { // if ("alert".equals(baseTransDataEntity.getStatus())) { From e2d9cc6649f77e337cdfc3cb41df6081dcef3268 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Fri, 12 Dec 2025 22:50:57 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sender/config/DisruptorConfig.java | 43 +++++++++++-------- .../sender/disruptor/AccumulateService.java | 16 ++++--- .../sender/disruptor/AlertService.java | 15 ++++--- .../sender/disruptor/MeasureService.java | 15 ++++--- .../service/impl/DataProcessServiceImpl.java | 22 +++++----- 5 files changed, 62 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java index 6fc01cd..9bd73c8 100644 --- a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java +++ b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java @@ -1,7 +1,7 @@ package com.techsor.datacenter.sender.config; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.techsor.datacenter.sender.dao.DashboardAlertDao; @@ -11,7 +11,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; @Configuration public class DisruptorConfig { @@ -19,22 +19,30 @@ public class DisruptorConfig { @Value("${data.operation.batch-size:100}") private int batchSize; + /** 创建带线程名的 ThreadFactory **/ + private ThreadFactory createDisruptorThreadFactory(String namePrefix) { + return r -> { + Thread t = new Thread(r); + t.setName(namePrefix + "-" + t.getId()); + t.setDaemon(true); + return t; + }; + } + @Bean public Disruptor measureDisruptor(DashboardStatisticsDao dao) { - - int ringSize = 1024; + int ringSize = 32768; Disruptor disruptor = new Disruptor<>( new MeasureEventFactory(), ringSize, - Executors.defaultThreadFactory(), + createDisruptorThreadFactory("measure-disruptor"), ProducerType.MULTI, - new BlockingWaitStrategy() + new SleepingWaitStrategy() ); disruptor.handleEventsWith(new MeasureEventHandler(dao, batchSize)); disruptor.start(); - return disruptor; } @@ -43,22 +51,21 @@ public class DisruptorConfig { return disruptor.getRingBuffer(); } + @Bean public Disruptor accumulateDisruptor(DashboardStatisticsDao dao) { - - int ringSize = 1024; + int ringSize = 32768; Disruptor disruptor = new Disruptor<>( new AccumulateEventFactory(), ringSize, - Executors.defaultThreadFactory(), + createDisruptorThreadFactory("accumulate-disruptor"), ProducerType.MULTI, - new BlockingWaitStrategy() + new SleepingWaitStrategy() ); disruptor.handleEventsWith(new AccumulateEventHandler(dao, batchSize)); disruptor.start(); - return disruptor; } @@ -67,22 +74,21 @@ public class DisruptorConfig { return disruptor.getRingBuffer(); } + @Bean public Disruptor alertDisruptor(DashboardAlertDao dao) { - - int ringSize = 1024; + int ringSize = 32768; Disruptor disruptor = new Disruptor<>( new AlertEventFactory(), ringSize, - Executors.defaultThreadFactory(), + createDisruptorThreadFactory("alert-disruptor"), ProducerType.MULTI, - new BlockingWaitStrategy() + new SleepingWaitStrategy() ); disruptor.handleEventsWith(new AlertEventHandler(dao, batchSize)); disruptor.start(); - return disruptor; } @@ -90,5 +96,4 @@ public class DisruptorConfig { public RingBuffer alertRingBuffer(Disruptor disruptor) { return disruptor.getRingBuffer(); } - -} +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java index 20f43cc..9b8ac22 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java @@ -3,10 +3,12 @@ package com.techsor.datacenter.sender.disruptor; import com.lmax.disruptor.RingBuffer; import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor +@Slf4j public class AccumulateService { private final RingBuffer ringBuffer; @@ -18,17 +20,19 @@ public class AccumulateService { Double incrementMinute, StatisticsAccumulateInfo info) { - long seq = ringBuffer.next(); - try { - AccumulateEvent event = ringBuffer.get(seq); + boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> { event.setUploadValue(uploadValue); event.setDeviceId(deviceId); event.setAttrCode(attrCode); event.setIncrementToday(incrementToday); event.setIncrementMinute(incrementMinute); event.setInfo(info); - } finally { - ringBuffer.publish(seq); + }); + + if (!ok) { + // ringBuffer 满了 + log.error("[AccumulateService] RingBuffer is FULL, message dropped! deviceId={}", deviceId); } } -} + +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java index b5162b5..9c92ae7 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java @@ -3,22 +3,25 @@ package com.techsor.datacenter.sender.disruptor; import com.lmax.disruptor.RingBuffer; import com.techsor.datacenter.sender.entitiy.DynamodbEntity; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor +@Slf4j public class AlertService { private final RingBuffer ringBuffer; public void write(DynamodbEntity entity) { - long seq = ringBuffer.next(); - try { - AlertEvent event = ringBuffer.get(seq); + boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> { event.setEntity(entity); - } finally { - ringBuffer.publish(seq); + }); + + if (!ok) { + log.error("[AlertService] RingBuffer FULL! dropped alert. deviceId={}", + entity != null ? entity.getDeviceId() : "null"); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java index 632b41e..57bb72c 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java @@ -3,12 +3,14 @@ package com.techsor.datacenter.sender.disruptor; import com.lmax.disruptor.RingBuffer; import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.math.BigDecimal; @Service @RequiredArgsConstructor +@Slf4j public class MeasureService { private final RingBuffer ringBuffer; @@ -20,18 +22,17 @@ public class MeasureService { BigDecimal minValue, BigDecimal maxValue) { - long seq = ringBuffer.next(); - try { - MeasureEvent event = ringBuffer.get(seq); + boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> { event.setUploadValue(uploadValue); event.setDeviceId(deviceId); event.setAttrCode(attrCode); event.setInfo(info); event.setMinValue(minValue); event.setMaxValue(maxValue); - } finally { - ringBuffer.publish(seq); + }); + + if (!ok) { + log.error("[MeasureService] RingBuffer FULL! dropped data. deviceId={}", deviceId); } } -} - +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java index fb08ab1..7afc119 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java +++ b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java @@ -622,17 +622,17 @@ public class DataProcessServiceImpl implements IDataProcessService { } baseTransDataEntity.setHashId(UUID.randomUUID()); -// try { -// handleDashboardAlert(baseTransDataEntity); -// } catch (Exception e) { -// log.error("dashboard alert error", e); -// } -// -// try { -// minuteLevelStorage(baseTransDataEntity); -// } catch (Exception e) { -// log.error("minuteLevelStorage error", e); -// } + try { + handleDashboardAlert(baseTransDataEntity); + } catch (Exception e) { + log.error("dashboard alert error", e); + } + + try { + minuteLevelStorage(baseTransDataEntity); + } catch (Exception e) { + log.error("minuteLevelStorage error", e); + } // try { // if ("alert".equals(baseTransDataEntity.getStatus())) { From ccae5a033ba0e79241f360ba1882ab01f9fae158 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Sat, 13 Dec 2025 10:38:08 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E4=BC=98=E5=8C=96sql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sender/dao/DashboardStatisticsDao.java | 234 +++++++++--------- 1 file changed, 114 insertions(+), 120 deletions(-) diff --git a/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java b/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java index bed592a..c27dc11 100644 --- a/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java +++ b/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java @@ -11,6 +11,7 @@ import java.math.BigDecimal; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -85,69 +86,67 @@ public class DashboardStatisticsDao { } public void measureBatchInsert(List list) { + if (list.isEmpty()) return; - // 批量 insert - auroraJdbcTemplate.batchUpdate( + StringBuilder insertSql = new StringBuilder( "INSERT INTO dashboard_record_measure (" + - "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, upload_value, upload_at, attr_code) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - new BatchPreparedStatementSetter() { - public void setValues(PreparedStatement ps, int i) throws SQLException { - MeasureEvent e = list.get(i); - StatisticsMeasureInfo info = e.getInfo(); - ps.setString(1, e.getDeviceId()); - ps.setInt(2, info.getYearKey()); - ps.setInt(3, info.getMonthKey()); - ps.setInt(4, info.getDayKey()); - ps.setInt(5, info.getHourKey()); - ps.setInt(6, info.getMinuteKey()); - ps.setInt(7, info.getSecondKey()); - ps.setString(8, e.getUploadValue()); - ps.setLong(9, info.getUploadAt()); - ps.setString(10, e.getAttrCode()); - } - - public int getBatchSize() { - return list.size(); - } - } + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + + "upload_value, upload_at, attr_code) VALUES " ); - - // 批量 upsert - auroraJdbcTemplate.batchUpdate( + List insertParams = new ArrayList<>(list.size() * 10); + for (int i = 0; i < list.size(); i++) { + MeasureEvent e = list.get(i); + StatisticsMeasureInfo info = e.getInfo(); + + insertSql.append("(?,?,?,?,?,?,?,?,?,?)"); + if (i < list.size() - 1) insertSql.append(","); + + insertParams.add(e.getDeviceId()); + insertParams.add(info.getYearKey()); + insertParams.add(info.getMonthKey()); + insertParams.add(info.getDayKey()); + insertParams.add(info.getHourKey()); + insertParams.add(info.getMinuteKey()); + insertParams.add(info.getSecondKey()); + insertParams.add(e.getUploadValue()); + insertParams.add(info.getUploadAt()); + insertParams.add(e.getAttrCode()); + } + auroraJdbcTemplate.update(insertSql.toString(), insertParams.toArray()); + + // 实时表 + StringBuilder upsertSql = new StringBuilder( "INSERT INTO dashboard_realtime_measure (" + - "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second," + - "upload_value, min_value, max_value, upload_at, attr_code) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + - "ON DUPLICATE KEY UPDATE " + - "upload_value = VALUES(upload_value), " + - "min_value = VALUES(min_value)," + - "max_value = VALUES(max_value)," + - "upload_at = VALUES(upload_at)", - new BatchPreparedStatementSetter() { - - public void setValues(PreparedStatement ps, int i) throws SQLException { - MeasureEvent e = list.get(i); - StatisticsMeasureInfo info = e.getInfo(); - ps.setString(1, e.getDeviceId()); - ps.setInt(2, info.getYearKey()); - ps.setInt(3, info.getMonthKey()); - ps.setInt(4, info.getDayKey()); - ps.setInt(5, info.getHourKey()); - ps.setInt(6, info.getMinuteKey()); - ps.setInt(7, info.getSecondKey()); - ps.setString(8, e.getUploadValue()); - ps.setString(9, e.getMinValue().toString()); - ps.setString(10, e.getMaxValue().toString()); - ps.setLong(11, info.getUploadAt()); - ps.setString(12, e.getAttrCode()); - } - - public int getBatchSize() { - return list.size(); - } - } + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + + "upload_value, min_value, max_value, upload_at, attr_code) VALUES " ); + List upsertParams = new ArrayList<>(list.size() * 12); + for (int i = 0; i < list.size(); i++) { + MeasureEvent e = list.get(i); + StatisticsMeasureInfo info = e.getInfo(); + + upsertSql.append("(?,?,?,?,?,?,?,?,?,?,?,?)"); + if (i < list.size() - 1) upsertSql.append(","); + + upsertParams.add(e.getDeviceId()); + upsertParams.add(info.getYearKey()); + upsertParams.add(info.getMonthKey()); + upsertParams.add(info.getDayKey()); + upsertParams.add(info.getHourKey()); + upsertParams.add(info.getMinuteKey()); + upsertParams.add(info.getSecondKey()); + upsertParams.add(e.getUploadValue()); + upsertParams.add(e.getMinValue() != null ? e.getMinValue().toString() : null); + upsertParams.add(e.getMaxValue() != null ? e.getMaxValue().toString() : null); + upsertParams.add(info.getUploadAt()); + upsertParams.add(e.getAttrCode()); + } + upsertSql.append(" ON DUPLICATE KEY UPDATE ") + .append("upload_value = VALUES(upload_value), ") + .append("min_value = VALUES(min_value), ") + .append("max_value = VALUES(max_value), ") + .append("upload_at = VALUES(upload_at)"); + auroraJdbcTemplate.update(upsertSql.toString(), upsertParams.toArray()); } public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday, @@ -195,70 +194,65 @@ public class DashboardStatisticsDao { public void accumulateBatchInsert(List list) { - auroraJdbcTemplate.batchUpdate( - "INSERT INTO dashboard_record_accumulate (" + - "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + - "upload_value, increment_today, increment_minute, upload_at, attr_code) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - new BatchPreparedStatementSetter() { - - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - AccumulateEvent e = list.get(i); - StatisticsAccumulateInfo info = e.getInfo(); - - ps.setString(1, e.getDeviceId()); - ps.setInt(2, info.getYearKey()); - ps.setInt(3, info.getMonthKey()); - ps.setInt(4, info.getDayKey()); - ps.setInt(5, info.getHourKey()); - ps.setInt(6, info.getMinuteKey()); - ps.setInt(7, info.getSecondKey()); - ps.setString(8, e.getUploadValue()); - ps.setDouble(9, e.getIncrementToday() != null ? e.getIncrementToday() : 0.0); - ps.setDouble(10, e.getIncrementMinute() != null ? e.getIncrementMinute() : 0.0); - ps.setLong(11, info.getUploadAt()); - ps.setString(12, e.getAttrCode()); - } - - @Override - public int getBatchSize() { - return list.size(); - } - } - ); + if (list.isEmpty()) return; - auroraJdbcTemplate.batchUpdate( - "INSERT INTO dashboard_realtime_accumulate_day (" + - "device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at, attr_code) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?) " + - "ON DUPLICATE KEY UPDATE " + - "upload_value = VALUES(upload_value), " + - "increment_today = VALUES(increment_today), " + - "upload_at = VALUES(upload_at)", - new BatchPreparedStatementSetter() { - - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - AccumulateEvent e = list.get(i); - StatisticsAccumulateInfo info = e.getInfo(); - - ps.setString(1, e.getDeviceId()); - ps.setInt(2, info.getYearKey()); - ps.setInt(3, info.getMonthKey()); - ps.setInt(4, info.getDayKey()); - ps.setString(5, e.getUploadValue()); - ps.setDouble(6, e.getIncrementToday()); - ps.setLong(7, info.getUploadAt()); - ps.setString(8, e.getAttrCode()); - } - - @Override - public int getBatchSize() { - return list.size(); - } - } + StringBuilder insertSql = new StringBuilder( + "INSERT INTO dashboard_record_accumulate " + + "(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + + "upload_value, increment_today, increment_minute, upload_at, attr_code) VALUES " + ); + List insertParams = new ArrayList<>(); + for (int i = 0; i < list.size(); i++) { + AccumulateEvent e = list.get(i); + StatisticsAccumulateInfo info = e.getInfo(); + + insertSql.append("(?,?,?,?,?,?,?,?,?,?,?,?)"); + if (i < list.size() - 1) { + insertSql.append(","); + } + + insertParams.add(e.getDeviceId()); + insertParams.add(info.getYearKey()); + insertParams.add(info.getMonthKey()); + insertParams.add(info.getDayKey()); + insertParams.add(info.getHourKey()); + insertParams.add(info.getMinuteKey()); + insertParams.add(info.getSecondKey()); + insertParams.add(e.getUploadValue()); + insertParams.add(e.getIncrementToday() != null ? e.getIncrementToday() : 0D); + insertParams.add(e.getIncrementMinute() != null ? e.getIncrementMinute() : 0D); + insertParams.add(info.getUploadAt()); + insertParams.add(e.getAttrCode()); + } + auroraJdbcTemplate.update(insertSql.toString(), insertParams.toArray()); + + //实时表 + StringBuilder updateSql = new StringBuilder( + "INSERT INTO dashboard_realtime_accumulate_day " + + "(device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at, attr_code) VALUES " ); + List updateParams = new ArrayList<>(); + for (int i = 0; i < list.size(); i++) { + AccumulateEvent e = list.get(i); + StatisticsAccumulateInfo info = e.getInfo(); + + updateSql.append("(?,?,?,?,?,?,?,?)"); + if (i < list.size() - 1) updateSql.append(","); + + updateParams.add(e.getDeviceId()); + updateParams.add(info.getYearKey()); + updateParams.add(info.getMonthKey()); + updateParams.add(info.getDayKey()); + updateParams.add(e.getUploadValue()); + updateParams.add(e.getIncrementToday()); + updateParams.add(info.getUploadAt()); + updateParams.add(e.getAttrCode()); + } + updateSql.append(" ON DUPLICATE KEY UPDATE ") + .append("upload_value = VALUES(upload_value), ") + .append("increment_today = VALUES(increment_today), ") + .append("upload_at = VALUES(upload_at)"); + auroraJdbcTemplate.update(updateSql.toString(), updateParams.toArray()); } } From 8e750bac4633f096c5885b8a40fc8afbdefbdd1c Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Sat, 13 Dec 2025 10:41:54 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BC=98=E5=8C=96sql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sender/dao/DashboardAlertDao.java | 79 ++++++++++--------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java b/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java index 37de8da..5b72a8e 100644 --- a/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java +++ b/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java @@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -80,46 +81,48 @@ public class DashboardAlertDao { DataSourceContextHolder.setCurrentDataSourceKey(dsKey); // 批量执行 upsert - jdbcTemplate.batchUpdate( + StringBuilder sql = new StringBuilder( "INSERT INTO device_rawdata_realtime (" + - "device_id, building_id, status, receive_ts, alert_title, alert_content, alert_cancel_title," + - "alert_cancel_content, raw_data, upload_year, upload_month, upload_day) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + - "ON DUPLICATE KEY UPDATE " + - "building_id = VALUES(building_id), " + - "status = VALUES(status), " + - "receive_ts = VALUES(receive_ts)," + - "alert_title = VALUES(alert_title)," + - "alert_content = VALUES(alert_content)," + - "alert_cancel_title = VALUES(alert_cancel_title)," + - "alert_cancel_content = VALUES(alert_cancel_content)," + - "raw_data = VALUES(raw_data)," + - "upload_year = VALUES(upload_year)," + - "upload_month = VALUES(upload_month)," + - "upload_day = VALUES(upload_day)", - new BatchPreparedStatementSetter() { - - public void setValues(PreparedStatement ps, int i) throws SQLException { - DynamodbEntity entity = subList.get(i).getEntity(); - ps.setString(1, entity.getDeviceId()); - ps.setLong(2, entity.getDbBuildingId()); - ps.setString(3, entity.getStatus()); - ps.setLong(4, entity.getReceive_ts()); - ps.setString(5, entity.getAlertTitle()); - ps.setString(6, entity.getAlertContent()); - ps.setString(7, entity.getAlertCancelTitle()); - ps.setString(8, entity.getAlertCancelContent()); - ps.setString(9, entity.getRawData()); - ps.setInt(10, Integer.parseInt(entity.getYearKey())); - ps.setInt(11, Integer.parseInt(entity.getMonthKey())); - ps.setInt(12, Integer.parseInt(entity.getDayKey())); - } - - public int getBatchSize() { - return subList.size(); - } - } + "device_id, building_id, status, receive_ts, " + + "alert_title, alert_content, alert_cancel_title, alert_cancel_content, " + + "raw_data, upload_year, upload_month, upload_day) VALUES " ); + List params = new ArrayList<>(); + for (int i = 0; i < subList.size(); i++) { + DynamodbEntity entity = subList.get(i).getEntity(); + + sql.append("(?,?,?,?,?,?,?,?,?,?,?,?)"); + if (i < subList.size() - 1) { + sql.append(","); + } + + params.add(entity.getDeviceId()); + params.add(entity.getDbBuildingId()); + params.add(entity.getStatus()); + params.add(entity.getReceive_ts()); + params.add(entity.getAlertTitle()); + params.add(entity.getAlertContent()); + params.add(entity.getAlertCancelTitle()); + params.add(entity.getAlertCancelContent()); + params.add(entity.getRawData()); + params.add(Integer.parseInt(entity.getYearKey())); + params.add(Integer.parseInt(entity.getMonthKey())); + params.add(Integer.parseInt(entity.getDayKey())); + } + sql.append(" ON DUPLICATE KEY UPDATE ") + .append("building_id = VALUES(building_id), ") + .append("status = VALUES(status), ") + .append("receive_ts = VALUES(receive_ts), ") + .append("alert_title = VALUES(alert_title), ") + .append("alert_content = VALUES(alert_content), ") + .append("alert_cancel_title = VALUES(alert_cancel_title), ") + .append("alert_cancel_content = VALUES(alert_cancel_content), ") + .append("raw_data = VALUES(raw_data), ") + .append("upload_year = VALUES(upload_year), ") + .append("upload_month = VALUES(upload_month), ") + .append("upload_day = VALUES(upload_day)"); + jdbcTemplate.update(sql.toString(), params.toArray()); + } finally { DataSourceContextHolder.clearCurrentDataSourceKey(); } From 1a66d1f05e9fa2055ec2457c93b23dfecea11c45 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Sat, 13 Dec 2025 16:31:28 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=89=B9=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../disruptor/BaseBatchEventHandler.java | 90 ++++++++++++------- 1 file changed, 60 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java index 90a13ab..cacdf92 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java @@ -5,69 +5,71 @@ import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** - * 通用批量写库 Handler:支持 batchSize、定时 flush、线程安全 + * 批量 Handler + * + * 特点: + * 1. 锁内仅 swap buffer,DB IO 锁外执行 + * 2. 支持 batchSize + 定时 flush + * 3. Disruptor 对象复用安全 */ @Slf4j public abstract class BaseBatchEventHandler implements EventHandler { - protected final int batchSize; + private final int batchSize; + + /** 当前写入 buffer(只在锁内访问) */ + private List buffer = new ArrayList<>(); - protected final List buffer = new ArrayList<>(); private final Object lock = new Object(); + private final AtomicBoolean running = new AtomicBoolean(true); public BaseBatchEventHandler(int batchSize) { this.batchSize = batchSize; } - // 子类实现写库逻辑 + /** 子类实现 DB 写入 */ protected abstract void flushToDb(List events); - // 子类负责 copy(避免对象复用覆盖) - protected abstract T copyOf(T e); + /** 子类实现深拷贝 */ + protected abstract T copyOf(T event); @Override - public void onEvent(T event, long seq, boolean endOfBatch) { + public void onEvent(T event, long sequence, boolean endOfBatch) { + List toFlush = null; + synchronized (lock) { buffer.add(copyOf(event)); if (buffer.size() >= batchSize) { - flushLocked(); + toFlush = buffer; + buffer = new ArrayList<>(batchSize); } } - } - - private void flushLocked() { - if (buffer.isEmpty()) return; - - try { - flushToDb(buffer); - } catch (Exception ex) { - log.error("batch flush failed", ex); - } - - buffer.clear(); - } - protected void flush() { - synchronized (lock) { - flushLocked(); + // DB IO 在锁外 + if (toFlush != null) { + flushSafely(toFlush); } } + /** 定时 flush 线程 */ @Override public void onStart() { Thread flusher = new Thread(() -> { - while (true) { + while (running.get()) { try { - Thread.sleep(1000); // 每秒检查一次 - flush(); + Thread.sleep(1000); + timedFlush(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } catch (Exception e) { log.error("timer flush failed", e); } } - }); + }, "batch-flusher"); flusher.setDaemon(true); flusher.start(); @@ -75,6 +77,34 @@ public abstract class BaseBatchEventHandler implements EventHandler { @Override public void onShutdown() { - flush(); + running.set(false); + timedFlush(); + } + + /** 定时触发 flush */ + private void timedFlush() { + List toFlush = null; + + synchronized (lock) { + if (!buffer.isEmpty()) { + toFlush = buffer; + buffer = new ArrayList<>(batchSize); + } + } + + if (toFlush != null) { + flushSafely(toFlush); + } + } + + /** DB flush 统一保护 */ + private void flushSafely(List list) { + if (list.isEmpty()) return; + + try { + flushToDb(list); + } catch (Exception e) { + log.error("batch flush failed, size={}", list.size(), e); + } } -} +} \ No newline at end of file