From aa4d9a3019c27e7b6bffcfa440bef628d860b39e Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Mon, 13 Oct 2025 10:40:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E3=80=81=E6=9C=88=E3=80=81=E5=B9=B4?= =?UTF-8?q?=E7=9A=84=E8=81=9A=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 7 + .../handler/AccumulateIncrementHandler.java | 4 +- .../lambda/handler/MeasureAverageHandler.java | 3 +- .../aws/lambda/handler/MySQLToS3Handler.java | 4 +- .../aws/lambda/handler/S3ToMySQLHandler.java | 3 +- .../lambda/service/MeasureAverageService.java | 173 ++++++++++++++++++ .../dashboard/aws/lambda/util/DateUtil.java | 81 +++++++- 7 files changed, 267 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 4abeda9..69d67ce 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,13 @@ compile + + + cn.hutool + hutool-core + 5.8.40 + + org.slf4j diff --git a/src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java index be58334..2d347b4 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java @@ -23,8 +23,8 @@ public class AccumulateIncrementHandler implements RequestHandler event, Context context) { try { - LocalDateTime now = LocalDateTime.now(ZoneId.of("Asia/Tokyo")); -// LocalDateTime now = LocalDateTime.of(2025, 1, 1, 0, 0, 5); + ZoneId zone = ZoneId.of("Asia/Tokyo"); + LocalDateTime now = DateUtil.resolveEventTime(event, zone); List levels = DateUtil.getAggregationLevels(now); logger.info("Aggregation triggered at: {}, levels: {}", now, levels); diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java index 9daf5f0..737c5ac 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java @@ -4,6 +4,7 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.dashboard.aws.lambda.service.MeasureAverageService; import com.dashboard.aws.lambda.service.MySQLService; +import com.dashboard.aws.lambda.util.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +27,7 @@ public class MeasureAverageHandler implements RequestHandler logger.info("company id list: {}", companyIds); ZoneId zone = ZoneId.of("Asia/Tokyo"); - LocalDateTime now = LocalDateTime.now(zone).withMinute(0).withSecond(0).withNano(0); + LocalDateTime now = DateUtil.resolveEventTime(event, zone); LocalDateTime start = now.minusHours(1); for (Long companyId : companyIds) { diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java b/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java index be201a2..16d02e9 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java @@ -6,6 +6,7 @@ import com.dashboard.aws.lambda.Constants; import com.dashboard.aws.lambda.service.MySQLService; import com.dashboard.aws.lambda.service.S3Service; import com.dashboard.aws.lambda.util.CsvUtil; +import com.dashboard.aws.lambda.util.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +24,8 @@ public class MySQLToS3Handler implements RequestHandler, Str @Override public String handleRequest(Map event, Context context) { try { - LocalDate now = LocalDate.now(ZoneId.of("Asia/Tokyo")); + ZoneId zone = ZoneId.of("Asia/Tokyo"); + LocalDate now = DateUtil.resolveEventDate( event, zone); LocalDate startDate = now.minusDays(5); // 5天前 LocalDate endDate = now.minusDays(3); // 3天前 diff --git a/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java index 8199fb8..227caca 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java @@ -28,7 +28,8 @@ public class S3ToMySQLHandler implements RequestHandler, Str // LocalDate targetDate = LocalDate.now(ZoneId.of("Asia/Tokyo")).minusDays(3); // String dateStr = targetDate.format(java.time.format.DateTimeFormatter.ISO_DATE); // String dateStr = "2025-10-08"; - LocalDate now = LocalDate.now(ZoneId.of("Asia/Tokyo")); + ZoneId zone = ZoneId.of("Asia/Tokyo"); + LocalDate now = DateUtil.resolveEventDate( event, zone); String dateStr = DateUtil.getLastYearSameIsoWeekDayStr(now); // 查询符合条件的企业ID diff --git a/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java b/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java index 653ea70..2461629 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java @@ -108,11 +108,184 @@ public class MeasureAverageService { logger.info("Hourly aggregation finished for {} devices", resultMap.size()); + // 判断是否到达每日 / 每月 / 每年聚合时间点 + if (now.getHour() == 0) { + aggregateLastDay(companyId, now, now.toLocalDate().minusDays(1)); // 昨天的聚合 + } + if (now.getDayOfMonth() == 1 && now.getHour() == 0) { + aggregateLastMonth(companyId, now, now.toLocalDate().minusMonths(1)); // 上个月的聚合 + } + if (now.getMonthValue() == 1 && now.getDayOfMonth() == 1 && now.getHour() == 0) { + aggregateLastYear(companyId, now, now.toLocalDate().minusYears(1)); // 上一年的聚合 + } + } catch (SQLException e) { logger.error("Hourly aggregation failed", e); } } + public void aggregateLastDay(Long companyId, LocalDateTime now, LocalDate targetDate) { + String schema = "data_center_dongjian_" + companyId; + String sourceTable = "dashboard_aggregate_measure_hour"; + String targetTable = "dashboard_aggregate_measure_day"; + + logger.info("Aggregating daily data for company: {}, date {}", companyId, targetDate); + + String sql = String.format( + "SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 24.0) AS avg_val, " + + "MAX(max_value) AS max_val, MIN(min_value) AS min_val " + + "FROM %s.%s WHERE date_year = ? AND date_month = ? AND date_day = ? GROUP BY device_id", + schema, sourceTable + ); + logger.info("sql:{}, date_year:{}, date_month:{}, date_day:{}", sql, targetDate.getYear(), targetDate.getMonthValue(), targetDate.getDayOfMonth()); + + String insertSql = String.format( + "INSERT INTO %s.%s (device_id, average_value, max_value, min_value, date_year, date_month, date_day, aggregated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + schema, targetTable + ); + + try (Connection conn = getConnection(); + PreparedStatement psSelect = conn.prepareStatement(sql); + PreparedStatement psInsert = conn.prepareStatement(insertSql)) { + + psSelect.setInt(1, targetDate.getYear()); + psSelect.setInt(2, targetDate.getMonthValue()); + psSelect.setInt(3, targetDate.getDayOfMonth()); + ResultSet rs = psSelect.executeQuery(); + + while (rs.next()) { + String deviceId = rs.getString("device_id"); + double avg = rs.getDouble("avg_val"); + double max = rs.getDouble("max_val"); + double min = rs.getDouble("min_val"); + + psInsert.setString(1, deviceId); + psInsert.setDouble(2, avg); + psInsert.setDouble(3, max); + psInsert.setDouble(4, min); + psInsert.setInt(5, targetDate.getYear()); + psInsert.setInt(6, targetDate.getMonthValue()); + psInsert.setInt(7, targetDate.getDayOfMonth()); + psInsert.setTimestamp(8, Timestamp.valueOf(now)); + psInsert.addBatch(); + } + psInsert.executeBatch(); + logger.info("Daily aggregation completed for {}", targetDate); + + } catch (SQLException e) { + logger.error("Daily aggregation failed for {}", targetDate, e); + } + } + + public void aggregateLastMonth(Long companyId, LocalDateTime now, LocalDate targetMonth) { + String schema = "data_center_dongjian_" + companyId; + String sourceTable = "dashboard_aggregate_measure_day"; + String targetTable = "dashboard_aggregate_measure_month"; + + logger.info("Aggregating monthly data for company:{}, month: {}", companyId, targetMonth.getMonthValue()); + + // 天数作为分母 + int daysInMonth = YearMonth.of(targetMonth.getYear(), targetMonth.getMonthValue()).lengthOfMonth(); + + String sql = String.format( + "SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / %d.0) AS avg_val, " + + "MAX(max_value) AS max_val, MIN(min_value) AS min_val " + + "FROM %s.%s WHERE date_year = ? AND date_month = ? GROUP BY device_id", + daysInMonth, schema, sourceTable + ); + logger.info("sql:{}, date_year:{}, date_month:{}", sql, targetMonth.getYear(), targetMonth.getMonthValue()); + + String insertSql = String.format( + "INSERT INTO %s.%s (device_id, average_value, max_value, min_value, date_year, date_month, aggregated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?)", + schema, targetTable + ); + + try (Connection conn = getConnection(); + PreparedStatement psSelect = conn.prepareStatement(sql); + PreparedStatement psInsert = conn.prepareStatement(insertSql)) { + + psSelect.setInt(1, targetMonth.getYear()); + psSelect.setInt(2, targetMonth.getMonthValue()); + ResultSet rs = psSelect.executeQuery(); + + while (rs.next()) { + String deviceId = rs.getString("device_id"); + double avg = rs.getDouble("avg_val"); + double max = rs.getDouble("max_val"); + double min = rs.getDouble("min_val"); + + psInsert.setString(1, deviceId); + psInsert.setDouble(2, avg); + psInsert.setDouble(3, max); + psInsert.setDouble(4, min); + psInsert.setInt(5, targetMonth.getYear()); + psInsert.setInt(6, targetMonth.getMonthValue()); + psInsert.setTimestamp(7, Timestamp.valueOf(now)); + psInsert.addBatch(); + } + psInsert.executeBatch(); + + logger.info("Monthly aggregation completed for {}", targetMonth); + + } catch (SQLException e) { + logger.error("Monthly aggregation failed for {}", targetMonth, e); + } + } + + public void aggregateLastYear(Long companyId, LocalDateTime now, LocalDate targetYear) { + String schema = "data_center_dongjian_" + companyId; + String sourceTable = "dashboard_aggregate_measure_month"; + String targetTable = "dashboard_aggregate_measure_year"; + + logger.info("Aggregating yearly data for company:{}, year: {}", companyId, targetYear.getYear()); + + String sql = String.format( + "SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 12.0) AS avg_val," + + " MAX(max_value) AS max_val, MIN(min_value) AS min_val " + + "FROM %s.%s WHERE date_year = ? GROUP BY device_id", + schema, sourceTable + ); + logger.info("sql:{}, date_year:{}", sql, targetYear.getYear()); + + String insertSql = String.format( + "INSERT INTO %s.%s (device_id, average_value, max_value, min_value, date_year, aggregated_at) " + + "VALUES (?, ?, ?, ?, ?, ?)", + schema, targetTable + ); + + try (Connection conn = getConnection(); + PreparedStatement psSelect = conn.prepareStatement(sql); + PreparedStatement psInsert = conn.prepareStatement(insertSql)) { + + psSelect.setInt(1, targetYear.getYear()); + ResultSet rs = psSelect.executeQuery(); + + while (rs.next()) { + String deviceId = rs.getString("device_id"); + double avg = rs.getDouble("avg_val"); + double max = rs.getDouble("max_val"); + double min = rs.getDouble("min_val"); + + psInsert.setString(1, deviceId); + psInsert.setDouble(2, avg); + psInsert.setDouble(3, max); + psInsert.setDouble(4, min); + psInsert.setInt(5, targetYear.getYear()); + psInsert.setTimestamp(6, Timestamp.valueOf(now)); + psInsert.addBatch(); + } + psInsert.executeBatch(); + + logger.info("Yearly aggregation completed for {}", targetYear.getYear()); + + } catch (SQLException e) { + logger.error("Yearly aggregation failed for {}", targetYear.getYear(), e); + } + } + + // 查询当前小时数据 private Map> queryRecordData(Connection conn, String schema, String table, long startTs, long endTs) throws SQLException { String sql = String.format( diff --git a/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java b/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java index e4fd87e..6ccb5f0 100644 --- a/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java +++ b/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java @@ -1,17 +1,21 @@ package com.dashboard.aws.lambda.util; +import cn.hutool.core.map.MapUtil; import com.dashboard.aws.lambda.entity.AggregationLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.time.DayOfWeek; -import java.time.LocalDate; -import java.time.LocalDateTime; +import java.time.*; import java.time.format.DateTimeFormatter; import java.time.temporal.IsoFields; import java.util.ArrayList; import java.util.List; +import java.util.Map; public class DateUtil { + private static final Logger logger = LoggerFactory.getLogger(DateUtil.class); + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ISO_DATE; /** @@ -88,4 +92,75 @@ public class DateUtil { default -> now; }; } + + /** + * 从 event 参数中解析时间(带兜底逻辑) + * @param event Lambda 事件参数 + * @param zone 时区(默认 Asia/Tokyo) + * @return LocalDateTime 对象 + + { + "year": "2026", + "month": "1", + "day": "1", + "hour": "0", + "minute": "0", + "second": "5" + } + + */ + public static LocalDateTime resolveEventTime(Map event, ZoneId zone) { + LocalDateTime now = LocalDateTime.now(zone) + .withMinute(0) + .withSecond(0) + .withNano(0); + + if (MapUtil.isNotEmpty(event)) { + Integer year = MapUtil.getInt(event, "year"); + Integer month = MapUtil.getInt(event, "month"); + Integer day = MapUtil.getInt(event, "day"); + Integer hour = MapUtil.getInt(event, "hour"); + Integer minute = MapUtil.getInt(event, "minute"); + Integer second = MapUtil.getInt(event, "second"); + + if (year != null && month != null && day != null) { + now = LocalDateTime.of( + year, + month, + day, + hour != null ? hour : 0, + minute != null ? minute : 0, + second != null ? second : 0 + ); + logger.info("Using event time: {}", now); + } else { + logger.info("Using default Tokyo time (missing year/month/day): {}", now); + } + } else { + logger.info("Using default Tokyo time (no event provided): {}", now); + } + + return now; + } + + public static LocalDate resolveEventDate(Map event, ZoneId zone) { + LocalDate date = ZonedDateTime.now(zone).toLocalDate(); + + if (MapUtil.isNotEmpty(event)) { + Integer year = MapUtil.getInt(event, "year"); + Integer month = MapUtil.getInt(event, "month"); + Integer day = MapUtil.getInt(event, "day"); + + if (year != null && month != null && day != null) { + date = LocalDate.of(year, month, day); + logger.info("Using event date: {}", date); + } else { + logger.info("Using default Tokyo date (missing year/month/day): {}", date); + } + } else { + logger.info("Using default Tokyo date (no event provided): {}", date); + } + + return date; + } }