From e2d9cc6649f77e337cdfc3cb41df6081dcef3268 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Fri, 12 Dec 2025 22:50:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sender/config/DisruptorConfig.java | 43 +++++++++++-------- .../sender/disruptor/AccumulateService.java | 16 ++++--- .../sender/disruptor/AlertService.java | 15 ++++--- .../sender/disruptor/MeasureService.java | 15 ++++--- .../service/impl/DataProcessServiceImpl.java | 22 +++++----- 5 files changed, 62 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java index 6fc01cd..9bd73c8 100644 --- a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java +++ b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java @@ -1,7 +1,7 @@ package com.techsor.datacenter.sender.config; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.techsor.datacenter.sender.dao.DashboardAlertDao; @@ -11,7 +11,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; @Configuration public class DisruptorConfig { @@ -19,22 +19,30 @@ public class DisruptorConfig { @Value("${data.operation.batch-size:100}") private int batchSize; + /** 创建带线程名的 ThreadFactory **/ + private ThreadFactory createDisruptorThreadFactory(String namePrefix) { + return r -> { + Thread t = new Thread(r); + t.setName(namePrefix + "-" + t.getId()); + t.setDaemon(true); + return t; + }; + } + @Bean public Disruptor measureDisruptor(DashboardStatisticsDao dao) { - - int ringSize = 1024; + int ringSize = 32768; Disruptor disruptor = new Disruptor<>( new MeasureEventFactory(), ringSize, - Executors.defaultThreadFactory(), + createDisruptorThreadFactory("measure-disruptor"), ProducerType.MULTI, - new BlockingWaitStrategy() + new SleepingWaitStrategy() ); disruptor.handleEventsWith(new MeasureEventHandler(dao, batchSize)); disruptor.start(); - return disruptor; } @@ -43,22 +51,21 @@ public class DisruptorConfig { return disruptor.getRingBuffer(); } + @Bean public Disruptor accumulateDisruptor(DashboardStatisticsDao dao) { - - int ringSize = 1024; + int ringSize = 32768; Disruptor disruptor = new Disruptor<>( new AccumulateEventFactory(), ringSize, - Executors.defaultThreadFactory(), + createDisruptorThreadFactory("accumulate-disruptor"), ProducerType.MULTI, - new BlockingWaitStrategy() + new SleepingWaitStrategy() ); disruptor.handleEventsWith(new AccumulateEventHandler(dao, batchSize)); disruptor.start(); - return disruptor; } @@ -67,22 +74,21 @@ public class DisruptorConfig { return disruptor.getRingBuffer(); } + @Bean public Disruptor alertDisruptor(DashboardAlertDao dao) { - - int ringSize = 1024; + int ringSize = 32768; Disruptor disruptor = new Disruptor<>( new AlertEventFactory(), ringSize, - Executors.defaultThreadFactory(), + createDisruptorThreadFactory("alert-disruptor"), ProducerType.MULTI, - new BlockingWaitStrategy() + new SleepingWaitStrategy() ); disruptor.handleEventsWith(new AlertEventHandler(dao, batchSize)); disruptor.start(); - return disruptor; } @@ -90,5 +96,4 @@ public class DisruptorConfig { public RingBuffer alertRingBuffer(Disruptor disruptor) { return disruptor.getRingBuffer(); } - -} +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java index 20f43cc..9b8ac22 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java @@ -3,10 +3,12 @@ package com.techsor.datacenter.sender.disruptor; import com.lmax.disruptor.RingBuffer; import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor +@Slf4j public class AccumulateService { private final RingBuffer ringBuffer; @@ -18,17 +20,19 @@ public class AccumulateService { Double incrementMinute, StatisticsAccumulateInfo info) { - long seq = ringBuffer.next(); - try { - AccumulateEvent event = ringBuffer.get(seq); + boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> { event.setUploadValue(uploadValue); event.setDeviceId(deviceId); event.setAttrCode(attrCode); event.setIncrementToday(incrementToday); event.setIncrementMinute(incrementMinute); event.setInfo(info); - } finally { - ringBuffer.publish(seq); + }); + + if (!ok) { + // ringBuffer 满了 + log.error("[AccumulateService] RingBuffer is FULL, message dropped! deviceId={}", deviceId); } } -} + +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java index b5162b5..9c92ae7 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java @@ -3,22 +3,25 @@ package com.techsor.datacenter.sender.disruptor; import com.lmax.disruptor.RingBuffer; import com.techsor.datacenter.sender.entitiy.DynamodbEntity; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor +@Slf4j public class AlertService { private final RingBuffer ringBuffer; public void write(DynamodbEntity entity) { - long seq = ringBuffer.next(); - try { - AlertEvent event = ringBuffer.get(seq); + boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> { event.setEntity(entity); - } finally { - ringBuffer.publish(seq); + }); + + if (!ok) { + log.error("[AlertService] RingBuffer FULL! dropped alert. deviceId={}", + entity != null ? entity.getDeviceId() : "null"); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java index 632b41e..57bb72c 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java @@ -3,12 +3,14 @@ package com.techsor.datacenter.sender.disruptor; import com.lmax.disruptor.RingBuffer; import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.math.BigDecimal; @Service @RequiredArgsConstructor +@Slf4j public class MeasureService { private final RingBuffer ringBuffer; @@ -20,18 +22,17 @@ public class MeasureService { BigDecimal minValue, BigDecimal maxValue) { - long seq = ringBuffer.next(); - try { - MeasureEvent event = ringBuffer.get(seq); + boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> { event.setUploadValue(uploadValue); event.setDeviceId(deviceId); event.setAttrCode(attrCode); event.setInfo(info); event.setMinValue(minValue); event.setMaxValue(maxValue); - } finally { - ringBuffer.publish(seq); + }); + + if (!ok) { + log.error("[MeasureService] RingBuffer FULL! dropped data. deviceId={}", deviceId); } } -} - +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java index fb08ab1..7afc119 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java +++ b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java @@ -622,17 +622,17 @@ public class DataProcessServiceImpl implements IDataProcessService { } baseTransDataEntity.setHashId(UUID.randomUUID()); -// try { -// handleDashboardAlert(baseTransDataEntity); -// } catch (Exception e) { -// log.error("dashboard alert error", e); -// } -// -// try { -// minuteLevelStorage(baseTransDataEntity); -// } catch (Exception e) { -// log.error("minuteLevelStorage error", e); -// } + try { + handleDashboardAlert(baseTransDataEntity); + } catch (Exception e) { + log.error("dashboard alert error", e); + } + + try { + minuteLevelStorage(baseTransDataEntity); + } catch (Exception e) { + log.error("minuteLevelStorage error", e); + } // try { // if ("alert".equals(baseTransDataEntity.getStatus())) {