Browse Source

批量更新设备实时信息

jwy_category
review512jwy@163.com 1 month ago
parent
commit
cd5c143f6e
  1. 37
      src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java
  2. 50
      src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java
  3. 14
      src/main/java/com/techsor/datacenter/sender/disruptor/AlertEvent.java
  4. 10
      src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventFactory.java
  5. 65
      src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java
  6. 24
      src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java
  7. 6
      src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java
  8. 3
      src/main/resources/application-dev.properties
  9. 3
      src/main/resources/application-prd.properties

37
src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java

@ -4,6 +4,7 @@ import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import com.techsor.datacenter.sender.dao.DashboardAlertDao;
import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; import com.techsor.datacenter.sender.dao.DashboardStatisticsDao;
import com.techsor.datacenter.sender.disruptor.*; import com.techsor.datacenter.sender.disruptor.*;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -15,11 +16,8 @@ import java.util.concurrent.Executors;
@Configuration @Configuration
public class DisruptorConfig { public class DisruptorConfig {
@Value("${accumulate.batch-size:100}") @Value("${data.operation.batch-size:100}")
private int accumulateBatchSize; private int batchSize;
@Value("${measure.batch-size:100}")
private int measureBatchSize;
@Bean @Bean
public Disruptor<MeasureEvent> measureDisruptor(DashboardStatisticsDao dao) { public Disruptor<MeasureEvent> measureDisruptor(DashboardStatisticsDao dao) {
@ -34,7 +32,7 @@ public class DisruptorConfig {
new BlockingWaitStrategy() new BlockingWaitStrategy()
); );
disruptor.handleEventsWith(new MeasureEventHandler(dao, measureBatchSize)); disruptor.handleEventsWith(new MeasureEventHandler(dao, batchSize));
disruptor.start(); disruptor.start();
return disruptor; return disruptor;
@ -58,7 +56,7 @@ public class DisruptorConfig {
new BlockingWaitStrategy() new BlockingWaitStrategy()
); );
disruptor.handleEventsWith(new AccumulateEventHandler(dao, accumulateBatchSize)); disruptor.handleEventsWith(new AccumulateEventHandler(dao, batchSize));
disruptor.start(); disruptor.start();
return disruptor; return disruptor;
@ -68,4 +66,29 @@ public class DisruptorConfig {
public RingBuffer<AccumulateEvent> accumulateRingBuffer(Disruptor<AccumulateEvent> disruptor) { public RingBuffer<AccumulateEvent> accumulateRingBuffer(Disruptor<AccumulateEvent> disruptor) {
return disruptor.getRingBuffer(); return disruptor.getRingBuffer();
} }
@Bean
public Disruptor<AlertEvent> alertDisruptor(DashboardAlertDao dao) {
int ringSize = 1024;
Disruptor<AlertEvent> disruptor = new Disruptor<>(
new AlertEventFactory(),
ringSize,
Executors.defaultThreadFactory(),
ProducerType.MULTI,
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(new AlertEventHandler(dao, batchSize));
disruptor.start();
return disruptor;
}
@Bean
public RingBuffer<AlertEvent> alertRingBuffer(Disruptor<AlertEvent> disruptor) {
return disruptor.getRingBuffer();
}
} }

50
src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java

@ -1,15 +1,18 @@
package com.techsor.datacenter.sender.dao; package com.techsor.datacenter.sender.dao;
import com.techsor.datacenter.sender.disruptor.AlertEvent;
import com.techsor.datacenter.sender.entitiy.AlertHistoryDTO; import com.techsor.datacenter.sender.entitiy.AlertHistoryDTO;
import com.techsor.datacenter.sender.entitiy.DynamodbEntity; import com.techsor.datacenter.sender.entitiy.DynamodbEntity;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -59,6 +62,52 @@ public class DashboardAlertDao {
} }
} }
public void batchUpsertRawData(List<AlertEvent> list) {
jdbcTemplate.batchUpdate(
"INSERT INTO device_rawdata_realtime (" +
"device_id, building_id, status, receive_ts, alert_title, alert_content, " +
"alert_cancel_title, alert_cancel_content, raw_data, upload_year, upload_month, upload_day) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " +
"building_id = VALUES(building_id), " +
"status = VALUES(status), " +
"receive_ts = VALUES(receive_ts), " +
"alert_title = VALUES(alert_title), " +
"alert_content = VALUES(alert_content), " +
"alert_cancel_title = VALUES(alert_cancel_title), " +
"alert_cancel_content = VALUES(alert_cancel_content), " +
"raw_data = VALUES(raw_data), " +
"upload_year = VALUES(upload_year), " +
"upload_month = VALUES(upload_month), " +
"upload_day = VALUES(upload_day)",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
AlertEvent e = list.get(i);
ps.setString(1, e.getEntity().getDeviceId());
ps.setLong(2, e.getEntity().getDbBuildingId());
ps.setString(3, e.getEntity().getStatus());
ps.setLong(4, e.getEntity().getReceive_ts());
ps.setString(5, e.getEntity().getAlertTitle());
ps.setString(6, e.getEntity().getAlertContent());
ps.setString(7, e.getEntity().getAlertCancelTitle());
ps.setString(8, e.getEntity().getAlertCancelContent());
ps.setString(9, e.getEntity().getRawData());
ps.setInt(10, Integer.parseInt(e.getEntity().getYearKey()));
ps.setInt(11, Integer.parseInt(e.getEntity().getMonthKey()));
ps.setInt(12, Integer.parseInt(e.getEntity().getDayKey()));
}
@Override
public int getBatchSize() {
return list.size();
}
}
);
}
public void insertAlertHistory(DynamodbEntity entity) { public void insertAlertHistory(DynamodbEntity entity) {
String sql = "INSERT INTO alert_history (" + String sql = "INSERT INTO alert_history (" +
"device_id, receive_ts, retain_alert" + "device_id, receive_ts, retain_alert" +
@ -112,5 +161,4 @@ public class DashboardAlertDao {
String updateHandleHistorySql = "UPDATE alert_handle_history SET status = 4, alert_status = 1 WHERE alert_history_id = ? and status != 3"; String updateHandleHistorySql = "UPDATE alert_handle_history SET status = 4, alert_status = 1 WHERE alert_history_id = ? and status != 3";
jdbcTemplate.update(updateHandleHistorySql, alertHistoryId); jdbcTemplate.update(updateHandleHistorySql, alertHistoryId);
} }
} }

