diff --git a/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java b/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java index f385adb..32cd108 100644 --- a/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java +++ b/src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java @@ -74,58 +74,73 @@ public class DashboardAlertDao { .filter(e -> e.getEntity() != null && e.getEntity().getCompanyId() != null) .collect(Collectors.groupingBy(e -> e.getEntity().getCompanyId())); - grouped.forEach((companyId, subList) -> { + grouped.forEach((companyId, companyList) -> { long topCompanyId = companyInfoDao.getTopCompanyId(String.valueOf(companyId)); String dsKey = "dataSourceForCompany_" + topCompanyId; try { DataSourceContextHolder.setCurrentDataSourceKey(dsKey); - // 批量执行 upsert - StringBuilder sql = new StringBuilder( - "INSERT INTO device_rawdata_realtime (" + - "device_id, building_id, status, dashboard_status, receive_ts, " + - "alert_title, alert_content, alert_cancel_title, alert_cancel_content, " + - "raw_data, upload_year, upload_month, upload_day) VALUES " - ); - List params = new ArrayList<>(); - for (int i = 0; i < subList.size(); i++) { - DynamodbEntity entity = subList.get(i).getEntity(); - - sql.append("(?,?,?,?,?,?,?,?,?,?,?,?,?)"); - if (i < subList.size() - 1) { - sql.append(","); + // 再按 retainAlert 分组(关键) + Map> retainGroup = + companyList.stream() + .collect(Collectors.groupingBy( + e -> e.getEntity().getRetainAlert() + )); + + // 每个 retainAlert 执行一次 batch upsert + retainGroup.forEach((retainAlert, subList) -> { + if (subList.isEmpty()) { + return; } - params.add(entity.getDeviceId()); - params.add(entity.getDbBuildingId()); - params.add(entity.getStatus()); - params.add(entity.getStatus()); - params.add(entity.getReceive_ts()); - params.add(entity.getAlertTitle()); - params.add(entity.getAlertContent()); - params.add(entity.getAlertCancelTitle()); - params.add(entity.getAlertCancelContent()); - params.add(entity.getRawData()); - params.add(Integer.parseInt(entity.getYearKey())); - params.add(Integer.parseInt(entity.getMonthKey())); - params.add(Integer.parseInt(entity.getDayKey())); - //给下面dashboard_status = IF(? = 1 里的?用 - params.add(entity.getRetainAlert()); - } - sql.append(" ON DUPLICATE KEY UPDATE ") - .append("building_id = VALUES(building_id), ") - .append("status = VALUES(status), ") - .append("receive_ts = VALUES(receive_ts), ") - .append("alert_title = VALUES(alert_title), ") - .append("alert_content = VALUES(alert_content), ") - .append("alert_cancel_title = VALUES(alert_cancel_title), ") - .append("alert_cancel_content = VALUES(alert_cancel_content), ") - .append("raw_data = VALUES(raw_data), ") - .append("upload_year = VALUES(upload_year), ") - .append("upload_month = VALUES(upload_month), ") - .append("upload_day = VALUES(upload_day), ") - .append("dashboard_status = IF(? = 1 AND VALUES(status) = 'alert_cancel' AND dashboard_status = 'alert', dashboard_status, VALUES(status))"); - jdbcTemplate.update(sql.toString(), params.toArray()); + // 批量执行 upsert + StringBuilder sql = new StringBuilder( + "INSERT INTO device_rawdata_realtime (" + + "device_id, building_id, status, dashboard_status, receive_ts, " + + "alert_title, alert_content, alert_cancel_title, alert_cancel_content, " + + "raw_data, upload_year, upload_month, upload_day) VALUES " + ); + List params = new ArrayList<>(); + for (int i = 0; i < subList.size(); i++) { + DynamodbEntity entity = subList.get(i).getEntity(); + + sql.append("(?,?,?,?,?,?,?,?,?,?,?,?,?)"); + if (i < subList.size() - 1) { + sql.append(","); + } + + params.add(entity.getDeviceId()); + params.add(entity.getDbBuildingId()); + params.add(entity.getStatus()); + params.add(entity.getStatus()); + params.add(entity.getReceive_ts()); + params.add(entity.getAlertTitle()); + params.add(entity.getAlertContent()); + params.add(entity.getAlertCancelTitle()); + params.add(entity.getAlertCancelContent()); + params.add(entity.getRawData()); + params.add(Integer.parseInt(entity.getYearKey())); + params.add(Integer.parseInt(entity.getMonthKey())); + params.add(Integer.parseInt(entity.getDayKey())); + } + // ON DUPLICATE KEY UPDATE(retainAlert 只绑定一次) + sql.append(" ON DUPLICATE KEY UPDATE ") + .append("building_id = VALUES(building_id), ") + .append("status = VALUES(status), ") + .append("receive_ts = VALUES(receive_ts), ") + .append("alert_title = VALUES(alert_title), ") + .append("alert_content = VALUES(alert_content), ") + .append("alert_cancel_title = VALUES(alert_cancel_title), ") + .append("alert_cancel_content = VALUES(alert_cancel_content), ") + .append("raw_data = VALUES(raw_data), ") + .append("upload_year = VALUES(upload_year), ") + .append("upload_month = VALUES(upload_month), ") + .append("upload_day = VALUES(upload_day), ") + .append("dashboard_status = IF(? = 1 AND VALUES(status) = 'alert_cancel' AND dashboard_status = 'alert', dashboard_status, VALUES(status))"); + //retainAlert 分组了,只加一次 + params.add(retainAlert); + jdbcTemplate.update(sql.toString(), params.toArray()); + }); } finally { DataSourceContextHolder.clearCurrentDataSourceKey();