Browse Source

批量循环存储数据

master
review512jwy@163.com 2 weeks ago
parent
commit
b4c3c4e4b3
  1. 5
      pom.xml
  2. 1
      src/main/java/com/dashboard/aws/lambda/Constants.java
  3. 237
      src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java
  4. 79
      src/main/java/com/dashboard/aws/lambda/service/MySQLService.java
  5. 54
      src/main/java/com/dashboard/aws/lambda/service/S3Service.java
  6. 128
      src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java

5
pom.xml

@ -7,6 +7,11 @@
<version>1.0-SNAPSHOT</version>
<name>dashboard-statistics-lambda Maven Webapp</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>

1
src/main/java/com/dashboard/aws/lambda/Constants.java

@ -5,5 +5,6 @@ import java.util.List;
/**
 * Project-wide constants for the dashboard backup Lambda.
 *
 * <p>This class only holds static constants, so it is final and cannot be instantiated.
 */
public final class Constants {

    /** Names of the MySQL tables that the backup handlers iterate over. */
    public static final List<String> tables = List.of("dashboard_record_accumulate", "dashboard_record_measure");
    // Alternative table set kept for reference when running against the *_test tables:
    // public static final List<String> tables = List.of("dashboard_record_accumulate_test", "dashboard_record_measure_test");

    private Constants() {
        // constants holder - no instances
    }
}

237
src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java

