You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

385 lines
17 KiB

package com.dashboard.aws.lambda.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.time.*;
import java.util.*;
public class MeasureAverageService {
private static final Logger logger = LoggerFactory.getLogger(MeasureAverageService.class);
private static final String MYSQL_URL = System.getenv("DB_URL");
private static final String DB_USER = System.getenv("DB_USER");
private static final String DB_PASSWORD = System.getenv("DB_PASSWORD");
private Connection getConnection() throws SQLException {
return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
}
public void aggregateLastHour(Long companyId, ZoneId zone, LocalDateTime now, LocalDateTime start) {
long startTs = start.atZone(zone).toInstant().toEpochMilli();
long endTs = now.atZone(zone).toInstant().toEpochMilli();
logger.info("Aggregating data for company {} from {} to {}", companyId, start, now);
String schema = "data_center_dongjian_" + companyId;
String realtimeTable = "dashboard_realtime_measure";
String sourceTable = "dashboard_record_measure";
String targetTable = "dashboard_aggregate_measure_hour";
try (Connection conn = getConnection()) {
// 1: 从实时表获取所有上报过的设备ID集合
Set<String> deviceSet = new HashSet<>();
String realtimeSql = String.format("SELECT device_id FROM %s.%s", schema, realtimeTable);
try (Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(realtimeSql)) {
while (rs.next()) {
deviceSet.add(rs.getString("device_id"));
}
}
logger.info("Found {} devices from realtime table", deviceSet.size());
// 2: 查询过去一小时的数据
Map<String, List<Record>> currentHourData = queryRecordData(conn, schema, sourceTable, startTs, endTs);
// 3: 查询上一小时的聚合结果(用于回退)
LocalDateTime prevHour = start.minusHours(1);
Map<String, AggregateResult> prevHourData = queryPrevHourAggregates(conn, schema, targetTable, prevHour);
// 4: 计算当前小时平均、最大、最小值
Map<String, AggregateResult> resultMap = new HashMap<>();
for (Map.Entry<String, List<Record>> entry : currentHourData.entrySet()) {
String deviceId = entry.getKey();
List<Record> records = entry.getValue();
double avg = computeTimeWeightedAverage(records, startTs, endTs);
double max = computeMaxValue(records);
double min = computeMinValue(records);
resultMap.put(deviceId, new AggregateResult(avg, max, min));
}
// 5: 对比集合 A,确保所有设备都有值(否则用上一小时或 0)
for (String deviceId : deviceSet) {
if (!resultMap.containsKey(deviceId)) {
AggregateResult prev = prevHourData.get(deviceId);
if (prev != null) {
resultMap.put(deviceId, prev);
} else {
resultMap.put(deviceId, new AggregateResult(0.0, 0.0, 0.0));
}
}
}
// 6: 写入本小时聚合表
String insertSql = String.format(
"INSERT INTO %s.%s (device_id, average_value, max_value, min_value, " +
"date_year, date_month, date_day, date_hour, time_start, time_end, aggregated_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
schema, targetTable
);
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
for (Map.Entry<String, AggregateResult> entry : resultMap.entrySet()) {
String deviceId = entry.getKey();
AggregateResult r = entry.getValue();
LocalDateTime dt = start;
ps.setString(1, deviceId);
ps.setString(2, String.format("%.6f", r.avg));
ps.setString(3, String.format("%.6f", r.max));
ps.setString(4, String.format("%.6f", r.min));
ps.setInt(5, dt.getYear());
ps.setInt(6, dt.getMonthValue());
ps.setInt(7, dt.getDayOfMonth());
ps.setInt(8, dt.getHour());
ps.setLong(9, startTs);
ps.setLong(10, endTs);
ps.setTimestamp(11, Timestamp.valueOf(now));
ps.addBatch();
}
ps.executeBatch();
}
logger.info("Hourly aggregation finished for {} devices", resultMap.size());
// 判断是否到达每日 / 每月 / 每年聚合时间点
if (now.getHour() == 0) {
aggregateLastDay(companyId, now, now.toLocalDate().minusDays(1)); // 昨天的聚合
}
if (now.getDayOfMonth() == 1 && now.getHour() == 0) {
aggregateLastMonth(companyId, now, now.toLocalDate().minusMonths(1)); // 上个月的聚合
}
if (now.getMonthValue() == 1 && now.getDayOfMonth() == 1 && now.getHour() == 0) {
aggregateLastYear(companyId, now, now.toLocalDate().minusYears(1)); // 上一年的聚合
}
} catch (SQLException e) {
logger.error("Hourly aggregation failed", e);
}
}
public void aggregateLastDay(Long companyId, LocalDateTime now, LocalDate targetDate) {
String schema = "data_center_dongjian_" + companyId;
String sourceTable = "dashboard_aggregate_measure_hour";
String targetTable = "dashboard_aggregate_measure_day";
logger.info("Aggregating daily data for company: {}, date {}", companyId, targetDate);
String sql = String.format(
"SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 24.0) AS avg_val, " +
"MAX(max_value) AS max_val, MIN(min_value) AS min_val " +
"FROM %s.%s WHERE date_year = ? AND date_month = ? AND date_day = ? GROUP BY device_id",
schema, sourceTable
);
logger.info("sql:{}, date_year:{}, date_month:{}, date_day:{}", sql, targetDate.getYear(), targetDate.getMonthValue(), targetDate.getDayOfMonth());
String insertSql = String.format(
"INSERT INTO %s.%s (device_id, average_value, max_value, min_value, date_year, date_month, date_day, aggregated_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
schema, targetTable
);
try (Connection conn = getConnection();
PreparedStatement psSelect = conn.prepareStatement(sql);
PreparedStatement psInsert = conn.prepareStatement(insertSql)) {
psSelect.setInt(1, targetDate.getYear());
psSelect.setInt(2, targetDate.getMonthValue());
psSelect.setInt(3, targetDate.getDayOfMonth());
ResultSet rs = psSelect.executeQuery();
while (rs.next()) {
String deviceId = rs.getString("device_id");
double avg = rs.getDouble("avg_val");
double max = rs.getDouble("max_val");
double min = rs.getDouble("min_val");
psInsert.setString(1, deviceId);
psInsert.setDouble(2, avg);
psInsert.setDouble(3, max);
psInsert.setDouble(4, min);
psInsert.setInt(5, targetDate.getYear());
psInsert.setInt(6, targetDate.getMonthValue());
psInsert.setInt(7, targetDate.getDayOfMonth());
psInsert.setTimestamp(8, Timestamp.valueOf(now));
psInsert.addBatch();
}
psInsert.executeBatch();
logger.info("Daily aggregation completed for {}", targetDate);
} catch (SQLException e) {
logger.error("Daily aggregation failed for {}", targetDate, e);
}
}
public void aggregateLastMonth(Long companyId, LocalDateTime now, LocalDate targetMonth) {
String schema = "data_center_dongjian_" + companyId;
String sourceTable = "dashboard_aggregate_measure_day";
String targetTable = "dashboard_aggregate_measure_month";
logger.info("Aggregating monthly data for company:{}, month: {}", companyId, targetMonth.getMonthValue());
// 天数作为分母
int daysInMonth = YearMonth.of(targetMonth.getYear(), targetMonth.getMonthValue()).lengthOfMonth();
String sql = String.format(
"SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / %d.0) AS avg_val, " +
"MAX(max_value) AS max_val, MIN(min_value) AS min_val " +
"FROM %s.%s WHERE date_year = ? AND date_month = ? GROUP BY device_id",
daysInMonth, schema, sourceTable
);
logger.info("sql:{}, date_year:{}, date_month:{}", sql, targetMonth.getYear(), targetMonth.getMonthValue());
String insertSql = String.format(
"INSERT INTO %s.%s (device_id, average_value, max_value, min_value, date_year, date_month, aggregated_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)",
schema, targetTable
);
try (Connection conn = getConnection();
PreparedStatement psSelect = conn.prepareStatement(sql);
PreparedStatement psInsert = conn.prepareStatement(insertSql)) {
psSelect.setInt(1, targetMonth.getYear());
psSelect.setInt(2, targetMonth.getMonthValue());
ResultSet rs = psSelect.executeQuery();
while (rs.next()) {
String deviceId = rs.getString("device_id");
double avg = rs.getDouble("avg_val");
double max = rs.getDouble("max_val");
double min = rs.getDouble("min_val");
psInsert.setString(1, deviceId);
psInsert.setDouble(2, avg);
psInsert.setDouble(3, max);
psInsert.setDouble(4, min);
psInsert.setInt(5, targetMonth.getYear());
psInsert.setInt(6, targetMonth.getMonthValue());
psInsert.setTimestamp(7, Timestamp.valueOf(now));
psInsert.addBatch();
}
psInsert.executeBatch();
logger.info("Monthly aggregation completed for {}", targetMonth);
} catch (SQLException e) {
logger.error("Monthly aggregation failed for {}", targetMonth, e);
}
}
public void aggregateLastYear(Long companyId, LocalDateTime now, LocalDate targetYear) {
String schema = "data_center_dongjian_" + companyId;
String sourceTable = "dashboard_aggregate_measure_month";
String targetTable = "dashboard_aggregate_measure_year";
logger.info("Aggregating yearly data for company:{}, year: {}", companyId, targetYear.getYear());
String sql = String.format(
"SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 12.0) AS avg_val," +
" MAX(max_value) AS max_val, MIN(min_value) AS min_val " +
"FROM %s.%s WHERE date_year = ? GROUP BY device_id",
schema, sourceTable
);
logger.info("sql:{}, date_year:{}", sql, targetYear.getYear());
String insertSql = String.format(
"INSERT INTO %s.%s (device_id, average_value, max_value, min_value, date_year, aggregated_at) " +
"VALUES (?, ?, ?, ?, ?, ?)",
schema, targetTable
);
try (Connection conn = getConnection();
PreparedStatement psSelect = conn.prepareStatement(sql);
PreparedStatement psInsert = conn.prepareStatement(insertSql)) {
psSelect.setInt(1, targetYear.getYear());
ResultSet rs = psSelect.executeQuery();
while (rs.next()) {
String deviceId = rs.getString("device_id");
double avg = rs.getDouble("avg_val");
double max = rs.getDouble("max_val");
double min = rs.getDouble("min_val");
psInsert.setString(1, deviceId);
psInsert.setDouble(2, avg);
psInsert.setDouble(3, max);
psInsert.setDouble(4, min);
psInsert.setInt(5, targetYear.getYear());
psInsert.setTimestamp(6, Timestamp.valueOf(now));
psInsert.addBatch();
}
psInsert.executeBatch();
logger.info("Yearly aggregation completed for {}", targetYear.getYear());
} catch (SQLException e) {
logger.error("Yearly aggregation failed for {}", targetYear.getYear(), e);
}
}
// 查询当前小时数据
private Map<String, List<Record>> queryRecordData(Connection conn, String schema, String table, long startTs, long endTs) throws SQLException {
String sql = String.format(
"SELECT device_id, upload_value, upload_at FROM %s.%s " +
"WHERE upload_at >= ? AND upload_at < ? ORDER BY device_id, upload_at",
schema, table
);
Map<String, List<Record>> map = new HashMap<>();
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setLong(1, startTs);
ps.setLong(2, endTs);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String deviceId = rs.getString("device_id");
String valStr = rs.getString("upload_value");
long ts = rs.getLong("upload_at");
double val = (valStr == null || valStr.isEmpty()) ? 0.0 : Double.parseDouble(valStr);
map.computeIfAbsent(deviceId, k -> new ArrayList<>()).add(new Record(ts, val));
}
}
return map;
}
// 查询上一小时聚合结果
private Map<String, AggregateResult> queryPrevHourAggregates(Connection conn, String schema, String table, LocalDateTime prevHour) throws SQLException {
Map<String, AggregateResult> map = new HashMap<>();
String sql = String.format(
"SELECT device_id, average_value, max_value, min_value FROM %s.%s " +
"WHERE date_year = ? AND date_month = ? AND date_day = ? AND date_hour = ?",
schema, table
);
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setInt(1, prevHour.getYear());
ps.setInt(2, prevHour.getMonthValue());
ps.setInt(3, prevHour.getDayOfMonth());
ps.setInt(4, prevHour.getHour());
ResultSet rs = ps.executeQuery();
while (rs.next()) {
map.put(rs.getString("device_id"),
new AggregateResult(
rs.getDouble("average_value"),
rs.getDouble("max_value"),
rs.getDouble("min_value")));
}
}
return map;
}
// 时间加权平均
private double computeTimeWeightedAverage(List<Record> records, long startTs, long endTs) {
if (records == null || records.isEmpty()) return 0.0;
double sum = 0.0;
double totalMinutes = (endTs - startTs) / 60000.0; // 毫秒 → 分钟
for (int i = 0; i < records.size(); i++) {
Record rec = records.get(i);
long nextTs = (i < records.size() - 1) ? records.get(i + 1).timestamp : endTs;
double intervalMinutes = (nextTs - rec.timestamp) / 60000.0; // 转分钟
sum += rec.value * intervalMinutes;
}
return totalMinutes > 0 ? sum / totalMinutes : 0.0;
}
// 最大值
private double computeMaxValue(List<Record> records) {
return records.stream().mapToDouble(r -> r.value).max().orElse(0.0);
}
// 最小值
private double computeMinValue(List<Record> records) {
return records.stream().mapToDouble(r -> r.value).min().orElse(0.0);
}
private static class Record {
long timestamp;
double value;
Record(long timestamp, double value) {
this.timestamp = timestamp;
this.value = value;
}
}
private static class AggregateResult {
double avg;
double max;
double min;
AggregateResult(double avg, double max, double min) {
this.avg = avg;
this.max = max;
this.min = min;
}
}
}