14
src/main/java/com/techsor/datacenter/sender/disruptor/AlertEvent.java

@ -0,0 +1,14 @@
package com.techsor.datacenter.sender.disruptor;
import com.techsor.datacenter.sender.entitiy.DynamodbEntity;
import lombok.Data;
@Data
public class AlertEvent {
private DynamodbEntity entity;
public void clear() {
this.entity = null;
}
}

10
src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventFactory.java

@ -0,0 +1,10 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventFactory;
public class AlertEventFactory implements EventFactory<AlertEvent> {
@Override
public AlertEvent newInstance() {
return new AlertEvent();
}
}

65
src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java

@ -0,0 +1,65 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventHandler;
import com.techsor.datacenter.sender.dao.DashboardAlertDao;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class AlertEventHandler implements EventHandler<AlertEvent> {
private final DashboardAlertDao dao;
private final int batchSize;
private final List<AlertEvent> buffer;
private long lastFlushTime = System.currentTimeMillis();
public AlertEventHandler(DashboardAlertDao dao, int batchSize) {
this.dao = dao;
this.batchSize = batchSize;
this.buffer = new ArrayList<>(batchSize);
}
@Override
public void onEvent(AlertEvent e, long seq, boolean endOfBatch) {
buffer.add(copyOf(e));
// 自动 flush:达到批量条数或超过 1 秒未 flush
if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) {
flush();
}
e.clear();
}
private AlertEvent copyOf(AlertEvent e) {
AlertEvent a = new AlertEvent();
a.setEntity(e.getEntity());
return a;
}
private void flush() {
if (buffer.isEmpty()) return;
try {
dao.batchUpsertRawData(buffer);
} catch (Exception ex) {
log.error("alert batch DB failed", ex);
}
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
@Override
public void onStart() {}
@Override
public void onShutdown() {
flush(); // 程序退出 flush
}
}