@ -0,0 +1,237 @@
package com.dashboard.aws.lambda.handler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.dashboard.aws.lambda.Constants;
import com.dashboard.aws.lambda.service.MySQLService;
import com.dashboard.aws.lambda.service.S3Service;
import com.dashboard.aws.lambda.util.CsvUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Lambda handler that archives aged MySQL rows to S3 as CSV files and then deletes them.
 *
 * <p>For every table listed in {@link Constants#tables} the handler repeatedly fetches a batch
 * of rows whose {@code upload_at} is older than the cutoff, groups the rows by their Tokyo-zone
 * calendar date, uploads each group to {@code <table>/<yyyy-MM-dd>/<seq>.csv} and, once the upload
 * succeeded, deletes exactly those rows from MySQL.
 *
 * <p>Configuration is read from environment variables: {@code DB_SCHEMA}, {@code BATCH_SIZE}
 * (must be a valid integer, otherwise class initialisation fails) and the optional
 * {@code CUTOFF_DATE} (ISO {@code yyyy-MM-dd}).
 */
public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>, String> {

    private static final Logger logger = LoggerFactory.getLogger(MySQLBatchToS3Handler.class);

    private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
    private static final int BATCH_SIZE = Integer.parseInt(System.getenv("BATCH_SIZE"));
    private static final String CUTOFF_DATE_STR = System.getenv("CUTOFF_DATE");
    private static final ZoneId TOKYO_ZONE = ZoneId.of("Asia/Tokyo");
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /**
     * Matches {@code .../NNN.csv}. Three OR MORE digits are accepted: {@code %03d} produces
     * "1000" once the sequence passes 999, and such files must still be recognised, otherwise
     * sequence 1000 would be handed out again and the existing object overwritten.
     * The upper bound of 9 digits keeps {@link Integer#parseInt(String)} from overflowing.
     */
    private static final Pattern FILE_PATTERN = Pattern.compile(".*/(\\d{3,9})\\.csv$");

    /**
     * Run-time budget the loop assumes for one invocation (15 minutes).
     * NOTE(review): this is hard-coded; Context.getRemainingTimeInMillis() would follow the
     * timeout actually configured on the function - confirm before switching.
     */
    private static final long ASSUMED_TIMEOUT_MS = 15 * 60 * 1000L;

    /** Processing stops when less than this much of the assumed budget is left (2 minutes). */
    private static final long SAFETY_MARGIN_MS = 2 * 60 * 1000L;

    private final MySQLService mysqlService = new MySQLService();
    private final S3Service s3Service = new S3Service();

    /**
     * Entry point: resolves the cutoff date and processes every configured table.
     *
     * @param event   invocation payload (not read)
     * @param context Lambda context (not read)
     * @return a fixed success message, or {@code "batch backup error:" + message} when an
     *         exception escaped the processing
     */
    @Override
    public String handleRequest(Map<String, Object> event, Context context) {
        long startTime = System.currentTimeMillis();
        try {
            // Cutoff date: the CUTOFF_DATE environment variable wins; otherwise default to
            // two days before today in the Tokyo time zone.
            LocalDate cutoffDate;
            if (CUTOFF_DATE_STR != null && !CUTOFF_DATE_STR.isEmpty()) {
                cutoffDate = LocalDate.parse(CUTOFF_DATE_STR);
                logger.info("Using cutoff date from env: {}", cutoffDate);
            } else {
                cutoffDate = LocalDate.now(TOKYO_ZONE).minusDays(2);
                logger.info("Using default cutoff date (2 days ago): {}", cutoffDate);
            }

            // Epoch milliseconds of 00:00 Tokyo time on the cutoff date.
            long cutoffUploadAt = cutoffDate.atStartOfDay(TOKYO_ZONE).toInstant().toEpochMilli();
            logger.info("Cutoff upload_at: {}", cutoffUploadAt);

            for (String table : Constants.tables) {
                processTable(table, cutoffUploadAt, startTime);
            }

            long totalTime = System.currentTimeMillis() - startTime;
            logger.info("All tables processed, total time: {}ms", totalTime);
            return "batch backup process finished";
        } catch (Exception e) {
            logger.error("batch backup error", e);
            return "batch backup error:" + e.getMessage();
        }
    }

    /**
     * Backs up one table batch by batch.
     *
     * <p>The loop ends when a query fails, no rows are returned, a date group failed, the batch
     * was shorter than {@code BATCH_SIZE}, or the assumed time budget is nearly used up.
     *
     * @param table            table name inside {@code DB_SCHEMA}
     * @param cutoffUploadAt   only rows with {@code upload_at} below this epoch-millis value are fetched
     * @param overallStartTime epoch millis at which the invocation started
     */
    private void processTable(String table, long cutoffUploadAt, long overallStartTime) {
        logger.info("[{}][{}] start processing batch size: {}", DB_SCHEMA, table, BATCH_SIZE);
        long tableStartTime = System.currentTimeMillis();

        // Keyset cursor: (upload_at, id) of the last row of the previous batch.
        Long lastProcessedId = null;
        Long lastUploadAt = null;

        while (true) {
            // Stop early so the function is not killed in the middle of a batch.
            long elapsedTime = System.currentTimeMillis() - overallStartTime;
            long remainingTime = ASSUMED_TIMEOUT_MS - elapsedTime;
            if (remainingTime < SAFETY_MARGIN_MS) {
                logger.warn("[{}][{}] Time limit approaching, stopping. Elapsed: {}ms, Remaining: {}ms",
                        DB_SCHEMA, table, elapsedTime, remainingTime);
                break;
            }

            List<Map<String, Object>> rows;
            try {
                rows = mysqlService.queryFirstNByIdGreaterThan(
                        DB_SCHEMA, table, BATCH_SIZE, lastProcessedId, lastUploadAt, cutoffUploadAt);
            } catch (Exception e) {
                logger.error("[{}][{}] query failed", DB_SCHEMA, table, e);
                break;
            }
            if (rows == null || rows.isEmpty()) {
                logger.info("[{}][{}] no more data", DB_SCHEMA, table);
                break;
            }
            logger.info("[{}][{}] fetched {} rows", DB_SCHEMA, table, rows.size());

            // Every date group is handled independently: a failed group does not prevent the
            // remaining groups of this batch from being attempted.
            boolean anyFailure = false;
            for (Map.Entry<String, List<Map<String, Object>>> entry : groupByTokyoDate(rows).entrySet()) {
                long groupStartTime = System.currentTimeMillis();
                String dateKey = entry.getKey();
                if (!backupDateGroup(table, dateKey, entry.getValue())) {
                    anyFailure = true;
                }
                long groupTime = System.currentTimeMillis() - groupStartTime;
                logger.info("[{}][{}] date {} processed in {}ms", DB_SCHEMA, table, dateKey, groupTime);
            }

            // Continue only after a fully successful, full-sized batch.
            if (anyFailure || rows.size() < BATCH_SIZE) {
                break;
            }

            // The query orders by (upload_at, id), so the cursor must be the LAST row's pair.
            // Using the batch-wide maximum id instead can skip rows that share the last
            // upload_at but have a smaller id than some earlier row.
            Map<String, Object> lastRow = rows.get(rows.size() - 1);
            lastProcessedId = toLong(lastRow.get("id"), "id");
            lastUploadAt = toLong(lastRow.get("upload_at"), "upload_at");
        }

        long tableTime = System.currentTimeMillis() - tableStartTime;
        logger.info("[{}][{}] table processing finished, total time: {}ms", DB_SCHEMA, table, tableTime);
    }

    /** Groups rows by the Tokyo-zone calendar date ({@code yyyy-MM-dd}) of their {@code upload_at}. */
    private Map<String, List<Map<String, Object>>> groupByTokyoDate(List<Map<String, Object>> rows) {
        Map<String, List<Map<String, Object>>> grouped = new HashMap<>();
        for (Map<String, Object> row : rows) {
            long uploadAt = toLong(row.get("upload_at"), "upload_at");
            String dateKey = Instant.ofEpochMilli(uploadAt).atZone(TOKYO_ZONE).toLocalDate().format(DATE_FORMATTER);
            grouped.computeIfAbsent(dateKey, k -> new ArrayList<>()).add(row);
        }
        return grouped;
    }

    /**
     * Uploads one date group to S3 and, on success, deletes its rows from MySQL.
     * If the delete fails, the freshly uploaded object is removed again (best effort).
     *
     * @return {@code true} when the rows were uploaded and deleted, {@code false} otherwise
     */
    private boolean backupDateGroup(String table, String dateKey, List<Map<String, Object>> dateRows) {
        List<Long> currentDateIds = new ArrayList<>(dateRows.size());
        for (Map<String, Object> row : dateRows) {
            currentDateIds.add(toLong(row.get("id"), "id"));
        }

        // Trailing slash restricts the listing to this date's folder.
        String prefix = String.format("%s/%s/", table, dateKey);
        String s3Key = prefix; // refined below once the sequence number is known
        try {
            // Listing happens inside the try so an S3 listing error counts as a failure of
            // this group instead of aborting the whole invocation.
            // NOTE(review): list-then-put is not atomic; concurrent invocations could pick the
            // same sequence number - confirm the function never runs concurrently.
            int nextSeq = getNextSequenceNumber(prefix);
            s3Key = String.format("%s%03d.csv", prefix, nextSeq);
            byte[] csvBytes = CsvUtil.toCsvBytes(dateRows);
            s3Service.upload(s3Key, csvBytes);
            logger.info("[{}][{}] upload success -> {}, {} rows", DB_SCHEMA, table, s3Key, dateRows.size());
        } catch (Exception e) {
            logger.error("[{}][{}] upload failed -> {}", DB_SCHEMA, table, s3Key, e);
            return false;
        }

        try {
            mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds);
            logger.info("[{}][{}] delete {} records for date {}", DB_SCHEMA, table, currentDateIds.size(), dateKey);
            return true;
        } catch (Exception e) {
            logger.error("[{}][{}] delete failed for date {}, trying to rollback S3 file", DB_SCHEMA, table, dateKey, e);
            // Keep S3 and MySQL consistent: the rows are still in MySQL, so drop the copy.
            // NOTE(review): if the DELETE was applied but the client saw an error, this removes
            // the only copy - verify whether keeping the object would be the safer choice.
            try {
                s3Service.delete(s3Key);
                logger.info("[{}][{}] rollback success, deleted S3 file: {}", DB_SCHEMA, table, s3Key);
            } catch (Exception rollbackEx) {
                logger.error("[{}][{}] rollback failed, S3 file may be orphaned: {}", DB_SCHEMA, table, s3Key, rollbackEx);
            }
            return false;
        }
    }

    /**
     * Converts a JDBC column value to {@code long}. Accepts any {@link Number} so the code does
     * not depend on the driver returning exactly {@link Long}.
     *
     * @throws IllegalStateException if the value is {@code null} or not numeric
     */
    private static long toLong(Object value, String column) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        throw new IllegalStateException("column '" + column + "' is not numeric: " + value);
    }

    /**
     * Returns the next file sequence number for the given S3 prefix: the highest sequence found
     * among keys matching {@code FILE_PATTERN}, plus one (1 when there is none).
     */
    private int getNextSequenceNumber(String prefix) {
        int maxSeq = 0;
        for (String key : s3Service.listKeysWithPrefix(prefix)) {
            Matcher matcher = FILE_PATTERN.matcher(key);
            if (matcher.matches()) {
                maxSeq = Math.max(maxSeq, Integer.parseInt(matcher.group(1)));
            }
        }
        return maxSeq + 1;
    }
}

