Browse Source

累积设备聚合

master
review512jwy@163.com 10 hours ago
parent
commit
7444acae87
  1. 10
      src/main/java/com/dashboard/aws/lambda/entity/AggregationLevel.java
  2. 50
      src/main/java/com/dashboard/aws/lambda/handler/AggregationHandler.java
  3. 182
      src/main/java/com/dashboard/aws/lambda/service/AggregationService.java
  4. 51
      src/main/java/com/dashboard/aws/lambda/util/DateUtil.java

10
src/main/java/com/dashboard/aws/lambda/entity/AggregationLevel.java

@ -0,0 +1,10 @@
package com.dashboard.aws.lambda.entity;
public enum AggregationLevel {
MIN_30,
HOUR,
DAY,
MONTH,
YEAR
}

50
src/main/java/com/dashboard/aws/lambda/handler/AggregationHandler.java

@ -0,0 +1,50 @@
package com.dashboard.aws.lambda.handler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.dashboard.aws.lambda.entity.AggregationLevel;
import com.dashboard.aws.lambda.service.AggregationService;
import com.dashboard.aws.lambda.service.MySQLService;
import com.dashboard.aws.lambda.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
public class AggregationHandler implements RequestHandler<Map<String, Object>, String> {
private static final Logger logger = LoggerFactory.getLogger(AggregationHandler.class);
private final AggregationService aggregationService = new AggregationService();
private final MySQLService mysqlService = new MySQLService();
@Override
public String handleRequest(Map<String, Object> event, Context context) {
try {
LocalDateTime now = LocalDateTime.now(ZoneId.of("Asia/Tokyo"));
// LocalDateTime now = LocalDateTime.of(2025, 1, 1, 0, 0, 5);
List<AggregationLevel> levels = DateUtil.getAggregationLevels(now);
logger.info("Aggregation triggered at: {}, levels: {}", now, levels);
// 从数据库或配置中取企业列表
List<Long> companyIds = mysqlService.getActiveCompanyIds();
logger.info("company id list: {}", companyIds);
for (Long companyId : companyIds) {
for (AggregationLevel level : levels) {
logger.info("Start aggregation [company:{}, level:{}]", companyId, level);
aggregationService.aggregate(companyId, level, now);
}
}
return "Aggregation finished at: " + now;
} catch (Exception e) {
logger.error("Aggregation error", e);
return "Aggregation error:" + e.getMessage();
}
}
}

182
src/main/java/com/dashboard/aws/lambda/service/AggregationService.java