24
src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java

@ -0,0 +1,24 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.DynamodbEntity;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class AlertService {
private final RingBuffer<AlertEvent> ringBuffer;
public void write(DynamodbEntity entity) {
long seq = ringBuffer.next();
try {
AlertEvent event = ringBuffer.get(seq);
event.setEntity(entity);
} finally {
ringBuffer.publish(seq);
}
}
}

6
src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

@ -22,6 +22,7 @@ import com.techsor.datacenter.sender.config.DataSourceContextHolder;
import com.techsor.datacenter.sender.constants.Constants; import com.techsor.datacenter.sender.constants.Constants;
import com.techsor.datacenter.sender.dao.*; import com.techsor.datacenter.sender.dao.*;
import com.techsor.datacenter.sender.disruptor.AccumulateService; import com.techsor.datacenter.sender.disruptor.AccumulateService;
import com.techsor.datacenter.sender.disruptor.AlertService;
import com.techsor.datacenter.sender.disruptor.MeasureService; import com.techsor.datacenter.sender.disruptor.MeasureService;
import com.techsor.datacenter.sender.dto.DeviceAlertInfo; import com.techsor.datacenter.sender.dto.DeviceAlertInfo;
import com.techsor.datacenter.sender.dto.DeviceInfoVO; import com.techsor.datacenter.sender.dto.DeviceInfoVO;
@ -175,6 +176,8 @@ public class DataProcessServiceImpl implements IDataProcessService {
private MeasureService measureService; private MeasureService measureService;
@Autowired @Autowired
private AccumulateService accumulateService; private AccumulateService accumulateService;
@Autowired
private AlertService alertService;
private static final ObjectMapper objectMapper = new ObjectMapper(); private static final ObjectMapper objectMapper = new ObjectMapper();
@ -816,7 +819,8 @@ public class DataProcessServiceImpl implements IDataProcessService {
//更新实时信息 //更新实时信息
try { try {
dashboardAlertDao.upsertDeviceRawData(baseTransDataEntity); // dashboardAlertDao.upsertDeviceRawData(baseTransDataEntity);
alertService.write(baseTransDataEntity);
this.alramRedisTemplate.opsForHash().put( this.alramRedisTemplate.opsForHash().put(
REDIS_DASHBOARD_DEVICE_STATUS_KEY, REDIS_DASHBOARD_DEVICE_STATUS_KEY,
baseTransDataEntity.getDeviceId(), baseTransDataEntity.getDeviceId(),

3
src/main/resources/application-dev.properties

@ -97,8 +97,7 @@ category.accumulate.deviceTypeIds=48,112,122
# 状态类设备类型ID # 状态类设备类型ID
category.status.deviceTypeIds=86,113,123 category.status.deviceTypeIds=86,113,123
accumulate.batch-size=${accumulateBatchSize:100} data.operation.batch-size=${dataOperationBatchSize:100}
measure.batch-size=${measureBatchSize:100}
sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net} sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net}
sys.mqtt.port=${sysMqttPort:1883} sys.mqtt.port=${sysMqttPort:1883}

3
src/main/resources/application-prd.properties

@ -91,8 +91,7 @@ category.accumulate.deviceTypeIds=48,112,122
# 状态类设备类型ID # 状态类设备类型ID
category.status.deviceTypeIds=86,113,123 category.status.deviceTypeIds=86,113,123
accumulate.batch-size=${accumulateBatchSize:100} data.operation.batch-size=${dataOperationBatchSize:100}
measure.batch-size=${measureBatchSize:100}
sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net} sys.mqtt.endpoint=${sysMqttEndpoint:mqtt-stg.kr-sensor.net}
sys.mqtt.port=${sysMqttPort:1883} sys.mqtt.port=${sysMqttPort:1883}

Loading…
Cancel
Save