79
src/main/java/com/dashboard/aws/lambda/service/MySQLService.java

@ -88,8 +88,8 @@ public class MySQLService {
// logger.info("insert sql: {}", sql);
int total = 0;
try (java.sql.Connection conn = java.sql.DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
for (Map<String, String> row : rows) {
for (int i = 0; i < columns.size(); i++) {
@ -113,7 +113,7 @@ public class MySQLService {
);
logger.info("deleteBeforeDate sql: {}, cutoff:{}", sql, cutoff);
try (Connection conn = java.sql.DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setInt(1, cutoff);
int deleted = ps.executeUpdate();
@ -125,4 +125,77 @@ public class MySQLService {
}
}
/**
 * Deletes the rows whose {@code id} is contained in {@code ids} using a single
 * {@code DELETE ... WHERE id IN (...)} statement.
 *
 * <p>A {@code null} or empty list is a no-op.
 *
 * @param schema schema name, inserted into the SQL text
 * @param table  table name, inserted into the SQL text
 * @param ids    ids of the rows to delete
 * @throws SQLException if connecting or executing the statement fails
 */
public void deleteByIds(String schema, String table, List<Long> ids) throws SQLException {
    boolean nothingToDelete = (ids == null) || ids.isEmpty();
    if (nothingToDelete) {
        return;
    }

    final int idCount = ids.size();
    // One "?" per id, comma separated.
    String inClause = String.join(",", Collections.nCopies(idCount, "?"));
    String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, inClause);
    logger.info("deleteByIds sql: {}, ids count: {}", sql, idCount);

    try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
         PreparedStatement ps = conn.prepareStatement(sql)) {
        // JDBC parameters are 1-based.
        int position = 1;
        for (Long id : ids) {
            ps.setLong(position++, id);
        }
        int deleted = ps.executeUpdate();
        logger.info("[{}.{}] 删除 {} 条记录", schema, table, deleted);
    }
}
/**
 * Fetches the next page of at most {@code limit} rows, ordered by {@code upload_at} then
 * {@code id} ascending, restricted to rows with {@code upload_at < cutoffUploadAt}.
 *
 * <p>Paging is keyset based: pass the {@code (upload_at, id)} pair of the last row of the
 * previous page to continue strictly after it, or {@code null} for {@code lastId} to start
 * from the beginning.
 *
 * @param schema         schema name, inserted into the SQL text
 * @param table          table name, inserted into the SQL text
 * @param limit          maximum number of rows to return
 * @param lastId         id of the last row already processed (exclusive); {@code null} for the first page
 * @param lastUploadAt   upload_at of the last row already processed; required when {@code lastId} is given
 * @param cutoffUploadAt exclusive upper bound for {@code upload_at}; must not be {@code null}
 * @return the rows as column-name to value maps, in result order; empty when nothing matches
 * @throws IllegalArgumentException if {@code cutoffUploadAt} is {@code null}, or {@code lastId}
 *                                  is given without {@code lastUploadAt}
 * @throws SQLException             if connecting or executing the query fails
 */
public List<Map<String, Object>> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId, Long lastUploadAt, Long cutoffUploadAt) throws SQLException {
    // Fail fast with a clear message instead of an unboxing NullPointerException after the
    // connection has already been opened.
    if (cutoffUploadAt == null) {
        throw new IllegalArgumentException("cutoffUploadAt must not be null");
    }
    boolean hasCursor = lastId != null;
    if (hasCursor && lastUploadAt == null) {
        throw new IllegalArgumentException("lastUploadAt must not be null when lastId is given");
    }

    String sql;
    if (!hasCursor) {
        // First page.
        sql = String.format("SELECT * FROM %s.%s WHERE upload_at < ? ORDER BY upload_at ASC, id ASC LIMIT ?", schema, table);
    } else {
        // Following pages: continue strictly after the (upload_at, id) cursor.
        sql = String.format("SELECT * FROM %s.%s WHERE (upload_at > ? OR (upload_at = ? AND id > ?)) AND upload_at < ? ORDER BY upload_at ASC, id ASC LIMIT ?", schema, table);
    }
    logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}, lastUploadAt: {}, cutoffUploadAt: {}",
            sql, limit, lastId, lastUploadAt, cutoffUploadAt);

    try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
         PreparedStatement ps = conn.prepareStatement(sql)) {
        int paramIndex = 1;
        if (hasCursor) {
            ps.setLong(paramIndex++, lastUploadAt);
            ps.setLong(paramIndex++, lastUploadAt);
            ps.setLong(paramIndex++, lastId);
        }
        ps.setLong(paramIndex++, cutoffUploadAt);
        ps.setInt(paramIndex, limit);

        // The ResultSet is managed explicitly rather than relying on the statement's close.
        try (ResultSet rs = ps.executeQuery()) {
            ResultSetMetaData meta = rs.getMetaData();
            int columnCount = meta.getColumnCount();
            List<Map<String, Object>> list = new ArrayList<>();
            while (rs.next()) {
                // LinkedHashMap keeps the columns in result-set order.
                Map<String, Object> row = new LinkedHashMap<>();
                for (int i = 1; i <= columnCount; i++) {
                    row.put(meta.getColumnName(i), rs.getObject(i));
                }
                list.add(row);
            }
            return list;
        }
    }
}
}

