From 1a27bf98627918a0f213519f53c4bb0f80136f57 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Sun, 12 Oct 2025 14:40:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A5s3=E5=88=B0mysql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../aws/lambda/handler/MySQLToS3Handler.java | 12 ++- .../aws/lambda/handler/S3ToMySQLHandler.java | 78 +++++++++++++++++++ .../aws/lambda/service/MySQLService.java | 43 ++++++++-- .../aws/lambda/service/S3Service.java | 16 ++++ .../dashboard/aws/lambda/util/CsvUtil.java | 23 ++++++ .../dashboard/aws/lambda/util/DateUtil.java | 40 ++++++++++ 6 files changed, 200 insertions(+), 12 deletions(-) create mode 100644 src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java create mode 100644 src/main/java/com/dashboard/aws/lambda/util/DateUtil.java diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java b/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java index dd508be..6ba44bb 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java @@ -22,8 +22,12 @@ public class MySQLToS3Handler implements RequestHandler, Str @Override public String handleRequest(Map event, Context context) { try { - LocalDate targetDate = LocalDate.now(ZoneId.of("Asia/Tokyo")).minusDays(3); - long cutoff = targetDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli(); + LocalDate now = LocalDate.now(ZoneId.of("Asia/Tokyo")); + LocalDate startDate = now.minusDays(5); // 5天前 + LocalDate endDate = now.minusDays(3); // 3天前 + + long startTs = startDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli(); + long endTs = endDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli(); // 查询符合条件的企业ID List companyIds = mysqlService.getActiveCompanyIds(); @@ -34,7 +38,7 @@ public class MySQLToS3Handler implements RequestHandler, Str for (String table : List.of("dashboard_record_accumulate", "dashboard_record_measure")) { // 查询旧数据 - List> rows = mysqlService.queryOldData(schema, table, cutoff); + List> rows = mysqlService.queryOldData(schema, table, startTs, endTs); if (rows.isEmpty()) { logger.info("[{}][{}] no data....", schema, table); continue; @@ -71,7 +75,7 @@ public class MySQLToS3Handler implements RequestHandler, Str // 全部上传成功才删除数据 if (allSuccess) { - mysqlService.deleteOldData(schema, table, cutoff); + mysqlService.deleteOldData(schema, table, startTs, endTs); logger.info("[{}][{}] delete old data", schema, table); } else { logger.error("[{}][{}] not all files uploaded successfully, skip delete", schema, table); diff --git a/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java new file mode 100644 index 0000000..f407fba --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java @@ -0,0 +1,78 @@ +package com.dashboard.aws.lambda.handler; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.dashboard.aws.lambda.service.MySQLService; +import com.dashboard.aws.lambda.service.S3Service; +import com.dashboard.aws.lambda.util.CsvUtil; +import com.dashboard.aws.lambda.util.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.*; + +public class S3ToMySQLHandler implements RequestHandler, String> { + + private static final Logger logger = LoggerFactory.getLogger(S3ToMySQLHandler.class); + + private final MySQLService mysqlService = new MySQLService(); + private final S3Service s3Service = new S3Service(); + + @Override + public String handleRequest(Map event, Context context) { + try { + // 同步目标日期(这里同步 3 天前的数据) +// LocalDate targetDate = LocalDate.now(ZoneId.of("Asia/Tokyo")).minusDays(3); +// String dateStr = targetDate.format(java.time.format.DateTimeFormatter.ISO_DATE); +// String dateStr = "2025-10-08"; + LocalDate now = LocalDate.now(ZoneId.of("Asia/Tokyo")); + String dateStr = DateUtil.getLastYearSameIsoWeekDayStr(now); + + // 查询符合条件的企业ID + List companyIds = mysqlService.getActiveCompanyIds(); + logger.info("company id list: {}", companyIds); + + List tables = List.of("dashboard_record_accumulate", "dashboard_record_measure"); + + for (Long companyId : companyIds) { + String schema = "data_center_dongjian_" + companyId; + + for (String table : tables) { + // 构造 S3 文件路径 + String s3Key = String.format("%s/%s/%s.csv", table, companyId, dateStr); + logger.info("processing s3 file: {}", s3Key); + + // 下载 CSV + byte[] content = s3Service.download(s3Key); + if (content == null || content.length == 0) { + logger.warn("[{}][{}] file empty or not found: {}", schema, table, s3Key); + continue; + } + + // 解析 CSV + List> rows = CsvUtil.parseCsv(content); + if (rows.isEmpty()) { + logger.warn("[{}][{}] csv empty: {}", schema, table, s3Key); + continue; + } + + // 插入到 MySQL + try { + int inserted = mysqlService.insertRows(schema, table, rows); + logger.info("[{}][{}] inserted {} rows", schema, table, inserted); + } catch (Exception e) { + logger.error("[{}][{}] insert failed -> {}", schema, table, s3Key, e); + } + } + } + + return "sync process finished"; + + } catch (Exception e) { + logger.error("sync error", e); + return "sync error:" + e.getMessage(); + } + } +} diff --git a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java index 0761d9f..820c700 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java @@ -37,14 +37,15 @@ public class MySQLService { } } - public List> queryOldData(String schema, String table, long cutoff) throws SQLException { - String sql = String.format("SELECT * FROM %s.%s WHERE upload_at < ?", schema, table); - logger.info("query sql: {}, cutoff: {}", sql, cutoff); + public List> queryOldData(String schema, String table, long startTs, long endTs) throws SQLException { + String sql = String.format("SELECT * FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); + logger.info("query sql: {}, start: {}, end: {}", sql, startTs, endTs); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { - ps.setLong(1, cutoff); + ps.setLong(1, startTs); + ps.setLong(2, endTs); ResultSet rs = ps.executeQuery(); ResultSetMetaData meta = rs.getMetaData(); @@ -61,17 +62,43 @@ public class MySQLService { } } - public void deleteOldData(String schema, String table, long cutoff) throws SQLException { - String sql = String.format("DELETE FROM %s.%s WHERE upload_at < ?", schema, table); - logger.info("deleteOldData sql: {}, cutoff: {}", sql, cutoff); + public void deleteOldData(String schema, String table, long startTs, long endTs) throws SQLException { + String sql = String.format("DELETE FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); + logger.info("deleteOldData sql: {}, start: {}, end: {}", sql, startTs, endTs); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { - ps.setLong(1, cutoff); + ps.setLong(1, startTs); + ps.setLong(2, endTs); int deleted = ps.executeUpdate(); logger.info("[{}.{}] 删除 {} 条记录", schema, table, deleted); } } + public int insertRows(String schema, String table, List> rows) throws SQLException { + if (rows == null || rows.isEmpty()) return 0; + + Map first = rows.get(0); + List columns = new ArrayList<>(first.keySet()); + String columnStr = String.join(",", columns); + String placeholders = String.join(",", Collections.nCopies(columns.size(), "?")); + String sql = String.format("INSERT INTO %s.%s (%s) VALUES (%s)", schema, table, columnStr, placeholders); + logger.info("insert sql: {}", sql); + + int total = 0; + try (java.sql.Connection conn = java.sql.DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + java.sql.PreparedStatement ps = conn.prepareStatement(sql)) { + + for (Map row : rows) { + for (int i = 0; i < columns.size(); i++) { + ps.setString(i + 1, row.getOrDefault(columns.get(i), null)); + } + ps.addBatch(); + } + int[] results = ps.executeBatch(); + for (int r : results) total += r; + } + return total; + } } diff --git a/src/main/java/com/dashboard/aws/lambda/service/S3Service.java b/src/main/java/com/dashboard/aws/lambda/service/S3Service.java index 98918da..56ab85d 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/S3Service.java +++ b/src/main/java/com/dashboard/aws/lambda/service/S3Service.java @@ -4,10 +4,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; @@ -80,4 +82,18 @@ public class S3Service { s3.close(); } } + + public byte[] download(String key) { + try { + ResponseBytes objectBytes = s3.getObjectAsBytes(GetObjectRequest.builder() + .bucket(bucket) + .key(key) + .build()); + logger.info("download success: {}", key); + return objectBytes.asByteArray(); + } catch (Exception e) { + logger.error("download failed: {}", key, e); + return null; + } + } } diff --git a/src/main/java/com/dashboard/aws/lambda/util/CsvUtil.java b/src/main/java/com/dashboard/aws/lambda/util/CsvUtil.java index 2c4cbc4..b5be18e 100644 --- a/src/main/java/com/dashboard/aws/lambda/util/CsvUtil.java +++ b/src/main/java/com/dashboard/aws/lambda/util/CsvUtil.java @@ -3,6 +3,8 @@ package com.dashboard.aws.lambda.util; import java.io.ByteArrayOutputStream; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -26,4 +28,25 @@ public class CsvUtil { writer.flush(); return out.toString(StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8); } + + public static List> parseCsv(byte[] content) throws java.io.IOException { + List> list = new ArrayList<>(); + try (java.io.BufferedReader reader = new java.io.BufferedReader( + new java.io.InputStreamReader(new java.io.ByteArrayInputStream(content), java.nio.charset.StandardCharsets.UTF_8))) { + String line = reader.readLine(); + if (line == null) return list; + + String[] headers = line.split(","); + while ((line = reader.readLine()) != null) { + String[] values = line.split(",", -1); + Map row = new LinkedHashMap<>(); + for (int i = 0; i < headers.length; i++) { + row.put(headers[i], i < values.length ? values[i] : ""); + } + list.add(row); + } + } + return list; + } + } diff --git a/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java b/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java new file mode 100644 index 0000000..69b8e6a --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java @@ -0,0 +1,40 @@ +package com.dashboard.aws.lambda.util; + +import java.time.DayOfWeek; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.time.temporal.IsoFields; + +public class DateUtil { + + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ISO_DATE; + + /** + * 根据指定日期,获取去年同 ISO 周同星期的日期 + * @param date 目标日期 + * @return 去年同周同日 LocalDate + */ + public static LocalDate getLastYearSameIsoWeekDay(LocalDate date) { + if (date == null) throw new IllegalArgumentException("date不能为空"); + + int isoWeekYear = date.get(IsoFields.WEEK_BASED_YEAR); + int isoWeekNumber = date.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR); + DayOfWeek dayOfWeek = date.getDayOfWeek(); + + return LocalDate.now() + .with(IsoFields.WEEK_BASED_YEAR, isoWeekYear - 1) + .with(IsoFields.WEEK_OF_WEEK_BASED_YEAR, isoWeekNumber) + .with(dayOfWeek); + } + + /** + * 根据指定日期,获取去年同 ISO 周同星期的日期字符串 + * @param date 目标日期 + * @return 去年同周同日 yyyy-MM-dd + */ + public static String getLastYearSameIsoWeekDayStr(LocalDate date) { + LocalDate lastYearDate = getLastYearSameIsoWeekDay(date); + return lastYearDate.format(FORMATTER); + } + +}