diff --git a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java index 805a28e..380385b 100644 --- a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java +++ b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java @@ -6,6 +6,7 @@ import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; import com.techsor.datacenter.sender.disruptor.*; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -14,6 +15,12 @@ import java.util.concurrent.Executors; @Configuration public class DisruptorConfig { + @Value("${accumulate.batch-size:100}") + private int accumulateBatchSize; + + @Value("${measure.batch-size:100}") + private int measureBatchSize; + @Bean public Disruptor measureDisruptor(DashboardStatisticsDao dao) { @@ -27,7 +34,7 @@ public class DisruptorConfig { new BlockingWaitStrategy() ); - disruptor.handleEventsWith(new MeasureEventHandler(dao)); + disruptor.handleEventsWith(new MeasureEventHandler(dao, measureBatchSize)); disruptor.start(); return disruptor; @@ -51,7 +58,7 @@ public class DisruptorConfig { new BlockingWaitStrategy() ); - disruptor.handleEventsWith(new AccumulateEventHandler(dao)); + disruptor.handleEventsWith(new AccumulateEventHandler(dao, accumulateBatchSize)); disruptor.start(); return disruptor; diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java index e2bb8b7..9b1ae19 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java @@ -12,14 +12,16 @@ public class AccumulateEventHandler implements EventHandler { private final DashboardStatisticsDao dao; - private final int batchSize = 100; + private final int batchSize; - private final List buffer = new ArrayList<>(batchSize); + private final List buffer; private long lastFlushTime = System.currentTimeMillis(); - public AccumulateEventHandler(DashboardStatisticsDao dao) { + public AccumulateEventHandler(DashboardStatisticsDao dao, int batchSize) { this.dao = dao; + this.batchSize = batchSize; + this.buffer = new ArrayList<>(batchSize); } @Override diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java index 54502a6..9d5d357 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java @@ -12,14 +12,16 @@ public class MeasureEventHandler implements EventHandler { private final DashboardStatisticsDao dao; - private final int batchSize = 100; + private final int batchSize; - private final List buffer = new ArrayList<>(batchSize); + private final List buffer; private long lastFlushTime = System.currentTimeMillis(); - public MeasureEventHandler(DashboardStatisticsDao dao) { + public MeasureEventHandler(DashboardStatisticsDao dao, int batchSize) { this.dao = dao; + this.batchSize = batchSize; + this.buffer = new ArrayList<>(batchSize); } @Override diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index d35b2fc..702edd5 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -97,6 +97,9 @@ category.accumulate.deviceTypeIds=48,112,122 # 状态类设备类型ID category.status.deviceTypeIds=86,113,123 +accumulate.batch-size=${accumulateBatchSize:100} +measure.batch-size=${measureBatchSize:100} + sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net} sys.mqtt.port=${sysMqttPort:1883} sys.mqtt.username=${sysMqttUsername:test} diff --git a/src/main/resources/application-prd.properties b/src/main/resources/application-prd.properties index c8ce405..37db067 100644 --- a/src/main/resources/application-prd.properties +++ b/src/main/resources/application-prd.properties @@ -91,6 +91,9 @@ category.accumulate.deviceTypeIds=48,112,122 # 状态类设备类型ID category.status.deviceTypeIds=86,113,123 +accumulate.batch-size=${accumulateBatchSize:100} +measure.batch-size=${measureBatchSize:100} + sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net} sys.mqtt.port=${sysMqttPort:1883} sys.mqtt.username=${sysMqttUsername:test}