@ -0,0 +1,182 @@
package com.dashboard.aws.lambda.service;
import com.dashboard.aws.lambda.entity.AggregationLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDateTime;
public class AggregationService {
private static final Logger logger = LoggerFactory.getLogger(AggregationService.class);
private static final String MYSQL_URL = System.getenv("DB_URL");
private static final String DB_USER = System.getenv("DB_USER");
private static final String DB_PASSWORD = System.getenv("DB_PASSWORD");
private Connection getConnection() throws SQLException {
return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
}
public void aggregate(Long companyId, AggregationLevel level, LocalDateTime now) {
String sourceTable;
String targetTable;
String sourceColumn;
String timeWhereColumn;
switch (level) {
case MIN_30 -> {
sourceTable = "dashboard_record_accumulate";
targetTable = "dashboard_aggregate_30min";
sourceColumn = "increment_minute";
timeWhereColumn = "upload_at";
}
case HOUR -> {
sourceTable = "dashboard_aggregate_30min";
targetTable = "dashboard_aggregate_hour";
sourceColumn = "aggregate_value";
timeWhereColumn = "time_start";
}
case DAY -> {
sourceTable = "dashboard_aggregate_hour";
targetTable = "dashboard_aggregate_day";
sourceColumn = "aggregate_value";
timeWhereColumn = "time_start";
}
case MONTH -> {
sourceTable = "dashboard_aggregate_day";
targetTable = "dashboard_aggregate_month";
sourceColumn = "aggregate_value";
timeWhereColumn = "time_start";
}
case YEAR -> {
sourceTable = "dashboard_aggregate_month";
targetTable = "dashboard_aggregate_year";
sourceColumn = "aggregate_value";
timeWhereColumn = "time_start";
}
default -> {
logger.warn("Unsupported aggregation level: {}", level);
return;
}
}
// --- 1) 计算 start/end(东京时区),并计算要写入的 date 字段 ---
java.time.ZoneId zone = java.time.ZoneId.of("Asia/Tokyo");
java.time.LocalDateTime end; // 区间右开端点(不包含)
java.time.LocalDateTime start; // 区间起点(包含)
switch (level) {
case MIN_30:
// 触发时应该是在 minute == 0 或 30。以当前时间向下取整到 0 或 30。
int minute = now.getMinute();
int roundedMinute = (minute >= 30) ? 30 : 0;
end = now.withMinute(roundedMinute).withSecond(0).withNano(0);
// 如果 now 正好在 0/30 分(触发时),end==now; 否则我们仍以向下取整后的 end 为区间结束
start = end.minusMinutes(30);
break;
case HOUR:
end = now.withMinute(0).withSecond(0).withNano(0);
start = end.minusHours(1);
break;
case DAY:
end = now.withHour(0).withMinute(0).withSecond(0).withNano(0);
start = end.minusDays(1);
break;
case MONTH:
end = now.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
start = end.minusMonths(1);
break;
case YEAR:
end = now.withMonth(1).withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
start = end.minusYears(1);
break;
default:
logger.warn("Unsupported level for time calc: {}", level);
return;
}
long startMillis = start.atZone(zone).toInstant().toEpochMilli();
long endMillis = end.atZone(zone).toInstant().toEpochMilli();
// 计算写入的 date 字段(基于 end 的“窗口标识时间”)
int y = end.getYear();
int m = end.getMonthValue();
int d = end.getDayOfMonth();
int h = end.getHour();
int mm = (level == AggregationLevel.MIN_30) ? (end.getMinute() == 0 ? 0 : 30) : end.getMinute();
// 根据级别决定要写入哪些 date 列和参数顺序
String dateCols;
java.util.List<Object> dateValues = new java.util.ArrayList<>();
switch (level) {
case MIN_30 -> {
dateCols = "date_year, date_month, date_day, date_hour, date_minute";
dateValues.add(y); dateValues.add(m); dateValues.add(d); dateValues.add(h); dateValues.add(mm);
}
case HOUR -> {
dateCols = "date_year, date_month, date_day, date_hour";
dateValues.add(y); dateValues.add(m); dateValues.add(d); dateValues.add(h);
}
case DAY -> {
dateCols = "date_year, date_month, date_day";
dateValues.add(y); dateValues.add(m); dateValues.add(d);
}
case MONTH -> {
dateCols = "date_year, date_month";
dateValues.add(y); dateValues.add(m);
}
case YEAR -> {
dateCols = "date_year";
dateValues.add(y);
}
default -> {
logger.warn("unexpected level");
return;
}
}
String schema = "data_center_dongjian_" + companyId;
// --- 2) SQL:用占位符传入 date 字段 + start/end 时间戳(毫秒) ---
String sql = String.format(
"INSERT INTO %s.%s (device_id, aggregate_value, %s, time_start, time_end, aggregated_at) " +
"SELECT device_id, SUM(CAST(%s AS DECIMAL(20,6))) AS aggregate_value, %s, ?, ?, ? " +
"FROM %s.%s " +
"WHERE %s >= ? AND %s < ? " +
"GROUP BY device_id",
schema, targetTable, dateCols,
sourceColumn,
// 下面占位符用于 date 列,会被 PreparedStatement 替换(数量与 dateValues.size() 一致)
String.join(", ", java.util.Collections.nCopies(dateValues.size(), "?")),
schema, sourceTable, timeWhereColumn, timeWhereColumn
);
try (Connection conn = getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
int idx = 1;
for (Object v : dateValues) {
if (v instanceof Integer) ps.setInt(idx++, (Integer) v);
else if (v instanceof Long) ps.setLong(idx++, (Long) v);
else ps.setObject(idx++, v);
}
ps.setObject(idx++, startMillis);
ps.setObject(idx++, endMillis);
// aggregated_at
ps.setObject(idx++, now);
ps.setLong(idx++, startMillis);
ps.setLong(idx, endMillis);
int inserted = ps.executeUpdate();
logger.info("[company:{}][{}] aggregated {} -> rows inserted: {}, start={}, end={}", companyId, level, targetTable, inserted, startMillis, endMillis);
} catch (SQLException e) {
logger.error("[company:{}][{}] aggregation failed, start={}, end={}", companyId, level, startMillis, endMillis, e);
}
}
}

51
src/main/java/com/dashboard/aws/lambda/util/DateUtil.java

@ -1,9 +1,14 @@
package com.dashboard.aws.lambda.util;
import com.dashboard.aws.lambda.entity.AggregationLevel;
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.IsoFields;
import java.util.ArrayList;
import java.util.List;
public class DateUtil {
@ -37,4 +42,50 @@ public class DateUtil {
return lastYearDate.format(FORMATTER);
}
/**
* 判断当前时刻需要执行的聚合层级可能多个
* 例如每天0点 -> 执行 [30分钟, 小时, ]
* 每月1号0点 -> 执行 [30分钟, 小时, , ]
* 每年1月1号0点 -> 执行 [30分钟, 小时, , , ]
*/
public static List<AggregationLevel> getAggregationLevels(LocalDateTime now) {
List<AggregationLevel> levels = new ArrayList<>();
// 任何时候都执行30分钟聚合
levels.add(AggregationLevel.MIN_30);
// 整点
if (now.getMinute() == 0) {
levels.add(AggregationLevel.HOUR);
}
// 每天0点
if (now.getMinute() == 0 && now.getHour() == 0) {
levels.add(AggregationLevel.DAY);
}
// 每月1号0点
if (now.getMinute() == 0 && now.getHour() == 0 && now.getDayOfMonth() == 1) {
levels.add(AggregationLevel.MONTH);
}
// 每年1月1号0点
if (now.getMinute() == 0 && now.getHour() == 0 && now.getDayOfMonth() == 1 && now.getMonthValue() == 1) {
levels.add(AggregationLevel.YEAR);
}
return levels;
}
public static LocalDateTime getStartTime(AggregationLevel level, LocalDateTime now) {
return switch (level) {
case MIN_30 -> now.minusMinutes(30);
case HOUR -> now.minusHours(1);
case DAY -> now.minusDays(1);
case MONTH -> now.minusMonths(1);
case YEAR -> now.minusYears(1);
default -> now;
};
}
}

Loading…
Cancel
Save