You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

183 lines
7.7 KiB

package com.dashboard.aws.lambda.service;
import com.dashboard.aws.lambda.entity.AggregationLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDateTime;
8 months ago
public class AccumulateIncrementService {
8 months ago
private static final Logger logger = LoggerFactory.getLogger(AccumulateIncrementService.class);
private static final String MYSQL_URL = System.getenv("DB_URL");
private static final String DB_USER = System.getenv("DB_USER");
private static final String DB_PASSWORD = System.getenv("DB_PASSWORD");
private Connection getConnection() throws SQLException {
return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
}
public void aggregate(Long companyId, AggregationLevel level, LocalDateTime now) {
String sourceTable;
String targetTable;
String sourceColumn;
String timeWhereColumn;
switch (level) {
case MIN_30 -> {
sourceTable = "dashboard_record_accumulate";
8 months ago
targetTable = "dashboard_aggregate_accumulate_30min";
sourceColumn = "increment_minute";
timeWhereColumn = "upload_at";
}
case HOUR -> {
8 months ago
sourceTable = "dashboard_aggregate_accumulate_30min";
targetTable = "dashboard_aggregate_accumulate_hour";
sourceColumn = "aggregate_value";
timeWhereColumn = "time_start";
}
case DAY -> {
8 months ago
sourceTable = "dashboard_aggregate_accumulate_hour";
targetTable = "dashboard_aggregate_accumulate_day";
sourceColumn = "aggregate_value";
timeWhereColumn = "time_start";
}
case MONTH -> {
8 months ago
sourceTable = "dashboard_aggregate_accumulate_day";
targetTable = "dashboard_aggregate_accumulate_month";
sourceColumn = "aggregate_value";
timeWhereColumn = "time_start";
}
case YEAR -> {
8 months ago
sourceTable = "dashboard_aggregate_accumulate_month";
targetTable = "dashboard_aggregate_accumulate_year";
sourceColumn = "aggregate_value";
timeWhereColumn = "time_start";
}
default -> {
logger.warn("Unsupported aggregation level: {}", level);
return;
}
}
// --- 1) 计算 start/end(东京时区),并计算要写入的 date 字段 ---
java.time.ZoneId zone = java.time.ZoneId.of("Asia/Tokyo");
java.time.LocalDateTime end; // 区间右开端点(不包含)
java.time.LocalDateTime start; // 区间起点(包含)
switch (level) {
case MIN_30:
// 触发时应该是在 minute == 0 或 30。以当前时间向下取整到 0 或 30。
int minute = now.getMinute();
int roundedMinute = (minute >= 30) ? 30 : 0;
end = now.withMinute(roundedMinute).withSecond(0).withNano(0);
// 如果 now 正好在 0/30 分(触发时),end==now; 否则我们仍以向下取整后的 end 为区间结束
start = end.minusMinutes(30);
break;
case HOUR:
end = now.withMinute(0).withSecond(0).withNano(0);
start = end.minusHours(1);
break;
case DAY:
end = now.withHour(0).withMinute(0).withSecond(0).withNano(0);
start = end.minusDays(1);
break;
case MONTH:
end = now.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
start = end.minusMonths(1);
break;
case YEAR:
end = now.withMonth(1).withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
start = end.minusYears(1);
break;
default:
logger.warn("Unsupported level for time calc: {}", level);
return;
}
long startMillis = start.atZone(zone).toInstant().toEpochMilli();
long endMillis = end.atZone(zone).toInstant().toEpochMilli();
// 计算写入的 date 字段(基于 end 的“窗口标识时间”)
int y = end.getYear();
int m = end.getMonthValue();
int d = end.getDayOfMonth();
int h = end.getHour();
int mm = (level == AggregationLevel.MIN_30) ? (end.getMinute() == 0 ? 0 : 30) : end.getMinute();
// 根据级别决定要写入哪些 date 列和参数顺序
String dateCols;
java.util.List<Object> dateValues = new java.util.ArrayList<>();
switch (level) {
case MIN_30 -> {
dateCols = "date_year, date_month, date_day, date_hour, date_minute";
dateValues.add(y); dateValues.add(m); dateValues.add(d); dateValues.add(h); dateValues.add(mm);
}
case HOUR -> {
dateCols = "date_year, date_month, date_day, date_hour";
dateValues.add(y); dateValues.add(m); dateValues.add(d); dateValues.add(h);
}
case DAY -> {
dateCols = "date_year, date_month, date_day";
dateValues.add(y); dateValues.add(m); dateValues.add(d);
}
case MONTH -> {
dateCols = "date_year, date_month";
dateValues.add(y); dateValues.add(m);
}
case YEAR -> {
dateCols = "date_year";
dateValues.add(y);
}
default -> {
logger.warn("unexpected level");
return;
}
}
String schema = "data_center_dongjian_" + companyId;
// --- 2) SQL:用占位符传入 date 字段 + start/end 时间戳(毫秒) ---
String sql = String.format(
"INSERT INTO %s.%s (device_id, aggregate_value, %s, time_start, time_end, aggregated_at) " +
"SELECT device_id, SUM(CAST(%s AS DECIMAL(20,6))) AS aggregate_value, %s, ?, ?, ? " +
"FROM %s.%s " +
"WHERE %s >= ? AND %s < ? " +
"GROUP BY device_id",
schema, targetTable, dateCols,
sourceColumn,
// 下面占位符用于 date 列,会被 PreparedStatement 替换(数量与 dateValues.size() 一致)
String.join(", ", java.util.Collections.nCopies(dateValues.size(), "?")),
schema, sourceTable, timeWhereColumn, timeWhereColumn
);
try (Connection conn = getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
int idx = 1;
for (Object v : dateValues) {
if (v instanceof Integer) ps.setInt(idx++, (Integer) v);
else if (v instanceof Long) ps.setLong(idx++, (Long) v);
else ps.setObject(idx++, v);
}
ps.setObject(idx++, startMillis);
ps.setObject(idx++, endMillis);
// aggregated_at
ps.setObject(idx++, now);
ps.setLong(idx++, startMillis);
ps.setLong(idx, endMillis);
int inserted = ps.executeUpdate();
logger.info("[company:{}][{}] aggregated {} -> rows inserted: {}, start={}, end={}", companyId, level, targetTable, inserted, startMillis, endMillis);
} catch (SQLException e) {
logger.error("[company:{}][{}] aggregation failed, start={}, end={}", companyId, level, startMillis, endMillis, e);
}
}
}