|
|
@ -1,5 +1,6 @@ |
|
|
package com.techsor.datacenter.sender.dao; |
|
|
package com.techsor.datacenter.sender.dao; |
|
|
|
|
|
|
|
|
|
|
|
import com.techsor.datacenter.sender.config.DataSourceContextHolder; |
|
|
import com.techsor.datacenter.sender.disruptor.AlertEvent; |
|
|
import com.techsor.datacenter.sender.disruptor.AlertEvent; |
|
|
import com.techsor.datacenter.sender.entitiy.AlertHistoryDTO; |
|
|
import com.techsor.datacenter.sender.entitiy.AlertHistoryDTO; |
|
|
import com.techsor.datacenter.sender.entitiy.DynamodbEntity; |
|
|
import com.techsor.datacenter.sender.entitiy.DynamodbEntity; |
|
|
@ -9,6 +10,8 @@ import java.sql.PreparedStatement; |
|
|
import java.sql.ResultSet; |
|
|
import java.sql.ResultSet; |
|
|
import java.sql.SQLException; |
|
|
import java.sql.SQLException; |
|
|
import java.util.List; |
|
|
import java.util.List; |
|
|
|
|
|
import java.util.Map; |
|
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
import org.apache.commons.lang.StringUtils; |
|
|
import org.apache.commons.lang.StringUtils; |
|
|
@ -63,10 +66,21 @@ public class DashboardAlertDao { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public void batchUpsertRawData(List<AlertEvent> list) { |
|
|
public void batchUpsertRawData(List<AlertEvent> list) { |
|
|
|
|
|
// 按 companyId 分组
|
|
|
|
|
|
Map<Long, List<AlertEvent>> grouped = list.stream() |
|
|
|
|
|
.filter(e -> e.getEntity() != null && e.getEntity().getCompanyId() != null) |
|
|
|
|
|
.collect(Collectors.groupingBy(e -> e.getEntity().getCompanyId())); |
|
|
|
|
|
|
|
|
|
|
|
grouped.forEach((companyId, subList) -> { |
|
|
|
|
|
String dsKey = "dataSourceForCompany_" + companyId; |
|
|
|
|
|
try { |
|
|
|
|
|
DataSourceContextHolder.setCurrentDataSourceKey(dsKey); |
|
|
|
|
|
|
|
|
|
|
|
// 批量执行 upsert
|
|
|
jdbcTemplate.batchUpdate( |
|
|
jdbcTemplate.batchUpdate( |
|
|
"INSERT INTO device_rawdata_realtime (" + |
|
|
"INSERT INTO device_rawdata_realtime (" + |
|
|
"device_id, building_id, status, receive_ts, alert_title, alert_content, " + |
|
|
"device_id, building_id, status, receive_ts, alert_title, alert_content, alert_cancel_title," + |
|
|
"alert_cancel_title, alert_cancel_content, raw_data, upload_year, upload_month, upload_day) " + |
|
|
"alert_cancel_content, raw_data, upload_year, upload_month, upload_day) " + |
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + |
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + |
|
|
"ON DUPLICATE KEY UPDATE " + |
|
|
"ON DUPLICATE KEY UPDATE " + |
|
|
"building_id = VALUES(building_id), " + |
|
|
"building_id = VALUES(building_id), " + |
|
|
@ -82,30 +96,31 @@ public class DashboardAlertDao { |
|
|
"upload_day = VALUES(upload_day)", |
|
|
"upload_day = VALUES(upload_day)", |
|
|
new BatchPreparedStatementSetter() { |
|
|
new BatchPreparedStatementSetter() { |
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void setValues(PreparedStatement ps, int i) throws SQLException { |
|
|
public void setValues(PreparedStatement ps, int i) throws SQLException { |
|
|
AlertEvent e = list.get(i); |
|
|
DynamodbEntity entity = subList.get(i).getEntity(); |
|
|
|
|
|
ps.setString(1, entity.getDeviceId()); |
|
|
ps.setString(1, e.getEntity().getDeviceId()); |
|
|
ps.setLong(2, entity.getDbBuildingId()); |
|
|
ps.setLong(2, e.getEntity().getDbBuildingId()); |
|
|
ps.setString(3, entity.getStatus()); |
|
|
ps.setString(3, e.getEntity().getStatus()); |
|
|
ps.setLong(4, entity.getReceive_ts()); |
|
|
ps.setLong(4, e.getEntity().getReceive_ts()); |
|
|
ps.setString(5, entity.getAlertTitle()); |
|
|
ps.setString(5, e.getEntity().getAlertTitle()); |
|
|
ps.setString(6, entity.getAlertContent()); |
|
|
ps.setString(6, e.getEntity().getAlertContent()); |
|
|
ps.setString(7, entity.getAlertCancelTitle()); |
|
|
ps.setString(7, e.getEntity().getAlertCancelTitle()); |
|
|
ps.setString(8, entity.getAlertCancelContent()); |
|
|
ps.setString(8, e.getEntity().getAlertCancelContent()); |
|
|
ps.setString(9, entity.getRawData()); |
|
|
ps.setString(9, e.getEntity().getRawData()); |
|
|
ps.setInt(10, Integer.parseInt(entity.getYearKey())); |
|
|
ps.setInt(10, Integer.parseInt(e.getEntity().getYearKey())); |
|
|
ps.setInt(11, Integer.parseInt(entity.getMonthKey())); |
|
|
ps.setInt(11, Integer.parseInt(e.getEntity().getMonthKey())); |
|
|
ps.setInt(12, Integer.parseInt(entity.getDayKey())); |
|
|
ps.setInt(12, Integer.parseInt(e.getEntity().getDayKey())); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public int getBatchSize() { |
|
|
public int getBatchSize() { |
|
|
return list.size(); |
|
|
return subList.size(); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
); |
|
|
); |
|
|
|
|
|
} finally { |
|
|
|
|
|
DataSourceContextHolder.clearCurrentDataSourceKey(); |
|
|
|
|
|
} |
|
|
|
|
|
}); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public void insertAlertHistory(DynamodbEntity entity) { |
|
|
public void insertAlertHistory(DynamodbEntity entity) { |
|
|
|