Browse Source

批量操作数量提取到配置文件

jwy_category
review512jwy@163.com 1 month ago
parent
commit
97e64d45f7
  1. 11
      src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java
  2. 8
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java
  3. 8
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java
  4. 3
      src/main/resources/application-dev.properties
  5. 3
      src/main/resources/application-prd.properties

11
src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java

@ -6,6 +6,7 @@ import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.techsor.datacenter.sender.dao.DashboardStatisticsDao;
import com.techsor.datacenter.sender.disruptor.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -14,6 +15,12 @@ import java.util.concurrent.Executors;
@Configuration
public class DisruptorConfig {
@Value("${accumulate.batch-size:100}")
private int accumulateBatchSize;
@Value("${measure.batch-size:100}")
private int measureBatchSize;
@Bean
public Disruptor<MeasureEvent> measureDisruptor(DashboardStatisticsDao dao) {
@ -27,7 +34,7 @@ public class DisruptorConfig {
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(new MeasureEventHandler(dao));
disruptor.handleEventsWith(new MeasureEventHandler(dao, measureBatchSize));
disruptor.start();
return disruptor;
@ -51,7 +58,7 @@ public class DisruptorConfig {
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(new AccumulateEventHandler(dao));
disruptor.handleEventsWith(new AccumulateEventHandler(dao, accumulateBatchSize));
disruptor.start();
return disruptor;

8
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java

@ -12,14 +12,16 @@ public class AccumulateEventHandler implements EventHandler<AccumulateEvent> {
private final DashboardStatisticsDao dao;
private final int batchSize = 100;
private final int batchSize;
private final List<AccumulateEvent> buffer = new ArrayList<>(batchSize);
private final List<AccumulateEvent> buffer;
private long lastFlushTime = System.currentTimeMillis();
public AccumulateEventHandler(DashboardStatisticsDao dao) {
public AccumulateEventHandler(DashboardStatisticsDao dao, int batchSize) {
this.dao = dao;
this.batchSize = batchSize;
this.buffer = new ArrayList<>(batchSize);
}
@Override

8
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java

@ -12,14 +12,16 @@ public class MeasureEventHandler implements EventHandler<MeasureEvent> {
private final DashboardStatisticsDao dao;
private final int batchSize = 100;
private final int batchSize;
private final List<MeasureEvent> buffer = new ArrayList<>(batchSize);
private final List<MeasureEvent> buffer;
private long lastFlushTime = System.currentTimeMillis();
public MeasureEventHandler(DashboardStatisticsDao dao) {
public MeasureEventHandler(DashboardStatisticsDao dao, int batchSize) {
this.dao = dao;
this.batchSize = batchSize;
this.buffer = new ArrayList<>(batchSize);
}
@Override

3
src/main/resources/application-dev.properties

@ -97,6 +97,9 @@ category.accumulate.deviceTypeIds=48,112,122
# 状态类设备类型ID
category.status.deviceTypeIds=86,113,123
accumulate.batch-size=${accumulateBatchSize:100}
measure.batch-size=${measureBatchSize:100}
sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net}
sys.mqtt.port=${sysMqttPort:1883}
sys.mqtt.username=${sysMqttUsername:test}

3
src/main/resources/application-prd.properties

@ -91,6 +91,9 @@ category.accumulate.deviceTypeIds=48,112,122
# 状态类设备类型ID
category.status.deviceTypeIds=86,113,123
accumulate.batch-size=${accumulateBatchSize:100}
measure.batch-size=${measureBatchSize:100}
sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net}
sys.mqtt.port=${sysMqttPort:1883}
sys.mqtt.username=${sysMqttUsername:test}

Loading…
Cancel
Save