Browse Source

数据源改成aurora

master
review512jwy@163.com 2 months ago
parent
commit
d5c6d4119a
  1. 13
      src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java
  2. 10
      src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java
  3. 89
      src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java
  4. 71
      src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java
  5. 9
      src/main/java/com/dashboard/aws/lambda/service/AccumulateIncrementService.java
  6. 31
      src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java
  7. 36
      src/main/java/com/dashboard/aws/lambda/service/MySQLService.java
  8. 13
      src/main/java/com/dashboard/aws/lambda/util/DateUtil.java

13
src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java

@ -24,20 +24,15 @@ public class AccumulateIncrementHandler implements RequestHandler<Map<String, Ob
public String handleRequest(Map<String, Object> event, Context context) { public String handleRequest(Map<String, Object> event, Context context) {
try { try {
ZoneId zone = ZoneId.of("Asia/Tokyo"); ZoneId zone = ZoneId.of("Asia/Tokyo");
LocalDateTime now = DateUtil.resolveEventTime(event, zone); LocalDateTime now = DateUtil.normalizeToHalfHourTime(event, zone);
List<AggregationLevel> levels = DateUtil.getAggregationLevels(now); List<AggregationLevel> levels = DateUtil.getAggregationLevels(now);
logger.info("Aggregation triggered at: {}, levels: {}", now, levels); logger.info("Aggregation triggered at: {}, levels: {}", now, levels);
// 从数据库或配置中取企业列表 // 从数据库或配置中取企业列表
List<Long> companyIds = mysqlService.getActiveCompanyIds(); for (AggregationLevel level : levels) {
logger.info("company id list: {}", companyIds); logger.info("Start aggregation [level:{}]", level);
accumulateIncrementService.aggregate(level, now);
for (Long companyId : companyIds) {
for (AggregationLevel level : levels) {
logger.info("Start aggregation [company:{}, level:{}]", companyId, level);
accumulateIncrementService.aggregate(companyId, level, now);
}
} }
return "Aggregation finished at: " + now; return "Aggregation finished at: " + now;

10
src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java

@ -22,17 +22,11 @@ public class MeasureAverageHandler implements RequestHandler<Map<String, Object>
@Override @Override
public String handleRequest(Map<String, Object> event, Context context) { public String handleRequest(Map<String, Object> event, Context context) {
try { try {
// 从数据库或配置中取企业列表
List<Long> companyIds = mysqlService.getActiveCompanyIds();
logger.info("company id list: {}", companyIds);
ZoneId zone = ZoneId.of("Asia/Tokyo"); ZoneId zone = ZoneId.of("Asia/Tokyo");
LocalDateTime now = DateUtil.resolveEventTime(event, zone); LocalDateTime now = DateUtil.normalizeToHalfHourTime(event, zone);
LocalDateTime start = now.minusHours(1); LocalDateTime start = now.minusHours(1);
for (Long companyId : companyIds) { measureAverageService.aggregateLastHour(zone, now, start);
measureAverageService.aggregateLastHour(companyId, zone, now, start);
}
return "Measure average finished"; return "Measure average finished";

89
src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java

@ -18,6 +18,9 @@ public class MySQLToS3Handler implements RequestHandler<Map<String, Object>, Str
private static final Logger logger = LoggerFactory.getLogger(MySQLToS3Handler.class); private static final Logger logger = LoggerFactory.getLogger(MySQLToS3Handler.class);
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
private final MySQLService mysqlService = new MySQLService(); private final MySQLService mysqlService = new MySQLService();
private final S3Service s3Service = new S3Service(); private final S3Service s3Service = new S3Service();
@ -32,57 +35,51 @@ public class MySQLToS3Handler implements RequestHandler<Map<String, Object>, Str
long startTs = startDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli(); long startTs = startDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli();
long endTs = endDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli(); long endTs = endDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli();
// 查询符合条件的企业ID String schema = DB_SCHEMA;
List<Long> companyIds = mysqlService.getActiveCompanyIds();
logger.info("company id list: {}", companyIds);
for (Long companyId : companyIds) {
String schema = "data_center_dongjian_" + companyId;
for (String table : Constants.tables) { for (String table : Constants.tables) {
// 查询旧数据 // 查询旧数据
List<Map<String, Object>> rows = mysqlService.queryOldData(schema, table, startTs, endTs); List<Map<String, Object>> rows = mysqlService.queryOldData(schema, table, startTs, endTs);
if (rows.isEmpty()) { if (rows.isEmpty()) {
logger.info("[{}][{}] no data....", schema, table); logger.info("[{}][{}] no data....", schema, table);
continue; continue;
} }
// 按年月日分组 // 按年月日分组
Map<String, List<Map<String, Object>>> grouped = new HashMap<>(); Map<String, List<Map<String, Object>>> grouped = new HashMap<>();
for (Map<String, Object> row : rows) { for (Map<String, Object> row : rows) {
int y = (int) row.get("date_year"); int y = (int) row.get("date_year");
int m = (int) row.get("date_month"); int m = (int) row.get("date_month");
int d = (int) row.get("date_day"); int d = (int) row.get("date_day");
// 格式化年月日,不足两位补0 // 格式化年月日,不足两位补0
String monthStr = String.format("%02d", m); String monthStr = String.format("%02d", m);
String dayStr = String.format("%02d", d); String dayStr = String.format("%02d", d);
String key = String.format("%d-%s-%s", y, monthStr, dayStr); String key = String.format("%d-%s-%s", y, monthStr, dayStr);
grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(row); grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
} }
// 上传每个分组 // 上传每个分组
boolean allSuccess = true; boolean allSuccess = true;
for (Map.Entry<String, List<Map<String, Object>>> entry : grouped.entrySet()) { for (Map.Entry<String, List<Map<String, Object>>> entry : grouped.entrySet()) {
String dateKey = entry.getKey(); String dateKey = entry.getKey();
byte[] csvBytes = CsvUtil.toCsvBytes(entry.getValue()); byte[] csvBytes = CsvUtil.toCsvBytes(entry.getValue());
String s3Key = String.format("%s/%s/%s.csv", table, companyId, dateKey); String s3Key = String.format("%s/%s.csv", table, dateKey);
try { try {
s3Service.upload(s3Key, csvBytes); s3Service.upload(s3Key, csvBytes);
logger.info("[{}][{}] backup success -> {}", schema, table, s3Key); logger.info("[{}][{}] backup success -> {}", schema, table, s3Key);
} catch (Exception e) { } catch (Exception e) {
allSuccess = false; allSuccess = false;
logger.error("[{}][{}] backup failed -> {}", schema, table, s3Key, e); logger.error("[{}][{}] backup failed -> {}", schema, table, s3Key, e);
}
} }
}
// 全部上传成功才删除数据 // 全部上传成功才删除数据
if (allSuccess) { if (allSuccess) {
mysqlService.deleteOldData(schema, table, startTs, endTs); mysqlService.deleteOldData(schema, table, startTs, endTs);
logger.info("[{}][{}] delete old data", schema, table); logger.info("[{}][{}] delete old data", schema, table);
} else { } else {
logger.error("[{}][{}] not all files uploaded successfully, skip delete", schema, table); logger.error("[{}][{}] not all files uploaded successfully, skip delete", schema, table);
}
} }
} }

71
src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java

@ -18,6 +18,9 @@ public class S3ToMySQLHandler implements RequestHandler<Map<String, Object>, Str
private static final Logger logger = LoggerFactory.getLogger(S3ToMySQLHandler.class); private static final Logger logger = LoggerFactory.getLogger(S3ToMySQLHandler.class);
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
private final MySQLService mysqlService = new MySQLService(); private final MySQLService mysqlService = new MySQLService();
private final S3Service s3Service = new S3Service(); private final S3Service s3Service = new S3Service();
@ -35,45 +38,39 @@ public class S3ToMySQLHandler implements RequestHandler<Map<String, Object>, Str
// --- 解析出 fileDate(用于删除早于该日期的数据) --- // --- 解析出 fileDate(用于删除早于该日期的数据) ---
LocalDate fileDate = LocalDate.parse(dateStr); LocalDate fileDate = LocalDate.parse(dateStr);
// 查询符合条件的企业ID
List<Long> companyIds = mysqlService.getActiveCompanyIds();
logger.info("company id list: {}", companyIds);
List<String> tables = Constants.tables; List<String> tables = Constants.tables;
for (Long companyId : companyIds) { String schema = DB_SCHEMA;
String schema = "data_center_dongjian_" + companyId;
for (String table : tables) {
for (String table : tables) { //先删除旧数据
//先删除旧数据 int deleted = mysqlService.deleteBeforeDate(schema, table, fileDate);
int deleted = mysqlService.deleteBeforeDate(schema, table, fileDate); logger.info("[{}][{}] deleted {} records before {}", schema, table, deleted, fileDate);
logger.info("[{}][{}] deleted {} records before {}", schema, table, deleted, fileDate);
// 构造 S3 文件路径
// 构造 S3 文件路径 String s3Key = String.format("%s/%s.csv", table, dateStr);
String s3Key = String.format("%s/%s/%s.csv", table, companyId, dateStr); logger.info("processing s3 file: {}", s3Key);
logger.info("processing s3 file: {}", s3Key);
// 下载 CSV
// 下载 CSV byte[] content = s3Service.download(s3Key);
byte[] content = s3Service.download(s3Key); if (content == null || content.length == 0) {
if (content == null || content.length == 0) { logger.warn("[{}][{}] file empty or not found: {}", schema, table, s3Key);
logger.warn("[{}][{}] file empty or not found: {}", schema, table, s3Key); continue;
continue; }
}
// 解析 CSV
// 解析 CSV List<Map<String, String>> rows = CsvUtil.parseCsv(content);
List<Map<String, String>> rows = CsvUtil.parseCsv(content); if (rows.isEmpty()) {
if (rows.isEmpty()) { logger.warn("[{}][{}] csv empty: {}", schema, table, s3Key);
logger.warn("[{}][{}] csv empty: {}", schema, table, s3Key); continue;
continue; }
}
// 插入到 MySQL
// 插入到 MySQL try {
try { int inserted = mysqlService.insertRows(schema, table, rows);
int inserted = mysqlService.insertRows(schema, table, rows); logger.info("[{}][{}] inserted {} rows", schema, table, inserted);
logger.info("[{}][{}] inserted {} rows", schema, table, inserted); } catch (Exception e) {
} catch (Exception e) { logger.error("[{}][{}] insert failed -> {}", schema, table, s3Key, e);
logger.error("[{}][{}] insert failed -> {}", schema, table, s3Key, e);
}
} }
} }

9
src/main/java/com/dashboard/aws/lambda/service/AccumulateIncrementService.java

@ -17,12 +17,13 @@ public class AccumulateIncrementService {
private static final String MYSQL_URL = System.getenv("DB_URL"); private static final String MYSQL_URL = System.getenv("DB_URL");
private static final String DB_USER = System.getenv("DB_USER"); private static final String DB_USER = System.getenv("DB_USER");
private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); private static final String DB_PASSWORD = System.getenv("DB_PASSWORD");
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
private Connection getConnection() throws SQLException { private Connection getConnection() throws SQLException {
return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
} }
public void aggregate(Long companyId, AggregationLevel level, LocalDateTime now) { public void aggregate(AggregationLevel level, LocalDateTime now) {
String sourceTable; String sourceTable;
String targetTable; String targetTable;
String sourceColumn; String sourceColumn;
@ -141,7 +142,7 @@ public class AccumulateIncrementService {
} }
} }
String schema = "data_center_dongjian_" + companyId; String schema = DB_SCHEMA;
// --- 2) SQL:用占位符传入 date 字段 + start/end 时间戳(毫秒) --- // --- 2) SQL:用占位符传入 date 字段 + start/end 时间戳(毫秒) ---
String sql = String.format( String sql = String.format(
@ -174,9 +175,9 @@ public class AccumulateIncrementService {
ps.setLong(idx, endMillis); ps.setLong(idx, endMillis);
int inserted = ps.executeUpdate(); int inserted = ps.executeUpdate();
logger.info("[company:{}][{}] aggregated {} -> rows inserted: {}, start={}, end={}", companyId, level, targetTable, inserted, startMillis, endMillis); logger.info("[{}] aggregated {} -> rows inserted: {}, start={}, end={}", level, targetTable, inserted, startMillis, endMillis);
} catch (SQLException e) { } catch (SQLException e) {
logger.error("[company:{}][{}] aggregation failed, start={}, end={}", companyId, level, startMillis, endMillis, e); logger.error("[{}] aggregation failed, start={}, end={}", level, startMillis, endMillis, e);
} }
} }
} }

31
src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java

@ -14,18 +14,19 @@ public class MeasureAverageService {
private static final String MYSQL_URL = System.getenv("DB_URL"); private static final String MYSQL_URL = System.getenv("DB_URL");
private static final String DB_USER = System.getenv("DB_USER"); private static final String DB_USER = System.getenv("DB_USER");
private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); private static final String DB_PASSWORD = System.getenv("DB_PASSWORD");
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
private Connection getConnection() throws SQLException { private Connection getConnection() throws SQLException {
return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
} }
public void aggregateLastHour(Long companyId, ZoneId zone, LocalDateTime now, LocalDateTime start) { public void aggregateLastHour(ZoneId zone, LocalDateTime now, LocalDateTime start) {
long startTs = start.atZone(zone).toInstant().toEpochMilli(); long startTs = start.atZone(zone).toInstant().toEpochMilli();
long endTs = now.atZone(zone).toInstant().toEpochMilli(); long endTs = now.atZone(zone).toInstant().toEpochMilli();
logger.info("Aggregating data for company {} from {} to {}", companyId, start, now); logger.info("Aggregating data from {} to {}", start, now);
String schema = "data_center_dongjian_" + companyId; String schema = DB_SCHEMA;
String realtimeTable = "dashboard_realtime_measure"; String realtimeTable = "dashboard_realtime_measure";
String sourceTable = "dashboard_record_measure"; String sourceTable = "dashboard_record_measure";
String targetTable = "dashboard_aggregate_measure_hour"; String targetTable = "dashboard_aggregate_measure_hour";
@ -110,13 +111,13 @@ public class MeasureAverageService {
// 判断是否到达每日 / 每月 / 每年聚合时间点 // 判断是否到达每日 / 每月 / 每年聚合时间点
if (now.getHour() == 0) { if (now.getHour() == 0) {
aggregateLastDay(companyId, now, now.toLocalDate().minusDays(1)); // 昨天的聚合 aggregateLastDay(now, now.toLocalDate().minusDays(1)); // 昨天的聚合
} }
if (now.getDayOfMonth() == 1 && now.getHour() == 0) { if (now.getDayOfMonth() == 1 && now.getHour() == 0) {
aggregateLastMonth(companyId, now, now.toLocalDate().minusMonths(1)); // 上个月的聚合 aggregateLastMonth(now, now.toLocalDate().minusMonths(1)); // 上个月的聚合
} }
if (now.getMonthValue() == 1 && now.getDayOfMonth() == 1 && now.getHour() == 0) { if (now.getMonthValue() == 1 && now.getDayOfMonth() == 1 && now.getHour() == 0) {
aggregateLastYear(companyId, now, now.toLocalDate().minusYears(1)); // 上一年的聚合 aggregateLastYear(now, now.toLocalDate().minusYears(1)); // 上一年的聚合
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -124,12 +125,12 @@ public class MeasureAverageService {
} }
} }
public void aggregateLastDay(Long companyId, LocalDateTime now, LocalDate targetDate) { public void aggregateLastDay(LocalDateTime now, LocalDate targetDate) {
String schema = "data_center_dongjian_" + companyId; String schema = DB_SCHEMA;
String sourceTable = "dashboard_aggregate_measure_hour"; String sourceTable = "dashboard_aggregate_measure_hour";
String targetTable = "dashboard_aggregate_measure_day"; String targetTable = "dashboard_aggregate_measure_day";
logger.info("Aggregating daily data for company: {}, date {}", companyId, targetDate); logger.info("Aggregating daily data, date {}", targetDate);
String sql = String.format( String sql = String.format(
"SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 24.0) AS avg_val, " + "SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 24.0) AS avg_val, " +
@ -178,12 +179,12 @@ public class MeasureAverageService {
} }
} }
public void aggregateLastMonth(Long companyId, LocalDateTime now, LocalDate targetMonth) { public void aggregateLastMonth(LocalDateTime now, LocalDate targetMonth) {
String schema = "data_center_dongjian_" + companyId; String schema = DB_SCHEMA;
String sourceTable = "dashboard_aggregate_measure_day"; String sourceTable = "dashboard_aggregate_measure_day";
String targetTable = "dashboard_aggregate_measure_month"; String targetTable = "dashboard_aggregate_measure_month";
logger.info("Aggregating monthly data for company:{}, month: {}", companyId, targetMonth.getMonthValue()); logger.info("Aggregating monthly data, month: {}", targetMonth.getMonthValue());
// 天数作为分母 // 天数作为分母
int daysInMonth = YearMonth.of(targetMonth.getYear(), targetMonth.getMonthValue()).lengthOfMonth(); int daysInMonth = YearMonth.of(targetMonth.getYear(), targetMonth.getMonthValue()).lengthOfMonth();
@ -234,12 +235,12 @@ public class MeasureAverageService {
} }
} }
public void aggregateLastYear(Long companyId, LocalDateTime now, LocalDate targetYear) { public void aggregateLastYear(LocalDateTime now, LocalDate targetYear) {
String schema = "data_center_dongjian_" + companyId; String schema = DB_SCHEMA;
String sourceTable = "dashboard_aggregate_measure_month"; String sourceTable = "dashboard_aggregate_measure_month";
String targetTable = "dashboard_aggregate_measure_year"; String targetTable = "dashboard_aggregate_measure_year";
logger.info("Aggregating yearly data for company:{}, year: {}", companyId, targetYear.getYear()); logger.info("Aggregating yearly data, year: {}", targetYear.getYear());
String sql = String.format( String sql = String.format(
"SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 12.0) AS avg_val," + "SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 12.0) AS avg_val," +

36
src/main/java/com/dashboard/aws/lambda/service/MySQLService.java

@ -19,24 +19,24 @@ public class MySQLService {
/** /**
* 获取符合条件的企业ID * 获取符合条件的企业ID
*/ */
public List<Long> getActiveCompanyIds() throws SQLException { // public List<Long> getActiveCompanyIds() throws SQLException {
String sql = """ // String sql = """
SELECT id FROM data_center_new.basic_company // SELECT id FROM data_center_new.basic_company
WHERE flag != 1 // WHERE flag != 1
AND (parent_id = -1 OR parent_id = 1) // AND (parent_id = -1 OR parent_id = 1)
"""; // """;
//
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); // try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql); // PreparedStatement ps = conn.prepareStatement(sql);
ResultSet rs = ps.executeQuery()) { // ResultSet rs = ps.executeQuery()) {
//
List<Long> ids = new ArrayList<>(); // List<Long> ids = new ArrayList<>();
while (rs.next()) { // while (rs.next()) {
ids.add(rs.getLong("id")); // ids.add(rs.getLong("id"));
} // }
return ids; // return ids;
} // }
} // }
public List<Map<String, Object>> queryOldData(String schema, String table, long startTs, long endTs) throws SQLException { public List<Map<String, Object>> queryOldData(String schema, String table, long startTs, long endTs) throws SQLException {
String sql = String.format("SELECT * FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); String sql = String.format("SELECT * FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table);

13
src/main/java/com/dashboard/aws/lambda/util/DateUtil.java

@ -109,11 +109,8 @@ public class DateUtil {
} }
*/ */
public static LocalDateTime resolveEventTime(Map<String, Object> event, ZoneId zone) { public static LocalDateTime normalizeToHalfHourTime(Map<String, Object> event, ZoneId zone) {
LocalDateTime now = LocalDateTime.now(zone) LocalDateTime now = LocalDateTime.now(zone);
.withMinute(0)
.withSecond(0)
.withNano(0);
if (MapUtil.isNotEmpty(event)) { if (MapUtil.isNotEmpty(event)) {
Integer year = MapUtil.getInt(event, "year"); Integer year = MapUtil.getInt(event, "year");
@ -137,7 +134,11 @@ public class DateUtil {
logger.info("Using default Tokyo time (missing year/month/day): {}", now); logger.info("Using default Tokyo time (missing year/month/day): {}", now);
} }
} else { } else {
logger.info("Using default Tokyo time (no event provided): {}", now); // 根据当前分钟数判断是落在 0 还是 30 分段
int currentMinute = now.getMinute();
int roundedMinute = (currentMinute >= 30) ? 30 : 0;
now = now.withMinute(roundedMinute).withSecond(0).withNano(0);
logger.info("Using default Tokyo time (rounded to {}): {}", roundedMinute, now);
} }
return now; return now;

Loading…
Cancel
Save