diff --git a/src/main/java/com/dashboard/aws/lambda/entity/AggregationLevel.java b/src/main/java/com/dashboard/aws/lambda/entity/AggregationLevel.java new file mode 100644 index 0000000..53acb46 --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/entity/AggregationLevel.java @@ -0,0 +1,10 @@ +package com.dashboard.aws.lambda.entity; + +public enum AggregationLevel { + MIN_30, + HOUR, + DAY, + MONTH, + YEAR +} + diff --git a/src/main/java/com/dashboard/aws/lambda/handler/AggregationHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/AggregationHandler.java new file mode 100644 index 0000000..fc7b421 --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/handler/AggregationHandler.java @@ -0,0 +1,50 @@ +package com.dashboard.aws.lambda.handler; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.dashboard.aws.lambda.entity.AggregationLevel; +import com.dashboard.aws.lambda.service.AggregationService; +import com.dashboard.aws.lambda.service.MySQLService; +import com.dashboard.aws.lambda.util.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Map; + +public class AggregationHandler implements RequestHandler, String> { + + private static final Logger logger = LoggerFactory.getLogger(AggregationHandler.class); + private final AggregationService aggregationService = new AggregationService(); + private final MySQLService mysqlService = new MySQLService(); + + @Override + public String handleRequest(Map event, Context context) { + try { + LocalDateTime now = LocalDateTime.now(ZoneId.of("Asia/Tokyo")); +// LocalDateTime now = LocalDateTime.of(2025, 1, 1, 0, 0, 5); + List levels = DateUtil.getAggregationLevels(now); + + logger.info("Aggregation triggered at: {}, levels: {}", now, levels); + + // 从数据库或配置中取企业列表 + List companyIds = mysqlService.getActiveCompanyIds(); + logger.info("company id list: {}", companyIds); + + for (Long companyId : companyIds) { + for (AggregationLevel level : levels) { + logger.info("Start aggregation [company:{}, level:{}]", companyId, level); + aggregationService.aggregate(companyId, level, now); + } + } + + return "Aggregation finished at: " + now; + + } catch (Exception e) { + logger.error("Aggregation error", e); + return "Aggregation error:" + e.getMessage(); + } + } +} diff --git a/src/main/java/com/dashboard/aws/lambda/service/AggregationService.java b/src/main/java/com/dashboard/aws/lambda/service/AggregationService.java new file mode 100644 index 0000000..5afa054 --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/service/AggregationService.java @@ -0,0 +1,182 @@ +package com.dashboard.aws.lambda.service; + +import com.dashboard.aws.lambda.entity.AggregationLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.LocalDateTime; + +public class AggregationService { + + private static final Logger logger = LoggerFactory.getLogger(AggregationService.class); + + private static final String MYSQL_URL = System.getenv("DB_URL"); + private static final String DB_USER = System.getenv("DB_USER"); + private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); + + private Connection getConnection() throws SQLException { + return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + } + + public void aggregate(Long companyId, AggregationLevel level, LocalDateTime now) { + String sourceTable; + String targetTable; + String sourceColumn; + String timeWhereColumn; + + switch (level) { + case MIN_30 -> { + sourceTable = "dashboard_record_accumulate"; + targetTable = "dashboard_aggregate_30min"; + sourceColumn = "increment_minute"; + timeWhereColumn = "upload_at"; + } + case HOUR -> { + sourceTable = "dashboard_aggregate_30min"; + targetTable = "dashboard_aggregate_hour"; + sourceColumn = "aggregate_value"; + timeWhereColumn = "time_start"; + } + case DAY -> { + sourceTable = "dashboard_aggregate_hour"; + targetTable = "dashboard_aggregate_day"; + sourceColumn = "aggregate_value"; + timeWhereColumn = "time_start"; + } + case MONTH -> { + sourceTable = "dashboard_aggregate_day"; + targetTable = "dashboard_aggregate_month"; + sourceColumn = "aggregate_value"; + timeWhereColumn = "time_start"; + } + case YEAR -> { + sourceTable = "dashboard_aggregate_month"; + targetTable = "dashboard_aggregate_year"; + sourceColumn = "aggregate_value"; + timeWhereColumn = "time_start"; + } + default -> { + logger.warn("Unsupported aggregation level: {}", level); + return; + } + } + + // --- 1) 计算 start/end(东京时区),并计算要写入的 date 字段 --- + java.time.ZoneId zone = java.time.ZoneId.of("Asia/Tokyo"); + + java.time.LocalDateTime end; // 区间右开端点(不包含) + java.time.LocalDateTime start; // 区间起点(包含) + + switch (level) { + case MIN_30: + // 触发时应该是在 minute == 0 或 30。以当前时间向下取整到 0 或 30。 + int minute = now.getMinute(); + int roundedMinute = (minute >= 30) ? 30 : 0; + end = now.withMinute(roundedMinute).withSecond(0).withNano(0); + // 如果 now 正好在 0/30 分(触发时),end==now; 否则我们仍以向下取整后的 end 为区间结束 + start = end.minusMinutes(30); + break; + case HOUR: + end = now.withMinute(0).withSecond(0).withNano(0); + start = end.minusHours(1); + break; + case DAY: + end = now.withHour(0).withMinute(0).withSecond(0).withNano(0); + start = end.minusDays(1); + break; + case MONTH: + end = now.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0); + start = end.minusMonths(1); + break; + case YEAR: + end = now.withMonth(1).withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0); + start = end.minusYears(1); + break; + default: + logger.warn("Unsupported level for time calc: {}", level); + return; + } + + long startMillis = start.atZone(zone).toInstant().toEpochMilli(); + long endMillis = end.atZone(zone).toInstant().toEpochMilli(); + + // 计算写入的 date 字段(基于 end 的“窗口标识时间”) + int y = end.getYear(); + int m = end.getMonthValue(); + int d = end.getDayOfMonth(); + int h = end.getHour(); + int mm = (level == AggregationLevel.MIN_30) ? (end.getMinute() == 0 ? 0 : 30) : end.getMinute(); + + // 根据级别决定要写入哪些 date 列和参数顺序 + String dateCols; + java.util.List dateValues = new java.util.ArrayList<>(); + switch (level) { + case MIN_30 -> { + dateCols = "date_year, date_month, date_day, date_hour, date_minute"; + dateValues.add(y); dateValues.add(m); dateValues.add(d); dateValues.add(h); dateValues.add(mm); + } + case HOUR -> { + dateCols = "date_year, date_month, date_day, date_hour"; + dateValues.add(y); dateValues.add(m); dateValues.add(d); dateValues.add(h); + } + case DAY -> { + dateCols = "date_year, date_month, date_day"; + dateValues.add(y); dateValues.add(m); dateValues.add(d); + } + case MONTH -> { + dateCols = "date_year, date_month"; + dateValues.add(y); dateValues.add(m); + } + case YEAR -> { + dateCols = "date_year"; + dateValues.add(y); + } + default -> { + logger.warn("unexpected level"); + return; + } + } + + String schema = "data_center_dongjian_" + companyId; + + // --- 2) SQL:用占位符传入 date 字段 + start/end 时间戳(毫秒) --- + String sql = String.format( + "INSERT INTO %s.%s (device_id, aggregate_value, %s, time_start, time_end, aggregated_at) " + + "SELECT device_id, SUM(CAST(%s AS DECIMAL(20,6))) AS aggregate_value, %s, ?, ?, ? " + + "FROM %s.%s " + + "WHERE %s >= ? AND %s < ? " + + "GROUP BY device_id", + schema, targetTable, dateCols, + sourceColumn, + // 下面占位符用于 date 列,会被 PreparedStatement 替换(数量与 dateValues.size() 一致) + String.join(", ", java.util.Collections.nCopies(dateValues.size(), "?")), + schema, sourceTable, timeWhereColumn, timeWhereColumn + ); + + try (Connection conn = getConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + + int idx = 1; + for (Object v : dateValues) { + if (v instanceof Integer) ps.setInt(idx++, (Integer) v); + else if (v instanceof Long) ps.setLong(idx++, (Long) v); + else ps.setObject(idx++, v); + } + ps.setObject(idx++, startMillis); + ps.setObject(idx++, endMillis); + // aggregated_at + ps.setObject(idx++, now); + ps.setLong(idx++, startMillis); + ps.setLong(idx, endMillis); + + int inserted = ps.executeUpdate(); + logger.info("[company:{}][{}] aggregated {} -> rows inserted: {}, start={}, end={}", companyId, level, targetTable, inserted, startMillis, endMillis); + } catch (SQLException e) { + logger.error("[company:{}][{}] aggregation failed, start={}, end={}", companyId, level, startMillis, endMillis, e); + } + } +} diff --git a/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java b/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java index 69b8e6a..e4fd87e 100644 --- a/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java +++ b/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java @@ -1,9 +1,14 @@ package com.dashboard.aws.lambda.util; +import com.dashboard.aws.lambda.entity.AggregationLevel; + import java.time.DayOfWeek; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.IsoFields; +import java.util.ArrayList; +import java.util.List; public class DateUtil { @@ -37,4 +42,50 @@ public class DateUtil { return lastYearDate.format(FORMATTER); } + + /** + * 判断当前时刻需要执行的聚合层级(可能多个) + * 例如:每天0点 -> 执行 [30分钟, 小时, 天] + * 每月1号0点 -> 执行 [30分钟, 小时, 天, 月] + * 每年1月1号0点 -> 执行 [30分钟, 小时, 天, 月, 年] + */ + public static List getAggregationLevels(LocalDateTime now) { + List levels = new ArrayList<>(); + + // 任何时候都执行30分钟聚合 + levels.add(AggregationLevel.MIN_30); + + // 整点 + if (now.getMinute() == 0) { + levels.add(AggregationLevel.HOUR); + } + + // 每天0点 + if (now.getMinute() == 0 && now.getHour() == 0) { + levels.add(AggregationLevel.DAY); + } + + // 每月1号0点 + if (now.getMinute() == 0 && now.getHour() == 0 && now.getDayOfMonth() == 1) { + levels.add(AggregationLevel.MONTH); + } + + // 每年1月1号0点 + if (now.getMinute() == 0 && now.getHour() == 0 && now.getDayOfMonth() == 1 && now.getMonthValue() == 1) { + levels.add(AggregationLevel.YEAR); + } + + return levels; + } + + public static LocalDateTime getStartTime(AggregationLevel level, LocalDateTime now) { + return switch (level) { + case MIN_30 -> now.minusMinutes(30); + case HOUR -> now.minusHours(1); + case DAY -> now.minusDays(1); + case MONTH -> now.minusMonths(1); + case YEAR -> now.minusYears(1); + default -> now; + }; + } }