54
src/main/java/com/dashboard/aws/lambda/service/S3Service.java

@ -9,10 +9,16 @@ import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class S3Service {
@ -96,4 +102,52 @@ public class S3Service {
return null;
}
}
/**
 * Returns the keys of all objects in the bucket whose key starts with {@code prefix}.
 *
 * <p>Result pages are followed via the continuation token until the service returns none.
 *
 * @param prefix key prefix to list
 * @return every matching key, in the order the pages returned them
 */
public List<String> listKeysWithPrefix(String prefix) {
    List<String> collected = new ArrayList<>();
    String token = null;
    boolean morePages = true;

    while (morePages) {
        ListObjectsV2Request.Builder request = ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix(prefix);
        // Only follow-up requests carry a continuation token.
        if (token != null) {
            request.continuationToken(token);
        }

        ListObjectsV2Response page = s3.listObjectsV2(request.build());
        page.contents().forEach(object -> collected.add(object.key()));

        // A null token means this was the last page.
        token = page.nextContinuationToken();
        morePages = token != null;
    }
    return collected;
}
/**
 * Deletes the object stored under {@code key} from the bucket.
 *
 * @param key object key; must not be {@code null}
 * @throws NullPointerException if {@code key} is {@code null}
 */
public void delete(String key) {
    Objects.requireNonNull(key, "S3 key不能为空");
    s3.deleteObject(
            DeleteObjectRequest.builder()
                    .bucket(bucket)
                    .key(key)
                    .build());
    logger.info("delete success: {}", key);
}
}

