3 changed files with 229 additions and 9 deletions
@ -0,0 +1,43 @@ |
|||
package com.dashboard.aws.lambda.handler; |
|||
|
|||
import com.amazonaws.services.lambda.runtime.Context; |
|||
import com.amazonaws.services.lambda.runtime.RequestHandler; |
|||
import com.dashboard.aws.lambda.service.MeasureAverageService; |
|||
import com.dashboard.aws.lambda.service.MySQLService; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
|
|||
import java.time.LocalDateTime; |
|||
import java.time.ZoneId; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
|
|||
public class MeasureAverageHandler implements RequestHandler<Map<String, Object>, String> { |
|||
|
|||
private static final Logger logger = LoggerFactory.getLogger(MeasureAverageHandler.class); |
|||
private final MeasureAverageService measureAverageService = new MeasureAverageService(); |
|||
private final MySQLService mysqlService = new MySQLService(); |
|||
|
|||
@Override |
|||
public String handleRequest(Map<String, Object> event, Context context) { |
|||
try { |
|||
// 从数据库或配置中取企业列表
|
|||
List<Long> companyIds = mysqlService.getActiveCompanyIds(); |
|||
logger.info("company id list: {}", companyIds); |
|||
|
|||
ZoneId zone = ZoneId.of("Asia/Tokyo"); |
|||
LocalDateTime now = LocalDateTime.now(zone).withMinute(0).withSecond(0).withNano(0); |
|||
LocalDateTime start = now.minusHours(1); |
|||
|
|||
for (Long companyId : companyIds) { |
|||
measureAverageService.aggregateLastHour(companyId, zone, now, start); |
|||
} |
|||
|
|||
return "Measure average finished"; |
|||
|
|||
} catch (Exception e) { |
|||
logger.error("MeasureAverage error", e); |
|||
return "MeasureAverage error:" + e.getMessage(); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,177 @@ |
|||
package com.dashboard.aws.lambda.service; |
|||
|
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
|
|||
import java.sql.*; |
|||
import java.time.*; |
|||
import java.util.*; |
|||
|
|||
public class MeasureAverageService { |
|||
|
|||
private static final Logger logger = LoggerFactory.getLogger(MeasureAverageService.class); |
|||
|
|||
private static final String MYSQL_URL = System.getenv("DB_URL"); |
|||
private static final String DB_USER = System.getenv("DB_USER"); |
|||
private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); |
|||
|
|||
private Connection getConnection() throws SQLException { |
|||
return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); |
|||
} |
|||
|
|||
public void aggregateLastHour(Long companyId, ZoneId zone, LocalDateTime now, LocalDateTime start) { |
|||
long startTs = start.atZone(zone).toInstant().toEpochMilli(); |
|||
long endTs = now.atZone(zone).toInstant().toEpochMilli(); |
|||
|
|||
logger.info("Aggregating data for company {} from {} to {}", companyId, start, now); |
|||
|
|||
String schema = "data_center_dongjian_" + companyId; |
|||
String realtimeTable = "dashboard_realtime_measure"; |
|||
String sourceTable = "dashboard_record_measure"; |
|||
String targetTable = "dashboard_aggregate_measure_hour"; |
|||
|
|||
try (Connection conn = getConnection()) { |
|||
|
|||
// 1: 从实时表获取所有上报过的设备ID集合
|
|||
Set<String> deviceSet = new HashSet<>(); |
|||
String realtimeSql = String.format("SELECT device_id FROM %s.%s", schema, realtimeTable); |
|||
try (Statement st = conn.createStatement(); |
|||
ResultSet rs = st.executeQuery(realtimeSql)) { |
|||
while (rs.next()) { |
|||
deviceSet.add(rs.getString("device_id")); |
|||
} |
|||
} |
|||
logger.info("Found {} devices from realtime table", deviceSet.size()); |
|||
|
|||
// 2: 查询过去一小时的数据
|
|||
Map<String, List<Record>> currentHourData = queryRecordData(conn, schema, sourceTable, startTs, endTs); |
|||
|
|||
// 3: 查询上一小时的聚合结果(用于回退)
|
|||
LocalDateTime prevHour = start.minusHours(1); |
|||
Map<String, Double> prevHourData = queryPrevHourAggregates(conn, schema, targetTable, prevHour); |
|||
|
|||
// 4: 计算当前小时平均值(如果有)
|
|||
Map<String, Double> resultMap = new HashMap<>(); |
|||
for (Map.Entry<String, List<Record>> entry : currentHourData.entrySet()) { |
|||
String deviceId = entry.getKey(); |
|||
double avg = computeTimeWeightedAverage(entry.getValue(), startTs, endTs); |
|||
resultMap.put(deviceId, avg); |
|||
} |
|||
|
|||
// 5: 对比集合 A,确保所有设备都有值(否则用上一小时或 0)
|
|||
for (String deviceId : deviceSet) { |
|||
if (!resultMap.containsKey(deviceId)) { |
|||
Double prevVal = prevHourData.get(deviceId); |
|||
double value = (prevVal != null) ? prevVal : 0.0; |
|||
resultMap.put(deviceId, value); |
|||
} |
|||
} |
|||
|
|||
// 6: 写入本小时聚合表
|
|||
String insertSql = String.format( |
|||
"INSERT INTO %s.%s (device_id, average_value, date_year, date_month, date_day, date_hour, " + |
|||
"time_start, time_end, aggregated_at) " + |
|||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", schema, targetTable |
|||
); |
|||
|
|||
try (PreparedStatement ps = conn.prepareStatement(insertSql)) { |
|||
for (Map.Entry<String, Double> entry : resultMap.entrySet()) { |
|||
String deviceId = entry.getKey(); |
|||
double avg = entry.getValue(); |
|||
LocalDateTime dt = start; |
|||
|
|||
ps.setString(1, deviceId); |
|||
ps.setString(2, String.format("%.6f", avg)); |
|||
ps.setInt(3, dt.getYear()); |
|||
ps.setInt(4, dt.getMonthValue()); |
|||
ps.setInt(5, dt.getDayOfMonth()); |
|||
ps.setInt(6, dt.getHour()); |
|||
ps.setLong(7, startTs); |
|||
ps.setLong(8, endTs); |
|||
ps.setTimestamp(9, Timestamp.valueOf(now)); |
|||
|
|||
ps.addBatch(); |
|||
} |
|||
ps.executeBatch(); |
|||
} |
|||
|
|||
logger.info("Hourly aggregation finished for {} devices", resultMap.size()); |
|||
|
|||
} catch (SQLException e) { |
|||
logger.error("Hourly aggregation failed", e); |
|||
} |
|||
} |
|||
|
|||
// 查询当前小时数据
|
|||
private Map<String, List<Record>> queryRecordData(Connection conn, String schema, String table, long startTs, long endTs) throws SQLException { |
|||
String sql = String.format( |
|||
"SELECT device_id, upload_value, upload_at FROM %s.%s " + |
|||
"WHERE upload_at >= ? AND upload_at < ? ORDER BY device_id, upload_at", |
|||
schema, table |
|||
); |
|||
Map<String, List<Record>> map = new HashMap<>(); |
|||
try (PreparedStatement ps = conn.prepareStatement(sql)) { |
|||
ps.setLong(1, startTs); |
|||
ps.setLong(2, endTs); |
|||
ResultSet rs = ps.executeQuery(); |
|||
while (rs.next()) { |
|||
String deviceId = rs.getString("device_id"); |
|||
String valStr = rs.getString("upload_value"); |
|||
long ts = rs.getLong("upload_at"); |
|||
double val = (valStr == null || valStr.isEmpty()) ? 0.0 : Double.parseDouble(valStr); |
|||
map.computeIfAbsent(deviceId, k -> new ArrayList<>()).add(new Record(ts, val)); |
|||
} |
|||
} |
|||
return map; |
|||
} |
|||
|
|||
// 查询上一小时聚合结果
|
|||
private Map<String, Double> queryPrevHourAggregates(Connection conn, String schema, String table, LocalDateTime prevHour) throws SQLException { |
|||
Map<String, Double> map = new HashMap<>(); |
|||
String sql = String.format( |
|||
"SELECT device_id, average_value FROM %s.%s " + |
|||
"WHERE date_year = ? AND date_month = ? AND date_day = ? AND date_hour = ?", |
|||
schema, table |
|||
); |
|||
try (PreparedStatement ps = conn.prepareStatement(sql)) { |
|||
ps.setInt(1, prevHour.getYear()); |
|||
ps.setInt(2, prevHour.getMonthValue()); |
|||
ps.setInt(3, prevHour.getDayOfMonth()); |
|||
ps.setInt(4, prevHour.getHour()); |
|||
ResultSet rs = ps.executeQuery(); |
|||
while (rs.next()) { |
|||
map.put(rs.getString("device_id"), rs.getDouble("average_value")); |
|||
} |
|||
} |
|||
return map; |
|||
} |
|||
|
|||
private double computeTimeWeightedAverage(List<Record> records, long startTs, long endTs) { |
|||
if (records == null || records.isEmpty()) return 0.0; |
|||
|
|||
double sum = 0.0; |
|||
double totalMinutes = (endTs - startTs) / 60000.0; // 毫秒 → 分钟
|
|||
|
|||
for (int i = 0; i < records.size(); i++) { |
|||
Record rec = records.get(i); |
|||
long nextTs = (i < records.size() - 1) ? records.get(i + 1).timestamp : endTs; |
|||
double intervalMinutes = (nextTs - rec.timestamp) / 60000.0; // 转分钟
|
|||
logger.info("intervalMinutes: {}, rec.value: {}", intervalMinutes, rec.value); |
|||
sum += rec.value * intervalMinutes; |
|||
} |
|||
|
|||
logger.info("totalMinutes: {}, sum: {}", totalMinutes, sum); |
|||
|
|||
return totalMinutes > 0 ? sum / totalMinutes : 0.0; |
|||
} |
|||
|
|||
private static class Record { |
|||
long timestamp; |
|||
double value; |
|||
|
|||
Record(long timestamp, double value) { |
|||
this.timestamp = timestamp; |
|||
this.value = value; |
|||
} |
|||
} |
|||
} |
Loading…
Reference in new issue