diff --git a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java index 380385b..6fc01cd 100644 --- a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java +++ b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java @@ -4,6 +4,7 @@ import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import com.techsor.datacenter.sender.dao.DashboardAlertDao; import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; import com.techsor.datacenter.sender.disruptor.*; import org.springframework.beans.factory.annotation.Value; @@ -15,11 +16,8 @@ import java.util.concurrent.Executors; @Configuration public class DisruptorConfig { - @Value("${accumulate.batch-size:100}") - private int accumulateBatchSize; - - @Value("${measure.batch-size:100}") - private int measureBatchSize; + @Value("${data.operation.batch-size:100}") + private int batchSize; @Bean public Disruptor measureDisruptor(DashboardStatisticsDao dao) { @@ -34,7 +32,7 @@ public class DisruptorConfig { new BlockingWaitStrategy() ); - disruptor.handleEventsWith(new MeasureEventHandler(dao, measureBatchSize)); + disruptor.handleEventsWith(new MeasureEventHandler(dao, batchSize)); disruptor.start(); return disruptor; @@ -58,7 +56,7 @@ public class DisruptorConfig { new BlockingWaitStrategy() ); - disruptor.handleEventsWith(new AccumulateEventHandler(dao, accumulateBatchSize)); + disruptor.handleEventsWith(new AccumulateEventHandler(dao, batchSize)); disruptor.start(); return disruptor; @@ -68,4 +66,29 @@ public class DisruptorConfig { public RingBuffer accumulateRingBuffer(Disruptor disruptor) { return disruptor.getRingBuffer(); } + + @Bean + public Disruptor alertDisruptor(DashboardAlertDao dao) { + + int ringSize = 1024; + + Disruptor disruptor = new Disruptor<>( + new AlertEventFactory(), + ringSize, + Executors.defaultThreadFactory(), + ProducerType.MULTI, + new BlockingWaitStrategy() + ); + + disruptor.handleEventsWith(new AlertEventHandler(dao, batchSize)); + disruptor.start(); + + return disruptor; + } + + @Bean + public RingBuffer alertRingBuffer(Disruptor disruptor) { + return disruptor.getRingBuffer(); + } + } diff --git a/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java b/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java index ed0f5ab..c27ffd0 100644 --- a/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java +++ b/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java @@ -1,15 +1,18 @@ package com.techsor.datacenter.sender.dao; +import com.techsor.datacenter.sender.disruptor.AlertEvent; import com.techsor.datacenter.sender.entitiy.AlertHistoryDTO; import com.techsor.datacenter.sender.entitiy.DynamodbEntity; import lombok.extern.slf4j.Slf4j; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Component; @@ -59,6 +62,52 @@ public class DashboardAlertDao { } } + public void batchUpsertRawData(List list) { + jdbcTemplate.batchUpdate( + "INSERT INTO device_rawdata_realtime (" + + "device_id, building_id, status, receive_ts, alert_title, alert_content, " + + "alert_cancel_title, alert_cancel_content, raw_data, upload_year, upload_month, upload_day) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE " + + "building_id = VALUES(building_id), " + + "status = VALUES(status), " + + "receive_ts = VALUES(receive_ts), " + + "alert_title = VALUES(alert_title), " + + "alert_content = VALUES(alert_content), " + + "alert_cancel_title = VALUES(alert_cancel_title), " + + "alert_cancel_content = VALUES(alert_cancel_content), " + + "raw_data = VALUES(raw_data), " + + "upload_year = VALUES(upload_year), " + + "upload_month = VALUES(upload_month), " + + "upload_day = VALUES(upload_day)", + new BatchPreparedStatementSetter() { + + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + AlertEvent e = list.get(i); + + ps.setString(1, e.getEntity().getDeviceId()); + ps.setLong(2, e.getEntity().getDbBuildingId()); + ps.setString(3, e.getEntity().getStatus()); + ps.setLong(4, e.getEntity().getReceive_ts()); + ps.setString(5, e.getEntity().getAlertTitle()); + ps.setString(6, e.getEntity().getAlertContent()); + ps.setString(7, e.getEntity().getAlertCancelTitle()); + ps.setString(8, e.getEntity().getAlertCancelContent()); + ps.setString(9, e.getEntity().getRawData()); + ps.setInt(10, Integer.parseInt(e.getEntity().getYearKey())); + ps.setInt(11, Integer.parseInt(e.getEntity().getMonthKey())); + ps.setInt(12, Integer.parseInt(e.getEntity().getDayKey())); + } + + @Override + public int getBatchSize() { + return list.size(); + } + } + ); + } + public void insertAlertHistory(DynamodbEntity entity) { String sql = "INSERT INTO alert_history (" + "device_id, receive_ts, retain_alert" + @@ -112,5 +161,4 @@ public class DashboardAlertDao { String updateHandleHistorySql = "UPDATE alert_handle_history SET status = 4, alert_status = 1 WHERE alert_history_id = ? and status != 3"; jdbcTemplate.update(updateHandleHistorySql, alertHistoryId); } - } diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEvent.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEvent.java new file mode 100644 index 0000000..0d7bce6 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEvent.java @@ -0,0 +1,14 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.techsor.datacenter.sender.entitiy.DynamodbEntity; +import lombok.Data; + +@Data +public class AlertEvent { + + private DynamodbEntity entity; + + public void clear() { + this.entity = null; + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventFactory.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventFactory.java new file mode 100644 index 0000000..cfabeef --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventFactory.java @@ -0,0 +1,10 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.EventFactory; + +public class AlertEventFactory implements EventFactory { + @Override + public AlertEvent newInstance() { + return new AlertEvent(); + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java new file mode 100644 index 0000000..f6954a4 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java @@ -0,0 +1,65 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.EventHandler; +import com.techsor.datacenter.sender.dao.DashboardAlertDao; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class AlertEventHandler implements EventHandler { + + private final DashboardAlertDao dao; + + private final int batchSize; + + private final List buffer; + + private long lastFlushTime = System.currentTimeMillis(); + + public AlertEventHandler(DashboardAlertDao dao, int batchSize) { + this.dao = dao; + this.batchSize = batchSize; + this.buffer = new ArrayList<>(batchSize); + } + + @Override + public void onEvent(AlertEvent e, long seq, boolean endOfBatch) { + buffer.add(copyOf(e)); + + // 自动 flush:达到批量条数或超过 1 秒未 flush + if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) { + flush(); + } + + e.clear(); + } + + private AlertEvent copyOf(AlertEvent e) { + AlertEvent a = new AlertEvent(); + a.setEntity(e.getEntity()); + return a; + } + + private void flush() { + if (buffer.isEmpty()) return; + + try { + dao.batchUpsertRawData(buffer); + } catch (Exception ex) { + log.error("alert batch DB failed", ex); + } + + buffer.clear(); + lastFlushTime = System.currentTimeMillis(); + } + + @Override + public void onStart() {} + + @Override + public void onShutdown() { + flush(); // 程序退出 flush + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java new file mode 100644 index 0000000..b5162b5 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java @@ -0,0 +1,24 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.RingBuffer; +import com.techsor.datacenter.sender.entitiy.DynamodbEntity; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class AlertService { + + private final RingBuffer ringBuffer; + + public void write(DynamodbEntity entity) { + + long seq = ringBuffer.next(); + try { + AlertEvent event = ringBuffer.get(seq); + event.setEntity(entity); + } finally { + ringBuffer.publish(seq); + } + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java index 42d3ce3..ebdaf73 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java +++ b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java @@ -22,6 +22,7 @@ import com.techsor.datacenter.sender.config.DataSourceContextHolder; import com.techsor.datacenter.sender.constants.Constants; import com.techsor.datacenter.sender.dao.*; import com.techsor.datacenter.sender.disruptor.AccumulateService; +import com.techsor.datacenter.sender.disruptor.AlertService; import com.techsor.datacenter.sender.disruptor.MeasureService; import com.techsor.datacenter.sender.dto.DeviceAlertInfo; import com.techsor.datacenter.sender.dto.DeviceInfoVO; @@ -175,6 +176,8 @@ public class DataProcessServiceImpl implements IDataProcessService { private MeasureService measureService; @Autowired private AccumulateService accumulateService; + @Autowired + private AlertService alertService; private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -816,7 +819,8 @@ public class DataProcessServiceImpl implements IDataProcessService { //更新实时信息 try { - dashboardAlertDao.upsertDeviceRawData(baseTransDataEntity); +// dashboardAlertDao.upsertDeviceRawData(baseTransDataEntity); + alertService.write(baseTransDataEntity); this.alramRedisTemplate.opsForHash().put( REDIS_DASHBOARD_DEVICE_STATUS_KEY, baseTransDataEntity.getDeviceId(), diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 702edd5..bf44626 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -97,8 +97,7 @@ category.accumulate.deviceTypeIds=48,112,122 # 状态类设备类型ID category.status.deviceTypeIds=86,113,123 -accumulate.batch-size=${accumulateBatchSize:100} -measure.batch-size=${measureBatchSize:100} +data.operation.batch-size=${dataOperationBatchSize:100} sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net} sys.mqtt.port=${sysMqttPort:1883} diff --git a/src/main/resources/application-prd.properties b/src/main/resources/application-prd.properties index 37db067..a73b511 100644 --- a/src/main/resources/application-prd.properties +++ b/src/main/resources/application-prd.properties @@ -91,8 +91,7 @@ category.accumulate.deviceTypeIds=48,112,122 # 状态类设备类型ID category.status.deviceTypeIds=86,113,123 -accumulate.batch-size=${accumulateBatchSize:100} -measure.batch-size=${measureBatchSize:100} +data.operation.batch-size=${dataOperationBatchSize:100} sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net} sys.mqtt.port=${sysMqttPort:1883}