128
src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java

@ -0,0 +1,128 @@
package com.dashboard.aws.lambda;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * One-off maintenance tool that brings historical backups into the current layout:
 * {@code <dir>/<yyyy-MM-dd>.csv} is copied to {@code <dir>/<yyyy-MM-dd>/001.csv}, so old and new
 * data follow the same {@code table/date/NNN.csv} convention.
 *
 * <p>Credentials are resolved by the AWS SDK default credentials provider chain (environment
 * variables, system properties, shared profile, ...). Access keys must never be committed to
 * source control.
 */
public class S3CsvReorganizer {

    private static final String BUCKET = "dashboard-for-backup";

    /**
     * Matches keys such as {@code dashboard_record_measure/2025-12-25.csv};
     * group 1 is the directory part (with trailing slash), group 2 the date.
     */
    private static final Pattern FILE_PATTERN =
            Pattern.compile("^(.*/)(\\d{4}-\\d{2}-\\d{2})\\.csv$");

    /**
     * When {@code false} (the default) the source object is kept after the copy, so a run is
     * non-destructive. Set to {@code true} to turn the copy into a real move.
     */
    private static final boolean DELETE_SOURCE_AFTER_COPY = false;

    /**
     * Reorganizes every legacy CSV file found under {@code prefix}.
     *
     * <p>A file is skipped when the destination key already exists, so existing data in the
     * new layout is never overwritten and the tool can be re-run safely.
     *
     * @param prefix key prefix to scan, for example {@code dashboard_record_measure/}
     */
    public void reorganize(String prefix) {
        // No explicit credentialsProvider: the builder falls back to the default provider chain.
        try (S3Client s3 = S3Client.builder()
                .region(Region.AP_NORTHEAST_1)
                .httpClient(UrlConnectionHttpClient.builder().build())
                .build()) {

            String continuationToken = null;
            do {
                ListObjectsV2Request.Builder builder = ListObjectsV2Request.builder()
                        .bucket(BUCKET)
                        .prefix(prefix);
                // Follow-up pages are requested with the token of the previous response.
                if (continuationToken != null) {
                    builder.continuationToken(continuationToken);
                }
                ListObjectsV2Response response = s3.listObjectsV2(builder.build());

                for (S3Object obj : response.contents()) {
                    String oldKey = obj.key();
                    Matcher matcher = FILE_PATTERN.matcher(oldKey);
                    // Only legacy "<dir>/<date>.csv" keys are handled; everything else
                    // (folder markers, already migrated files) is ignored.
                    if (!matcher.matches()) {
                        continue;
                    }
                    String basePath = matcher.group(1); // e.g. dashboard_record_measure/
                    String date = matcher.group(2);     // e.g. 2025-12-25
                    String newKey = basePath + date + "/001.csv";

                    if (exists(s3, newKey)) {
                        System.out.println("Skipping (destination already exists): " + oldKey + " -> " + newKey);
                        continue;
                    }

                    System.out.println(DELETE_SOURCE_AFTER_COPY ? "Moving:" : "Copying:");
                    System.out.println(" From: " + oldKey);
                    System.out.println(" To : " + newKey);

                    // 1. Server-side copy.
                    s3.copyObject(
                            CopyObjectRequest.builder()
                                    .sourceBucket(BUCKET)
                                    .sourceKey(oldKey)
                                    .destinationBucket(BUCKET)
                                    .destinationKey(newKey)
                                    .build()
                    );

                    // 2. Optionally remove the legacy file.
                    if (DELETE_SOURCE_AFTER_COPY) {
                        s3.deleteObject(
                                DeleteObjectRequest.builder()
                                        .bucket(BUCKET)
                                        .key(oldKey)
                                        .build()
                        );
                        System.out.println("Moved successfully.");
                    } else {
                        System.out.println("Copied successfully (source kept).");
                    }
                }

                continuationToken = response.isTruncated()
                        ? response.nextContinuationToken()
                        : null;
            } while (continuationToken != null);
        }
    }

    /** Returns whether an object with exactly this key exists in {@code BUCKET}. */
    private static boolean exists(S3Client s3, String key) {
        // Keys are listed in lexicographic order, so an exact match - being a prefix of every
        // other candidate - is always the first result.
        ListObjectsV2Response probe = s3.listObjectsV2(
                ListObjectsV2Request.builder()
                        .bucket(BUCKET)
                        .prefix(key)
                        .maxKeys(1)
                        .build());
        for (S3Object candidate : probe.contents()) {
            if (key.equals(candidate.key())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        S3CsvReorganizer reorganizer = new S3CsvReorganizer();
        reorganizer.reorganize(
                "dashboard_record_accumulate/"
        );
    }
}
Loading…
Cancel
Save