|
|
|
@ -74,58 +74,73 @@ public class DashboardAlertDao { |
|
|
|
.filter(e -> e.getEntity() != null && e.getEntity().getCompanyId() != null) |
|
|
|
.collect(Collectors.groupingBy(e -> e.getEntity().getCompanyId())); |
|
|
|
|
|
|
|
grouped.forEach((companyId, subList) -> { |
|
|
|
grouped.forEach((companyId, companyList) -> { |
|
|
|
long topCompanyId = companyInfoDao.getTopCompanyId(String.valueOf(companyId)); |
|
|
|
String dsKey = "dataSourceForCompany_" + topCompanyId; |
|
|
|
try { |
|
|
|
DataSourceContextHolder.setCurrentDataSourceKey(dsKey); |
|
|
|
|
|
|
|
// 批量执行 upsert
|
|
|
|
StringBuilder sql = new StringBuilder( |
|
|
|
"INSERT INTO device_rawdata_realtime (" + |
|
|
|
"device_id, building_id, status, dashboard_status, receive_ts, " + |
|
|
|
"alert_title, alert_content, alert_cancel_title, alert_cancel_content, " + |
|
|
|
"raw_data, upload_year, upload_month, upload_day) VALUES " |
|
|
|
); |
|
|
|
List<Object> params = new ArrayList<>(); |
|
|
|
for (int i = 0; i < subList.size(); i++) { |
|
|
|
DynamodbEntity entity = subList.get(i).getEntity(); |
|
|
|
|
|
|
|
sql.append("(?,?,?,?,?,?,?,?,?,?,?,?,?)"); |
|
|
|
if (i < subList.size() - 1) { |
|
|
|
sql.append(","); |
|
|
|
// 再按 retainAlert 分组(关键)
|
|
|
|
Map<Integer, List<AlertEvent>> retainGroup = |
|
|
|
companyList.stream() |
|
|
|
.collect(Collectors.groupingBy( |
|
|
|
e -> e.getEntity().getRetainAlert() |
|
|
|
)); |
|
|
|
|
|
|
|
// 每个 retainAlert 执行一次 batch upsert
|
|
|
|
retainGroup.forEach((retainAlert, subList) -> { |
|
|
|
if (subList.isEmpty()) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
params.add(entity.getDeviceId()); |
|
|
|
params.add(entity.getDbBuildingId()); |
|
|
|
params.add(entity.getStatus()); |
|
|
|
params.add(entity.getStatus()); |
|
|
|
params.add(entity.getReceive_ts()); |
|
|
|
params.add(entity.getAlertTitle()); |
|
|
|
params.add(entity.getAlertContent()); |
|
|
|
params.add(entity.getAlertCancelTitle()); |
|
|
|
params.add(entity.getAlertCancelContent()); |
|
|
|
params.add(entity.getRawData()); |
|
|
|
params.add(Integer.parseInt(entity.getYearKey())); |
|
|
|
params.add(Integer.parseInt(entity.getMonthKey())); |
|
|
|
params.add(Integer.parseInt(entity.getDayKey())); |
|
|
|
//给下面dashboard_status = IF(? = 1 里的?用
|
|
|
|
params.add(entity.getRetainAlert()); |
|
|
|
} |
|
|
|
sql.append(" ON DUPLICATE KEY UPDATE ") |
|
|
|
.append("building_id = VALUES(building_id), ") |
|
|
|
.append("status = VALUES(status), ") |
|
|
|
.append("receive_ts = VALUES(receive_ts), ") |
|
|
|
.append("alert_title = VALUES(alert_title), ") |
|
|
|
.append("alert_content = VALUES(alert_content), ") |
|
|
|
.append("alert_cancel_title = VALUES(alert_cancel_title), ") |
|
|
|
.append("alert_cancel_content = VALUES(alert_cancel_content), ") |
|
|
|
.append("raw_data = VALUES(raw_data), ") |
|
|
|
.append("upload_year = VALUES(upload_year), ") |
|
|
|
.append("upload_month = VALUES(upload_month), ") |
|
|
|
.append("upload_day = VALUES(upload_day), ") |
|
|
|
.append("dashboard_status = IF(? = 1 AND VALUES(status) = 'alert_cancel' AND dashboard_status = 'alert', dashboard_status, VALUES(status))"); |
|
|
|
jdbcTemplate.update(sql.toString(), params.toArray()); |
|
|
|
// 批量执行 upsert
|
|
|
|
StringBuilder sql = new StringBuilder( |
|
|
|
"INSERT INTO device_rawdata_realtime (" + |
|
|
|
"device_id, building_id, status, dashboard_status, receive_ts, " + |
|
|
|
"alert_title, alert_content, alert_cancel_title, alert_cancel_content, " + |
|
|
|
"raw_data, upload_year, upload_month, upload_day) VALUES " |
|
|
|
); |
|
|
|
List<Object> params = new ArrayList<>(); |
|
|
|
for (int i = 0; i < subList.size(); i++) { |
|
|
|
DynamodbEntity entity = subList.get(i).getEntity(); |
|
|
|
|
|
|
|
sql.append("(?,?,?,?,?,?,?,?,?,?,?,?,?)"); |
|
|
|
if (i < subList.size() - 1) { |
|
|
|
sql.append(","); |
|
|
|
} |
|
|
|
|
|
|
|
params.add(entity.getDeviceId()); |
|
|
|
params.add(entity.getDbBuildingId()); |
|
|
|
params.add(entity.getStatus()); |
|
|
|
params.add(entity.getStatus()); |
|
|
|
params.add(entity.getReceive_ts()); |
|
|
|
params.add(entity.getAlertTitle()); |
|
|
|
params.add(entity.getAlertContent()); |
|
|
|
params.add(entity.getAlertCancelTitle()); |
|
|
|
params.add(entity.getAlertCancelContent()); |
|
|
|
params.add(entity.getRawData()); |
|
|
|
params.add(Integer.parseInt(entity.getYearKey())); |
|
|
|
params.add(Integer.parseInt(entity.getMonthKey())); |
|
|
|
params.add(Integer.parseInt(entity.getDayKey())); |
|
|
|
} |
|
|
|
// ON DUPLICATE KEY UPDATE(retainAlert 只绑定一次)
|
|
|
|
sql.append(" ON DUPLICATE KEY UPDATE ") |
|
|
|
.append("building_id = VALUES(building_id), ") |
|
|
|
.append("status = VALUES(status), ") |
|
|
|
.append("receive_ts = VALUES(receive_ts), ") |
|
|
|
.append("alert_title = VALUES(alert_title), ") |
|
|
|
.append("alert_content = VALUES(alert_content), ") |
|
|
|
.append("alert_cancel_title = VALUES(alert_cancel_title), ") |
|
|
|
.append("alert_cancel_content = VALUES(alert_cancel_content), ") |
|
|
|
.append("raw_data = VALUES(raw_data), ") |
|
|
|
.append("upload_year = VALUES(upload_year), ") |
|
|
|
.append("upload_month = VALUES(upload_month), ") |
|
|
|
.append("upload_day = VALUES(upload_day), ") |
|
|
|
.append("dashboard_status = IF(? = 1 AND VALUES(status) = 'alert_cancel' AND dashboard_status = 'alert', dashboard_status, VALUES(status))"); |
|
|
|
//retainAlert 分组了,只加一次
|
|
|
|
params.add(retainAlert); |
|
|
|
jdbcTemplate.update(sql.toString(), params.toArray()); |
|
|
|
}); |
|
|
|
|
|
|
|
} finally { |
|
|
|
DataSourceContextHolder.clearCurrentDataSourceKey(); |
|
|
|
|