diff --git a/pom.xml b/pom.xml index 69d67ce..32cac36 100644 --- a/pom.xml +++ b/pom.xml @@ -7,6 +7,11 @@ 1.0-SNAPSHOT dashboard-statistics-lambda Maven Webapp http://maven.apache.org + + + UTF-8 + + junit diff --git a/src/main/java/com/dashboard/aws/lambda/Constants.java b/src/main/java/com/dashboard/aws/lambda/Constants.java index 21cd70d..510a3b6 100644 --- a/src/main/java/com/dashboard/aws/lambda/Constants.java +++ b/src/main/java/com/dashboard/aws/lambda/Constants.java @@ -5,5 +5,6 @@ import java.util.List; public class Constants { public static final List tables = List.of("dashboard_record_accumulate", "dashboard_record_measure"); +// public static final List tables = List.of("dashboard_record_accumulate_test", "dashboard_record_measure_test"); } diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java b/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java new file mode 100644 index 0000000..285b372 --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java @@ -0,0 +1,237 @@ +package com.dashboard.aws.lambda.handler; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.dashboard.aws.lambda.Constants; +import com.dashboard.aws.lambda.service.MySQLService; +import com.dashboard.aws.lambda.service.S3Service; +import com.dashboard.aws.lambda.util.CsvUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class MySQLBatchToS3Handler implements RequestHandler, String> { + + private static final Logger logger = LoggerFactory.getLogger(MySQLBatchToS3Handler.class); + + private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); + private static final int BATCH_SIZE = Integer.parseInt(System.getenv("BATCH_SIZE")); + private static final String CUTOFF_DATE_STR = System.getenv("CUTOFF_DATE"); + private static final ZoneId TOKYO_ZONE = ZoneId.of("Asia/Tokyo"); + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + private static final Pattern FILE_PATTERN = Pattern.compile(".*/(\\d{3})\\.csv$"); + + private final MySQLService mysqlService = new MySQLService(); + private final S3Service s3Service = new S3Service(); + + @Override + public String handleRequest(Map event, Context context) { + long startTime = System.currentTimeMillis(); + try { + // 确定截止日期:优先使用环境变量,否则默认使用东京时区3天前 + LocalDate cutoffDate; + if (CUTOFF_DATE_STR != null && !CUTOFF_DATE_STR.isEmpty()) { + cutoffDate = LocalDate.parse(CUTOFF_DATE_STR); + logger.info("Using cutoff date from env: {}", cutoffDate); + } else { + cutoffDate = LocalDate.now(TOKYO_ZONE).minusDays(2); + logger.info("Using default cutoff date (2 days ago): {}", cutoffDate); + } + + // 转换为毫秒(日本时区当天0点) + long cutoffUploadAt = cutoffDate.atStartOfDay(TOKYO_ZONE).toInstant().toEpochMilli(); + logger.info("Cutoff upload_at: {}", cutoffUploadAt); + + // 处理每个配置的表 + for (String table : Constants.tables) { + processTable(table, cutoffUploadAt, startTime); + } + + long totalTime = System.currentTimeMillis() - startTime; + logger.info("All tables processed, total time: {}ms", totalTime); + return "batch backup process finished"; + + } catch (Exception e) { + logger.error("batch backup error", e); + return "batch backup error:" + e.getMessage(); + } + } + + /** + * 处理单个表的数据备份 + * 循环查询批次数据,直到没有数据或遇到截止日期或超时 + */ + private void processTable(String table, long cutoffUploadAt, long overallStartTime) { + logger.info("[{}][{}] start processing batch size: {}", DB_SCHEMA, table, BATCH_SIZE); + long tableStartTime = System.currentTimeMillis(); + + Long lastProcessedId = null; + Long lastUploadAt = null; + boolean shouldContinue = true; + + while (shouldContinue) { + // 检查是否快超时:距离15分钟还剩小于2分钟时停止,留点余量时间,防止lambda超时 + long elapsedTime = System.currentTimeMillis() - overallStartTime; + long remainingTime = 15 * 60 * 1000 - elapsedTime; + if (remainingTime < 2 * 60 * 1000) { + logger.warn("[{}][{}] Time limit approaching, stopping. Elapsed: {}ms, Remaining: {}ms", + DB_SCHEMA, table, elapsedTime, remainingTime); + break; + } + + // 查询当前批次数据 + List> rows = null; + try { + rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, BATCH_SIZE, lastProcessedId, lastUploadAt, cutoffUploadAt); + } catch (Exception e) { + logger.error("[{}][{}] query failed", DB_SCHEMA, table, e); + break; + } + + if (rows.isEmpty()) { + logger.info("[{}][{}] no more data", DB_SCHEMA, table); + break; + } + + logger.info("[{}][{}] fetched {} rows", DB_SCHEMA, table, rows.size()); + + // 按日期分组数据 + Map>> grouped = new HashMap<>(); + Long maxIdInBatch = null; + Long maxUploadAtInBatch = null; + + for (Map row : rows) { + Long id = (Long) row.get("id"); + + // 更新当前批次的最大ID和最大uploadAt + if (maxIdInBatch == null || id > maxIdInBatch) { + maxIdInBatch = id; + } + Long uploadAt = (Long) row.get("upload_at"); + if (maxUploadAtInBatch == null || uploadAt > maxUploadAtInBatch) { + maxUploadAtInBatch = uploadAt; + } + + // 根据upload_at转换为东京时区日期 + LocalDate date = Instant.ofEpochMilli(uploadAt).atZone(TOKYO_ZONE).toLocalDate(); + + // 按日期分组 + String dateKey = date.format(DATE_FORMATTER); + grouped.computeIfAbsent(dateKey, k -> new ArrayList<>()).add(row); + } + + // 如果当前批次没有有效数据,继续下一批 + if (grouped.isEmpty()) { + logger.info("[{}][{}] no valid data in this batch", DB_SCHEMA, table); + if (rows.size() < BATCH_SIZE) { + shouldContinue = false; + } else { + lastProcessedId = maxIdInBatch; + lastUploadAt = maxUploadAtInBatch; + } + continue; + } + + // 每个日期分组独立处理:上传成功一个,就立即删除该分组的ID + boolean anyFailure = false; + List processedIds = new ArrayList<>(); + + for (Map.Entry>> entry : grouped.entrySet()) { + long groupStartTime = System.currentTimeMillis(); + String dateKey = entry.getKey(); + List> dateRows = entry.getValue(); + + // 收集当前分组的ID + List currentDateIds = new ArrayList<>(); + for (Map row : dateRows) { + currentDateIds.add((Long) row.get("id")); + } + + // 生成S3文件名,按序号递增 + String prefix = String.format("%s/%s", table, dateKey); + int nextSeq = getNextSequenceNumber(prefix); + String s3Key = String.format("%s/%s/%03d.csv", table, dateKey, nextSeq); + + try { + byte[] csvBytes = CsvUtil.toCsvBytes(dateRows); + s3Service.upload(s3Key, csvBytes); + logger.info("[{}][{}] upload success -> {}, {} rows", DB_SCHEMA, table, s3Key, dateRows.size()); + + // 上传成功,立即删除该分组对应的ID + try { + mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds); + logger.info("[{}][{}] delete {} records for date {}", DB_SCHEMA, table, currentDateIds.size(), dateKey); + processedIds.addAll(currentDateIds); + } catch (Exception e) { + logger.error("[{}][{}] delete failed for date {}, trying to rollback S3 file", DB_SCHEMA, table, dateKey, e); + // MySQL删除失败,回滚S3文件,保持一致性 + try { + s3Service.delete(s3Key); + logger.info("[{}][{}] rollback success, deleted S3 file: {}", DB_SCHEMA, table, s3Key); + } catch (Exception rollbackEx) { + logger.error("[{}][{}] rollback failed, S3 file may be orphaned: {}", DB_SCHEMA, table, s3Key, rollbackEx); + } + anyFailure = true; + } + } catch (Exception e) { + anyFailure = true; + logger.error("[{}][{}] upload failed -> {}", DB_SCHEMA, table, s3Key, e); + } + + // 记录当前分组处理耗时 + long groupTime = System.currentTimeMillis() - groupStartTime; + logger.info("[{}][{}] date {} processed in {}ms", DB_SCHEMA, table, dateKey, groupTime); + } + + // 更新下次查询的起始ID:只要处理了数据,就更新到已处理的最大ID + if (!processedIds.isEmpty()) { + Long maxProcessedId = processedIds.stream().max(Long::compareTo).orElse(null); + if (maxProcessedId != null) { + lastProcessedId = maxProcessedId; + lastUploadAt = maxUploadAtInBatch; + } + } + + // 确定是否继续:只有全部成功且当前批次是完整批次时才继续 + if (anyFailure || rows.size() < BATCH_SIZE) { + shouldContinue = false; + } + } + + // 记录表处理总耗时 + long tableTime = System.currentTimeMillis() - tableStartTime; + logger.info("[{}][{}] table processing finished, total time: {}ms", DB_SCHEMA, table, tableTime); + } + + /** + * 获取下一个文件序号 + * 查询S3上指定前缀的文件,找到最大序号后+1 + */ + private int getNextSequenceNumber(String prefix) { + List keys = s3Service.listKeysWithPrefix(prefix); + int maxSeq = 0; + + for (String key : keys) { + Matcher matcher = FILE_PATTERN.matcher(key); + if (matcher.matches()) { + int seq = Integer.parseInt(matcher.group(1)); + if (seq > maxSeq) { + maxSeq = seq; + } + } + } + + return maxSeq + 1; + } +} diff --git a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java index dfe9e2e..f77109c 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java @@ -88,8 +88,8 @@ public class MySQLService { // logger.info("insert sql: {}", sql); int total = 0; - try (java.sql.Connection conn = java.sql.DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); - java.sql.PreparedStatement ps = conn.prepareStatement(sql)) { + try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + PreparedStatement ps = conn.prepareStatement(sql)) { for (Map row : rows) { for (int i = 0; i < columns.size(); i++) { @@ -113,7 +113,7 @@ public class MySQLService { ); logger.info("deleteBeforeDate sql: {}, cutoff:{}", sql, cutoff); - try (Connection conn = java.sql.DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setInt(1, cutoff); int deleted = ps.executeUpdate(); @@ -125,4 +125,77 @@ public class MySQLService { } } + /** + * 根据ID列表批量删除数据 + * 使用IN语句一次性删除,提高效率 + */ + public void deleteByIds(String schema, String table, List ids) throws SQLException { + if (ids == null || ids.isEmpty()) { + return; + } + + // 构建IN子句的占位符字符串:?, ?, ?, ... + String placeholders = String.join(",", Collections.nCopies(ids.size(), "?")); + String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders); + logger.info("deleteByIds sql: {}, ids count: {}", sql, ids.size()); + + try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + PreparedStatement ps = conn.prepareStatement(sql)) { + + // 逐个设置ID参数值 + for (int i = 0; i < ids.size(); i++) { + ps.setLong(i + 1, ids.get(i)); + } + int deleted = ps.executeUpdate(); + logger.info("[{}.{}] 删除 {} 条记录", schema, table, deleted); + } + } + + /** + * 查询ID大于指定值的前N条数据(按 upload_at + id 升序) + * 用于分页分批处理数据 + * @param lastId 上次处理的最大ID(不包含),为null时从第一条开始查询 + * @param lastUploadAt 上次处理的最大 upload_at(不包含) + * @param cutoffUploadAt 截止时间,只处理 upload_at < cutoffUploadAt 的数据 + */ + public List> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId, Long lastUploadAt, Long cutoffUploadAt) throws SQLException { + String sql; + if (lastId == null) { + // 第一次查询 + sql = String.format("SELECT * FROM %s.%s WHERE upload_at < ? ORDER BY upload_at ASC, id ASC LIMIT ?", schema, table); + } else { + // 后续查询,从上次之后开始 + sql = String.format("SELECT * FROM %s.%s WHERE (upload_at > ? OR (upload_at = ? AND id > ?)) AND upload_at < ? ORDER BY upload_at ASC, id ASC LIMIT ?", schema, table); + } + logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}, lastUploadAt: {}, cutoffUploadAt: {}", + sql, limit, lastId, lastUploadAt, cutoffUploadAt); + + try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + PreparedStatement ps = conn.prepareStatement(sql)) { + + int paramIndex = 1; + if (lastId != null) { + ps.setLong(paramIndex++, lastUploadAt); + ps.setLong(paramIndex++, lastUploadAt); + ps.setLong(paramIndex++, lastId); + } + ps.setLong(paramIndex++, cutoffUploadAt); + ps.setInt(paramIndex, limit); + + ResultSet rs = ps.executeQuery(); + ResultSetMetaData meta = rs.getMetaData(); + List> list = new ArrayList<>(); + + // 将结果集转换为Map列表 + while (rs.next()) { + Map row = new LinkedHashMap<>(); + for (int i = 1; i <= meta.getColumnCount(); i++) { + row.put(meta.getColumnName(i), rs.getObject(i)); + } + list.add(row); + } + return list; + } + } + } diff --git a/src/main/java/com/dashboard/aws/lambda/service/S3Service.java b/src/main/java/com/dashboard/aws/lambda/service/S3Service.java index 56ab85d..f008d0b 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/S3Service.java +++ b/src/main/java/com/dashboard/aws/lambda/service/S3Service.java @@ -9,10 +9,16 @@ import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; public class S3Service { @@ -96,4 +102,52 @@ public class S3Service { return null; } } + + /** + * 列出指定前缀下的所有文件key + * 使用continuationToken处理分页,获取全部结果 + */ + public List listKeysWithPrefix(String prefix) { + List keys = new ArrayList<>(); + String continuationToken = null; + + do { + // 构建列表请求 + ListObjectsV2Request.Builder builder = ListObjectsV2Request.builder() + .bucket(bucket) + .prefix(prefix); + + // 如果有续延token,设置它以获取下一页 + if (continuationToken != null) { + builder.continuationToken(continuationToken); + } + + ListObjectsV2Response response = s3.listObjectsV2(builder.build()); + + // 收集当前页的文件key + for (S3Object s3Object : response.contents()) { + keys.add(s3Object.key()); + } + + // 获取下一页的续延token + continuationToken = response.nextContinuationToken(); + } while (continuationToken != null); // 还有下一页时继续循环 + + return keys; + } + + /** + * 删除指定key的文件 + */ + public void delete(String key) { + Objects.requireNonNull(key, "S3 key不能为空"); + + DeleteObjectRequest request = DeleteObjectRequest.builder() + .bucket(bucket) + .key(key) + .build(); + + s3.deleteObject(request); + logger.info("delete success: {}", key); + } } diff --git a/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java b/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java new file mode 100644 index 0000000..052d5be --- /dev/null +++ b/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java @@ -0,0 +1,128 @@ +package com.dashboard.aws.lambda; + + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * 把历史数据从旧格式迁移到我们现在使用的新格式 table/date/001.csv ,这样新旧数据保持一致 + */ +public class S3CsvReorganizer { + + private static final String BUCKET = "dashboard-for-backup"; + + /** + * 匹配: + * dashboard-for-backup/dashboard_record_measure/2025-12-25.csv + */ + private static final Pattern FILE_PATTERN = + Pattern.compile("^(.*/)(\\d{4}-\\d{2}-\\d{2})\\.csv$"); + + /** + * 重组指定目录下的 CSV 文件 + * + * 例如: + * dashboard-for-backup/dashboard_record_measure/ + */ + public void reorganize(String prefix) { + S3Client s3 = S3Client.builder() + .region(Region.AP_NORTHEAST_1) + .httpClient(UrlConnectionHttpClient.builder().build()) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create( + "AKA", + "IFwUPLd") + ) + ) + .build(); + + String continuationToken = null; + + do { + // 构建列表请求 + ListObjectsV2Request.Builder builder = ListObjectsV2Request.builder() + .bucket(BUCKET) + .prefix(prefix); + + // 如果有续延token,设置它以获取下一页 + if (continuationToken != null) { + builder.continuationToken(continuationToken); + } + + ListObjectsV2Response response = s3.listObjectsV2(builder.build()); + + for (S3Object obj : response.contents()) { + String oldKey = obj.key(); + + // 跳过目录对象 + if (oldKey.endsWith("/")) { + continue; + } + + Matcher matcher = FILE_PATTERN.matcher(oldKey); + if (!matcher.matches()) { + continue; + } + + String basePath = matcher.group(1); // dashboard-for-backup/dashboard_record_measure/ + String date = matcher.group(2); // 2025-12-25 + + String newKey = basePath + date + "/001.csv"; + + // 如果目标文件和源文件相同则跳过 + if (oldKey.equals(newKey)) { + continue; + } + + System.out.println("Moving:"); + System.out.println(" From: " + oldKey); + System.out.println(" To : " + newKey); + + // 1. 服务端复制 + s3.copyObject( + CopyObjectRequest.builder() + .sourceBucket(BUCKET) + .sourceKey(oldKey) + .destinationBucket(BUCKET) + .destinationKey(newKey) + .build() + ); + + // 2. 删除旧文件 +// s3.deleteObject( +// DeleteObjectRequest.builder() +// .bucket(BUCKET) +// .key(oldKey) +// .build() +// ); + + System.out.println("Moved successfully."); + } + + continuationToken = response.isTruncated() + ? response.nextContinuationToken() + : null; + + } while (continuationToken != null); + } + + public static void main(String[] args) { + S3CsvReorganizer reorganizer = new S3CsvReorganizer(); + + reorganizer.reorganize( + "dashboard_record_accumulate/" + ); + } +}