From fa09b4ef55ac15c9cb14bbd6fbeb11b7efdbe32e Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Mon, 13 Oct 2025 09:04:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=80=E5=A4=A7=E5=80=BC=E6=9C=80=E5=B0=8F?= =?UTF-8?q?=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lambda/service/MeasureAverageService.java | 93 +++++++++++++------ 1 file changed, 64 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java b/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java index d498c2e..653ea70 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java @@ -48,47 +48,58 @@ public class MeasureAverageService { // 3: 查询上一小时的聚合结果(用于回退) LocalDateTime prevHour = start.minusHours(1); - Map prevHourData = queryPrevHourAggregates(conn, schema, targetTable, prevHour); + Map prevHourData = queryPrevHourAggregates(conn, schema, targetTable, prevHour); - // 4: 计算当前小时平均值(如果有) - Map resultMap = new HashMap<>(); + // 4: 计算当前小时平均、最大、最小值 + Map resultMap = new HashMap<>(); for (Map.Entry> entry : currentHourData.entrySet()) { String deviceId = entry.getKey(); - double avg = computeTimeWeightedAverage(entry.getValue(), startTs, endTs); - resultMap.put(deviceId, avg); + List records = entry.getValue(); + + double avg = computeTimeWeightedAverage(records, startTs, endTs); + double max = computeMaxValue(records); + double min = computeMinValue(records); + + resultMap.put(deviceId, new AggregateResult(avg, max, min)); } // 5: 对比集合 A,确保所有设备都有值(否则用上一小时或 0) for (String deviceId : deviceSet) { if (!resultMap.containsKey(deviceId)) { - Double prevVal = prevHourData.get(deviceId); - double value = (prevVal != null) ? prevVal : 0.0; - resultMap.put(deviceId, value); + AggregateResult prev = prevHourData.get(deviceId); + if (prev != null) { + resultMap.put(deviceId, prev); + } else { + resultMap.put(deviceId, new AggregateResult(0.0, 0.0, 0.0)); + } } } // 6: 写入本小时聚合表 String insertSql = String.format( - "INSERT INTO %s.%s (device_id, average_value, date_year, date_month, date_day, date_hour, " + - "time_start, time_end, aggregated_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", schema, targetTable + "INSERT INTO %s.%s (device_id, average_value, max_value, min_value, " + + "date_year, date_month, date_day, date_hour, time_start, time_end, aggregated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + schema, targetTable ); try (PreparedStatement ps = conn.prepareStatement(insertSql)) { - for (Map.Entry entry : resultMap.entrySet()) { + for (Map.Entry entry : resultMap.entrySet()) { String deviceId = entry.getKey(); - double avg = entry.getValue(); + AggregateResult r = entry.getValue(); LocalDateTime dt = start; ps.setString(1, deviceId); - ps.setString(2, String.format("%.6f", avg)); - ps.setInt(3, dt.getYear()); - ps.setInt(4, dt.getMonthValue()); - ps.setInt(5, dt.getDayOfMonth()); - ps.setInt(6, dt.getHour()); - ps.setLong(7, startTs); - ps.setLong(8, endTs); - ps.setTimestamp(9, Timestamp.valueOf(now)); + ps.setString(2, String.format("%.6f", r.avg)); + ps.setString(3, String.format("%.6f", r.max)); + ps.setString(4, String.format("%.6f", r.min)); + ps.setInt(5, dt.getYear()); + ps.setInt(6, dt.getMonthValue()); + ps.setInt(7, dt.getDayOfMonth()); + ps.setInt(8, dt.getHour()); + ps.setLong(9, startTs); + ps.setLong(10, endTs); + ps.setTimestamp(11, Timestamp.valueOf(now)); ps.addBatch(); } @@ -126,10 +137,10 @@ public class MeasureAverageService { } // 查询上一小时聚合结果 - private Map queryPrevHourAggregates(Connection conn, String schema, String table, LocalDateTime prevHour) throws SQLException { - Map map = new HashMap<>(); + private Map queryPrevHourAggregates(Connection conn, String schema, String table, LocalDateTime prevHour) throws SQLException { + Map map = new HashMap<>(); String sql = String.format( - "SELECT device_id, average_value FROM %s.%s " + + "SELECT device_id, average_value, max_value, min_value FROM %s.%s " + "WHERE date_year = ? AND date_month = ? AND date_day = ? AND date_hour = ?", schema, table ); @@ -140,12 +151,17 @@ public class MeasureAverageService { ps.setInt(4, prevHour.getHour()); ResultSet rs = ps.executeQuery(); while (rs.next()) { - map.put(rs.getString("device_id"), rs.getDouble("average_value")); + map.put(rs.getString("device_id"), + new AggregateResult( + rs.getDouble("average_value"), + rs.getDouble("max_value"), + rs.getDouble("min_value"))); } } return map; } + // 时间加权平均 private double computeTimeWeightedAverage(List records, long startTs, long endTs) { if (records == null || records.isEmpty()) return 0.0; @@ -156,15 +172,22 @@ public class MeasureAverageService { Record rec = records.get(i); long nextTs = (i < records.size() - 1) ? records.get(i + 1).timestamp : endTs; double intervalMinutes = (nextTs - rec.timestamp) / 60000.0; // 转分钟 - logger.info("intervalMinutes: {}, rec.value: {}", intervalMinutes, rec.value); sum += rec.value * intervalMinutes; } - logger.info("totalMinutes: {}, sum: {}", totalMinutes, sum); - return totalMinutes > 0 ? sum / totalMinutes : 0.0; } + // 最大值 + private double computeMaxValue(List records) { + return records.stream().mapToDouble(r -> r.value).max().orElse(0.0); + } + + // 最小值 + private double computeMinValue(List records) { + return records.stream().mapToDouble(r -> r.value).min().orElse(0.0); + } + private static class Record { long timestamp; double value; @@ -174,4 +197,16 @@ public class MeasureAverageService { this.value = value; } } -} \ No newline at end of file + + private static class AggregateResult { + double avg; + double max; + double min; + + AggregateResult(double avg, double max, double min) { + this.avg = avg; + this.max = max; + this.min = min; + } + } +}