Browse Source

同步s3到mysql

master
review512jwy@163.com 15 hours ago
parent
commit
1a27bf9862
  1. 12
      src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java
  2. 78
      src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java
  3. 43
      src/main/java/com/dashboard/aws/lambda/service/MySQLService.java
  4. 16
      src/main/java/com/dashboard/aws/lambda/service/S3Service.java
  5. 23
      src/main/java/com/dashboard/aws/lambda/util/CsvUtil.java
  6. 40
      src/main/java/com/dashboard/aws/lambda/util/DateUtil.java

12
src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java

@ -22,8 +22,12 @@ public class MySQLToS3Handler implements RequestHandler<Map<String, Object>, Str
@Override
public String handleRequest(Map<String, Object> event, Context context) {
try {
LocalDate targetDate = LocalDate.now(ZoneId.of("Asia/Tokyo")).minusDays(3);
long cutoff = targetDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli();
LocalDate now = LocalDate.now(ZoneId.of("Asia/Tokyo"));
LocalDate startDate = now.minusDays(5); // 5天前
LocalDate endDate = now.minusDays(3); // 3天前
long startTs = startDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli();
long endTs = endDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli();
// 查询符合条件的企业ID
List<Long> companyIds = mysqlService.getActiveCompanyIds();
@ -34,7 +38,7 @@ public class MySQLToS3Handler implements RequestHandler<Map<String, Object>, Str
for (String table : List.of("dashboard_record_accumulate", "dashboard_record_measure")) {
// 查询旧数据
List<Map<String, Object>> rows = mysqlService.queryOldData(schema, table, cutoff);
List<Map<String, Object>> rows = mysqlService.queryOldData(schema, table, startTs, endTs);
if (rows.isEmpty()) {
logger.info("[{}][{}] no data....", schema, table);
continue;
@ -71,7 +75,7 @@ public class MySQLToS3Handler implements RequestHandler<Map<String, Object>, Str
// 全部上传成功才删除数据
if (allSuccess) {
mysqlService.deleteOldData(schema, table, cutoff);
mysqlService.deleteOldData(schema, table, startTs, endTs);
logger.info("[{}][{}] delete old data", schema, table);
} else {
logger.error("[{}][{}] not all files uploaded successfully, skip delete", schema, table);

78
src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java

@ -0,0 +1,78 @@
package com.dashboard.aws.lambda.handler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.dashboard.aws.lambda.service.MySQLService;
import com.dashboard.aws.lambda.service.S3Service;
import com.dashboard.aws.lambda.util.CsvUtil;
import com.dashboard.aws.lambda.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.*;
public class S3ToMySQLHandler implements RequestHandler<Map<String, Object>, String> {
private static final Logger logger = LoggerFactory.getLogger(S3ToMySQLHandler.class);
private final MySQLService mysqlService = new MySQLService();
private final S3Service s3Service = new S3Service();
@Override
public String handleRequest(Map<String, Object> event, Context context) {
try {
// 同步目标日期(这里同步 3 天前的数据)
// LocalDate targetDate = LocalDate.now(ZoneId.of("Asia/Tokyo")).minusDays(3);
// String dateStr = targetDate.format(java.time.format.DateTimeFormatter.ISO_DATE);
// String dateStr = "2025-10-08";
LocalDate now = LocalDate.now(ZoneId.of("Asia/Tokyo"));
String dateStr = DateUtil.getLastYearSameIsoWeekDayStr(now);
// 查询符合条件的企业ID
List<Long> companyIds = mysqlService.getActiveCompanyIds();
logger.info("company id list: {}", companyIds);
List<String> tables = List.of("dashboard_record_accumulate", "dashboard_record_measure");
for (Long companyId : companyIds) {
String schema = "data_center_dongjian_" + companyId;
for (String table : tables) {
// 构造 S3 文件路径
String s3Key = String.format("%s/%s/%s.csv", table, companyId, dateStr);
logger.info("processing s3 file: {}", s3Key);
// 下载 CSV
byte[] content = s3Service.download(s3Key);
if (content == null || content.length == 0) {
logger.warn("[{}][{}] file empty or not found: {}", schema, table, s3Key);
continue;
}
// 解析 CSV
List<Map<String, String>> rows = CsvUtil.parseCsv(content);
if (rows.isEmpty()) {
logger.warn("[{}][{}] csv empty: {}", schema, table, s3Key);
continue;
}
// 插入到 MySQL
try {
int inserted = mysqlService.insertRows(schema, table, rows);
logger.info("[{}][{}] inserted {} rows", schema, table, inserted);
} catch (Exception e) {
logger.error("[{}][{}] insert failed -> {}", schema, table, s3Key, e);
}
}
}
return "sync process finished";
} catch (Exception e) {
logger.error("sync error", e);
return "sync error:" + e.getMessage();
}
}
}

43
src/main/java/com/dashboard/aws/lambda/service/MySQLService.java

@ -37,14 +37,15 @@ public class MySQLService {
}
}
public List<Map<String, Object>> queryOldData(String schema, String table, long cutoff) throws SQLException {
String sql = String.format("SELECT * FROM %s.%s WHERE upload_at < ?", schema, table);
logger.info("query sql: {}, cutoff: {}", sql, cutoff);
public List<Map<String, Object>> queryOldData(String schema, String table, long startTs, long endTs) throws SQLException {
String sql = String.format("SELECT * FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table);
logger.info("query sql: {}, start: {}, end: {}", sql, startTs, endTs);
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setLong(1, cutoff);
ps.setLong(1, startTs);
ps.setLong(2, endTs);
ResultSet rs = ps.executeQuery();
ResultSetMetaData meta = rs.getMetaData();
@ -61,17 +62,43 @@ public class MySQLService {
}
}
public void deleteOldData(String schema, String table, long cutoff) throws SQLException {
String sql = String.format("DELETE FROM %s.%s WHERE upload_at < ?", schema, table);
logger.info("deleteOldData sql: {}, cutoff: {}", sql, cutoff);
public void deleteOldData(String schema, String table, long startTs, long endTs) throws SQLException {
String sql = String.format("DELETE FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table);
logger.info("deleteOldData sql: {}, start: {}, end: {}", sql, startTs, endTs);
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setLong(1, cutoff);
ps.setLong(1, startTs);
ps.setLong(2, endTs);
int deleted = ps.executeUpdate();
logger.info("[{}.{}] 删除 {} 条记录", schema, table, deleted);
}
}
public int insertRows(String schema, String table, List<Map<String, String>> rows) throws SQLException {
if (rows == null || rows.isEmpty()) return 0;
Map<String, String> first = rows.get(0);
List<String> columns = new ArrayList<>(first.keySet());
String columnStr = String.join(",", columns);
String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
String sql = String.format("INSERT INTO %s.%s (%s) VALUES (%s)", schema, table, columnStr, placeholders);
logger.info("insert sql: {}", sql);
int total = 0;
try (java.sql.Connection conn = java.sql.DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
for (Map<String, String> row : rows) {
for (int i = 0; i < columns.size(); i++) {
ps.setString(i + 1, row.getOrDefault(columns.get(i), null));
}
ps.addBatch();
}
int[] results = ps.executeBatch();
for (int r : results) total += r;
}
return total;
}
}

16
src/main/java/com/dashboard/aws/lambda/service/S3Service.java

@ -4,10 +4,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
@ -80,4 +82,18 @@ public class S3Service {
s3.close();
}
}
public byte[] download(String key) {
try {
ResponseBytes<?> objectBytes = s3.getObjectAsBytes(GetObjectRequest.builder()
.bucket(bucket)
.key(key)
.build());
logger.info("download success: {}", key);
return objectBytes.asByteArray();
} catch (Exception e) {
logger.error("download failed: {}", key, e);
return null;
}
}
}

23
src/main/java/com/dashboard/aws/lambda/util/CsvUtil.java

@ -3,6 +3,8 @@ package com.dashboard.aws.lambda.util;
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -26,4 +28,25 @@ public class CsvUtil {
writer.flush();
return out.toString(StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
}
public static List<Map<String, String>> parseCsv(byte[] content) throws java.io.IOException {
List<Map<String, String>> list = new ArrayList<>();
try (java.io.BufferedReader reader = new java.io.BufferedReader(
new java.io.InputStreamReader(new java.io.ByteArrayInputStream(content), java.nio.charset.StandardCharsets.UTF_8))) {
String line = reader.readLine();
if (line == null) return list;
String[] headers = line.split(",");
while ((line = reader.readLine()) != null) {
String[] values = line.split(",", -1);
Map<String, String> row = new LinkedHashMap<>();
for (int i = 0; i < headers.length; i++) {
row.put(headers[i], i < values.length ? values[i] : "");
}
list.add(row);
}
}
return list;
}
}

40
src/main/java/com/dashboard/aws/lambda/util/DateUtil.java

@ -0,0 +1,40 @@
package com.dashboard.aws.lambda.util;
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.IsoFields;
public class DateUtil {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ISO_DATE;
/**
* 根据指定日期获取去年同 ISO 周同星期的日期
* @param date 目标日期
* @return 去年同周同日 LocalDate
*/
public static LocalDate getLastYearSameIsoWeekDay(LocalDate date) {
if (date == null) throw new IllegalArgumentException("date不能为空");
int isoWeekYear = date.get(IsoFields.WEEK_BASED_YEAR);
int isoWeekNumber = date.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR);
DayOfWeek dayOfWeek = date.getDayOfWeek();
return LocalDate.now()
.with(IsoFields.WEEK_BASED_YEAR, isoWeekYear - 1)
.with(IsoFields.WEEK_OF_WEEK_BASED_YEAR, isoWeekNumber)
.with(dayOfWeek);
}
/**
* 根据指定日期获取去年同 ISO 周同星期的日期字符串
* @param date 目标日期
* @return 去年同周同日 yyyy-MM-dd
*/
public static String getLastYearSameIsoWeekDayStr(LocalDate date) {
LocalDate lastYearDate = getLastYearSameIsoWeekDay(date);
return lastYearDate.format(FORMATTER);
}
}
Loading…
Cancel
Save