From e63aa1d310b4fde2bc8acd2705fdee69dc256a57 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Wed, 10 Dec 2025 22:26:54 +0800 Subject: [PATCH] =?UTF-8?q?rawData=5Frealtime=E6=8D=A2=E6=88=90redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datacenter/business/common/Constants.java | 3 + .../service/impl/CommonServiceImpl.java | 434 ++++++++---------- .../business/util/redis/RedisUtil.java | 7 +- 3 files changed, 190 insertions(+), 254 deletions(-) diff --git a/data-center-business-common/src/main/java/com/techsor/datacenter/business/common/Constants.java b/data-center-business-common/src/main/java/com/techsor/datacenter/business/common/Constants.java index f5817b9..c72a51b 100644 --- a/data-center-business-common/src/main/java/com/techsor/datacenter/business/common/Constants.java +++ b/data-center-business-common/src/main/java/com/techsor/datacenter/business/common/Constants.java @@ -78,5 +78,8 @@ public class Constants { public static final String RILI_API_TOKEN = "RILI_API_TOKEN"; + public static final int REDIS_PARTITION_NUM = 100; + + public static final String REDIS_RAW_DATA_REALTIME = "rawData_realtime:"; } diff --git a/data-center-business-service/src/main/java/com/techsor/datacenter/business/service/impl/CommonServiceImpl.java b/data-center-business-service/src/main/java/com/techsor/datacenter/business/service/impl/CommonServiceImpl.java index 09cf740..9094735 100644 --- a/data-center-business-service/src/main/java/com/techsor/datacenter/business/service/impl/CommonServiceImpl.java +++ b/data-center-business-service/src/main/java/com/techsor/datacenter/business/service/impl/CommonServiceImpl.java @@ -120,24 +120,24 @@ import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignReques import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest; /** - * + * * @author jwy-style * */ @Service public class CommonServiceImpl implements CommonService { - + private static Logger logger = LoggerFactory.getLogger(CommonServiceImpl.class); - + @Value("${amazon.aws.accesskey}") private String awsAccessKeyId; @Value("${amazon.aws.secretkey}") private String awsAccessSecret; - + @Value("${amazon.aws.bucket.asset:tokyobuild-stg-databucket-923770123186}") private String awsBucketAdress; - + @Value("${spring.redis.host}") private String redisHost; @@ -152,41 +152,41 @@ public class CommonServiceImpl implements CommonService { @Value("${third.problem.reports.token}") private String thirdProblemReportsToken; - + @Resource public CacheManager cacheManager; - + @Autowired private RedisUtil redisUtil; @Autowired private MsgLanguageChange msgLanguageChange; - + @Autowired private AthenaQuery athenaQuery; - + @Value("${spring.datasource.url}") private String dbUrl; - + @Value("${datacenter.v1.query.url}") private String datacenterV1QueryUrl; - + @Resource(name="trustRestTemplate") private RestTemplate trustRestTemplate; - + private static final String CONTENT_START_END_REGEX = "\\((.*?)\\)"; - + private static final String CONTENT_DELIMITER_REGEX = "\\$\\['(.*)'\\]([<>=]{1,2})(-?\\d+(?:\\.\\d+)?) && \\$\\['\\1'\\]([<>=]{1,2})(-?\\d+(?:\\.\\d+)?)";; - + @Autowired private DynamicRouteDataSource dynamicDataSource; @Autowired private DataSourceAdminConfig dataSourceAdminConfig; - + @Autowired private CommonOpt commonOpt; - + @Autowired private CompanyMapperExt companyMapperExt; @Autowired @@ -217,8 +217,8 @@ public class CommonServiceImpl implements CommonService { private BasicBuildingMapperExt basicBuildingMapperExt; @Autowired private BasicFloorMapperExt basicFloorMapperExt; - - + + @Override @@ -242,7 +242,7 @@ public class CommonServiceImpl implements CommonService { paramMap.put("apikey", apikey); ApikeyInfo2 result = basicCompanyMapperExt.getAuroraInfoByApikey(paramMap); resp.setCode(200); - + if (result != null) { resp.setData(true); } else { @@ -269,16 +269,16 @@ public class CommonServiceImpl implements CommonService { try { HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.add("Apikey", datacenterV1QueryParams.getApikey()); - + Map params = new HashMap<>(); params.put("startTime", datacenterV1QueryParams.getStartTime()); params.put("endTime", datacenterV1QueryParams.getEndTime()); params.put("deviceId", datacenterV1QueryParams.getDeviceId()); params.put("hashId", datacenterV1QueryParams.getHashId()); params.put("date", datacenterV1QueryParams.getDate()); - + ResponseEntity responseEntity = doDatacenterV1QueryRequest(params, httpHeaders, 3); - + ObjectMapper objectMapper = new ObjectMapper(); simpleDataResponse = objectMapper.readValue(responseEntity.getBody(), SimpleDataResponse.class); } catch (Exception e) { @@ -286,7 +286,7 @@ public class CommonServiceImpl implements CommonService { simpleDataResponse.setCode(ResponseCode.SERVER_ERROR); simpleDataResponse.setMsg(ResponseCode.SERVER_ERROR_MSG); } - + return simpleDataResponse; } @@ -327,7 +327,7 @@ public class CommonServiceImpl implements CommonService { @Override public SimpleDataResponse logEmailSentResult(JSONObject jsonObj) { try { - + //get large company id Map paramMap = new HashMap<>(); paramMap.put("companyId", jsonObj.getString("companyId")); @@ -336,10 +336,10 @@ public class CommonServiceImpl implements CommonService { logger.error("logEmailSentResult error: company not found"); return SimpleDataResponse.fail(ResponseCode.MSG_ERROR, "company not found"); } - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + MailSentResult mailSentResult = new MailSentResult(); mailSentResult.setDeviceId(jsonObj.getString("deviceId")); mailSentResult.setErrorMsg(jsonObj.getString("errorMsg")); @@ -352,9 +352,9 @@ public class CommonServiceImpl implements CommonService { mailSentResult.setDeviceName(jsonObj.getString("deviceName")); mailSentResult.setDeviceSn(jsonObj.getString("deviceSn")); mailSentResult.setAlertType(jsonObj.getInteger("alertType")); - + mailSentResultMapperExt.insertSelective(mailSentResult); - + return SimpleDataResponse.success(); } catch (Exception e) { SimpleDataResponse resp = new SimpleDataResponse(); @@ -385,17 +385,17 @@ public class CommonServiceImpl implements CommonService { if (null == apikeyInfo) { return SimpleDataResponse.fail(ResponseCode.MSG_ERROR, "Apikey is invalid"); } - if (StringUtils.isBlank(pageSearchParam.getSymbol()) || + if (StringUtils.isBlank(pageSearchParam.getSymbol()) || (StringUtils.isBlank(pageSearchParam.getBuildingName()) && StringUtils.isBlank(pageSearchParam.getUdfBuildingId()))) { return SimpleDataResponse.fail(ResponseCode.MSG_ERROR, "symbol or buildingName/udfBuildingId is required"); } ApikeyInfo2 companyInfo = basicCompanyMapperExt.getAuroraInfoByApikey(paramMap); long companyId = companyInfo.getId(); - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + if (StringUtils.isBlank(pageSearchParam.getCompanyIds())) { pageSearchParam.setCompanyIdList(Arrays.asList(companyId)); } else { @@ -421,55 +421,29 @@ public class CommonServiceImpl implements CommonService { List deviceInfos = deviceInfoMapperExt.getInfo4QueryAssetInfo(DeviceParamMap); // Query latest data and time if Aurora URL exists and devices are found - if (StringUtils.isNotBlank(apikeyInfo.getAuroraUrl()) && CollectionUtils.isNotEmpty(deviceInfos)) { - - Class.forName("com.mysql.cj.jdbc.Driver"); - - String regex = "(jdbc:mysql://)([^/]+)(/data_center_aeon_admin.*)"; - Pattern pattern = Pattern.compile(regex); - Matcher matcher = pattern.matcher(dbUrl); - String newJdbcUrl = ""; - if (matcher.find()) { - newJdbcUrl = matcher.replaceAll("$1" + apikeyInfo.getAuroraUrl() + "$3"); - } - - try (Connection conn = DriverManager.getConnection( - newJdbcUrl.replace("data_center_aeon_admin", "aeon") + "&allowPublicKeyRetrieval=true", - DESUtil.decrypt(apikeyInfo.getAuroraUsername(), Constants.DES_SALT), - DESUtil.decrypt(apikeyInfo.getAuroraPassword(), Constants.DES_SALT))) { - - for (ApiDeviceInfoVO apiDeviceInfoVO : deviceInfos) { - String sql = "select rawData, receive_ts from rawData_realtime where deviceId = ? limit 1"; - logger.info("queryAssetInfo aurora sql:" + sql); - - try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { - preparedStatement.setString(1, apiDeviceInfoVO.getDeviceId()); - - ResultSet retult = preparedStatement.executeQuery(); - - while (retult.next()) { - String rawData = retult.getString("rawData"); - Long receiveTs = retult.getLong("receive_ts"); - apiDeviceInfoVO.setLatestDataTime(receiveTs); - apiDeviceInfoVO.setLatestRawData(rawData); - - JSONObject jsonObject = JSON.parseObject(rawData, Feature.OrderedField); - // 遍历所有键值对 - List valueList = new ArrayList<>(); - for (String key : jsonObject.keySet()) { - valueList.add(jsonObject.get(key)); - } - apiDeviceInfoVO.setDataValue(StringUtils.join(valueList, ",")); - } - - preparedStatement.close(); - } + for (ApiDeviceInfoVO apiDeviceInfoVO : deviceInfos) { + String deviceId = apiDeviceInfoVO.getDeviceId(); + // 计算分片 + int partitionIndex = Math.abs(deviceId.hashCode()) % Constants.REDIS_PARTITION_NUM; + String redisKey = Constants.REDIS_RAW_DATA_REALTIME + partitionIndex; + + // 从 Redis Hash 获取 JSON + Object redisData = redisUtil.HGet(redisKey, deviceId); + if (redisData != null) { + JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(redisData), Feature.OrderedField); + Long receiveTs = jsonObject.getLong("receive_ts"); + String rawData = jsonObject.getString("rawData"); + + apiDeviceInfoVO.setLatestRawData(rawData); + apiDeviceInfoVO.setLatestDataTime(receiveTs); + + // 遍历所有键值对 + JSONObject rawDataObj = JSON.parseObject(rawData, Feature.OrderedField); + List valueList = new ArrayList<>(); + for (String key : rawDataObj.keySet()) { + valueList.add(rawDataObj.get(key)); } - - conn.close(); - } catch (Exception e) { - logger.error("queryAssetInfo aurora query error", e); - return SimpleDataResponse.fail(ResponseCode.SERVER_ERROR, ResponseCode.SERVER_ERROR_MSG); + apiDeviceInfoVO.setDataValue(StringUtils.join(valueList, ",")); } } @@ -504,10 +478,10 @@ public class CommonServiceImpl implements CommonService { ApikeyInfo2 companyInfo = basicCompanyMapperExt.getAuroraInfoByApikey(paramMap); long companyId = companyInfo.getId(); - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + if (StringUtils.isBlank(pageSearchParam.getCompanyIds())) { pageSearchParam.setCompanyIdList(Arrays.asList(companyId)); } else { @@ -584,22 +558,22 @@ public class CommonServiceImpl implements CommonService { responseSetThresholdV1.setMessage("Invalid apikey"); return responseSetThresholdV1; } - + ApikeyInfo2 companyInfo = basicCompanyMapperExt.getAuroraInfoByApikey(companyParamMap); long companyId = companyInfo.getId(); - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + // Convert the set back to a string with commas String finalTargetIds = String.join(",", uniqueTargetId); Map idParamMap = new HashMap<>(); idParamMap.put("companyId", companyId); idParamMap.put("targetIds", finalTargetIds); - - + + Set dbTargetIdSet = doCheckTargetIds(idParamMap, uniqueTargetId); - + if (!dbTargetIdSet.equals(uniqueTargetId)) { Set difference = new HashSet<>(uniqueTargetId); difference.removeAll(dbTargetIdSet); @@ -738,7 +712,7 @@ public class CommonServiceImpl implements CommonService { if (null == apikeyInfo) { responseQueryStatusV1.setMessage("Invalid apikey"); return responseQueryStatusV1; - } + } // else { // companyId = companyInfo.getId(); // } @@ -856,10 +830,10 @@ public class CommonServiceImpl implements CommonService { ApikeyInfo2 companyInfo = basicCompanyMapperExt.getAuroraInfoByApikey(paramMap); long companyId = companyInfo.getId(); - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + if (StringUtils.isBlank(apiAssetSearchByClassParams.getCompanyIds())) { apiAssetSearchByClassParams.setCompanyIdList(Arrays.asList(companyId)); } else { @@ -948,7 +922,7 @@ public class CommonServiceImpl implements CommonService { paramMap2.put("assetSymbol", apiAlarmDeviceSearchParams.getAssetSymbol()); paramMap2.put("udfBuildingId", apiAlarmDeviceSearchParams.getUdfBuildingId()); paramMap2.put("companyId", companyId); - + deviceInfoVOs = deviceInfoMapperExt.getAlarmDeviceInfo(paramMap2); @@ -969,7 +943,7 @@ public class CommonServiceImpl implements CommonService { DESUtil.decrypt(apikeyInfo.getAuroraUsername(), Constants.DES_SALT), DESUtil.decrypt(apikeyInfo.getAuroraPassword(), Constants.DES_SALT))) { - for (ApiAlarmDeviceInfoVO apiAlarmDeviceInfoVO : deviceInfoVOs) { + for (ApiAlarmDeviceInfoVO apiAlarmDeviceInfoVO : deviceInfoVOs) { // String sql = " select rawData, receive_ts, alertTitle, alertLevel,alertLevelName,alertTypeName from "+formatRawDataWithDate()+" where deviceId = '" + apiAlarmDeviceInfoVO.getDeviceId() + "' order by receive_ts desc limit 1" ; // String sql = " select rawData, receive_ts, alertTitle, alertLevel, alertTypeName from "+formatRawDataWithDate()+" where deviceId = '" + apiAlarmDeviceInfoVO.getDeviceId() + "' order by receive_ts desc limit 1" ; String sql = "select rawData, receive_ts, alertTitle, alertLevel, alertLevelName, alertTypeName from alertData where deviceId = ? order by receive_ts desc limit 1"; @@ -979,7 +953,7 @@ public class CommonServiceImpl implements CommonService { preparedStatement.setString(1, apiAlarmDeviceInfoVO.getDeviceId()); ResultSet retult = preparedStatement.executeQuery(); - + while (retult.next()) { String rawData = retult.getString("rawData"); Long receiveTs = retult.getLong("receive_ts"); @@ -1002,7 +976,7 @@ public class CommonServiceImpl implements CommonService { } apiAlarmDeviceInfoVO.setDataValue(StringUtils.join(valueList, ",")); } - + preparedStatement.close(); } } @@ -1047,8 +1021,8 @@ public class CommonServiceImpl implements CommonService { // return "rawData_" + formattedDate; // } // } - - + + // 定义方法改为接收参数的日期表名 private String formatRawDataWithDate(LocalDate date, Long deviceCategoryId) { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy_MM_dd"); @@ -1090,23 +1064,23 @@ public class CommonServiceImpl implements CommonService { Map paramMap = new HashMap<>(); paramMap.put("apikey", apikey); ApikeyInfo2 apikeyInfo = commonOpt.getAuroraInfoByApikey(paramMap); - + if (null == apikeyInfo) { return SimpleDataResponse.fail(ResponseCode.MSG_ERROR, "Apikey is invalid"); } - + ApikeyInfo2 companyInfo = basicCompanyMapperExt.getAuroraInfoByApikey(paramMap); long companyId = companyInfo.getId(); - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + if (StringUtils.isBlank(apiAssetClassSearchParams.getCompanyIds())) { apiAssetClassSearchParams.setCompanyIdList(Arrays.asList(companyId)); } - + List resultList = basicMonitoringAssetMapperExt.getClassInfo(apiAssetClassSearchParams); - + Map> groupedByBuilding = resultList.stream().collect(Collectors.groupingBy(ApiAssetClassInfoVO::getBuildingName)); List buildings = new ArrayList<>(); @@ -1143,7 +1117,7 @@ public class CommonServiceImpl implements CommonService { buildings.add(new Building(buildingName, classBigs)); } - + return SimpleDataResponse.success(buildings); } catch (Exception e) { @@ -1175,10 +1149,10 @@ public class CommonServiceImpl implements CommonService { if (StringUtils.isBlank(params.getAssetSymbol())) { return SimpleDataResponse.fail(ResponseCode.MSG_ERROR, "assetSymbol is required"); } - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + //资产关联的设备的targetId获取 List targetIdOfAssetList = basicMonitoringAssetMapperExt.getTargetIdListByAssetName(params); if (targetIdOfAssetList.size()==0){ @@ -1360,7 +1334,7 @@ public class CommonServiceImpl implements CommonService { if (null == companyInfo) { responseSetThresholdV1.setMessage("Invalid apikey"); return responseSetThresholdV1; - } + } // else { // dbCompanyId = companyInfo.getId()+""; // } @@ -1377,12 +1351,12 @@ public class CommonServiceImpl implements CommonService { // ApikeyInfo2 apikeyInfo2 = basicCompanyMapperExt.getAuroraInfoByApikey(companyParamMap); paramMap.put("companyId", apikeyInfo2.getId()); - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ companyInfo.getId()); - + Set dbTargetIdSet = doCheckTargetIds(paramMap, uniqueTargetId); - + if (dbTargetIdSet.equals(uniqueTargetId)) { responseSetThresholdV1.setCode(ResponseCode.SUCCESS); responseSetThresholdV1.setMessage("success"); @@ -1406,7 +1380,7 @@ public class CommonServiceImpl implements CommonService { List dbIdList = deviceAlertConfigMapperExt.checkTargetIds(paramMap); Set dbTargetIdSet = new HashSet<>(); dbTargetIdSet.addAll(dbIdList); - + return dbTargetIdSet; } @@ -1479,7 +1453,7 @@ public class CommonServiceImpl implements CommonService { paramMap.put("deviceIdList", deviceIdList); paramMap.put("companyId", selfCompanyInfo.getId()); dpfDeviceInfoVOs = deviceInfoMapperExt.getRiliDeviceInfo(paramMap); - + // Retrieve the set of DeviceId from DeviceInfoVOs Set deviceIdsInVOs = Optional.ofNullable(dpfDeviceInfoVOs) .orElse(Collections.emptyList()) @@ -1495,46 +1469,31 @@ public class CommonServiceImpl implements CommonService { } //Query latest data from Aurora if configured and devices are found - if (StringUtils.isNotBlank(apikeyInfo.getAuroraUrl()) && CollectionUtils.isNotEmpty(dpfDeviceInfoVOs)) { - Class.forName("com.mysql.cj.jdbc.Driver"); - - String regex = "(jdbc:mysql://)([^/]+)(/data_center_aeon_admin.*)"; - Pattern pattern = Pattern.compile(regex); - Matcher matcher = pattern.matcher(dbUrl); - String newJdbcUrl = ""; - if (matcher.find()) { - newJdbcUrl = matcher.replaceAll("$1" + apikeyInfo.getAuroraUrl() + "$3"); - } - - try (Connection conn = DriverManager.getConnection( - newJdbcUrl.replace("data_center_aeon_admin", "aeon") + "&allowPublicKeyRetrieval=true", - DESUtil.decrypt(apikeyInfo.getAuroraUsername(), Constants.DES_SALT), - DESUtil.decrypt(apikeyInfo.getAuroraPassword(), Constants.DES_SALT))) { - - for (ApiRiliDeviceInfoVO apiRiliDeviceInfoVO : dpfDeviceInfoVOs) { - - Map categoryIdSearchMap = new HashMap<>(); - categoryIdSearchMap.put("deviceId", apiRiliDeviceInfoVO.getDeviceId()); - Long deviceCategoryId = deviceInfoMapperExt.getDeviceCategoryId(categoryIdSearchMap); - - String baseSql = "SELECT rawData, receive_ts FROM rawData_realtime WHERE deviceId = ? LIMIT 1"; -// String sql = String.format(baseSql, formatRawDataWithDate(LocalDate.now(), deviceCategoryId)); - String sql = baseSql; - logger.info("queryDeviceInfo aurora sql: {}", sql); - - try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { - preparedStatement.setString(1, apiRiliDeviceInfoVO.getDeviceId()); - try (ResultSet result = preparedStatement.executeQuery()) { - - if (result.next()) { - processResult(result, apiRiliDeviceInfoVO); - } - } - } - } - } catch (Exception e) { - logger.error("queryDeviceInfo processing aurora error", e); - return SimpleDataResponse.fail(ResponseCode.SERVER_ERROR, ResponseCode.SERVER_ERROR_MSG, new ArrayList<>()); + if (CollectionUtils.isNotEmpty(dpfDeviceInfoVOs)) { + for (ApiRiliDeviceInfoVO apiRiliDeviceInfoVO : dpfDeviceInfoVOs) { + String deviceId = apiRiliDeviceInfoVO.getDeviceId(); + // 计算分片 + int partitionIndex = Math.abs(deviceId.hashCode()) % Constants.REDIS_PARTITION_NUM; + String redisKey = Constants.REDIS_RAW_DATA_REALTIME + partitionIndex; + + // 从 Redis Hash 获取 JSON + Object redisData = redisUtil.HGet(redisKey, deviceId); + if (redisData != null) { + JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(redisData), Feature.OrderedField); + Long receiveTs = jsonObject.getLong("receive_ts"); + String rawData = jsonObject.getString("rawData"); + + apiRiliDeviceInfoVO.setLatestRawData(rawData); + apiRiliDeviceInfoVO.setLatestDataTime(receiveTs); + + // 遍历所有键值对 + JSONObject rawDataObj = JSON.parseObject(rawData, Feature.OrderedField); + List valueList = new ArrayList<>(); + for (String key : rawDataObj.keySet()) { + valueList.add(rawDataObj.get(key)); + } + apiRiliDeviceInfoVO.setDataValue(StringUtils.join(valueList, ",")); + } } } } @@ -1569,7 +1528,7 @@ public class CommonServiceImpl implements CommonService { private void processRiliPersona(){ } - + private void processResult(ResultSet result, ApiRiliDeviceInfoVO apiRiliDeviceInfoVO) throws SQLException { String rawData = result.getString("rawData"); Long receiveTs = result.getLong("receive_ts"); @@ -1594,9 +1553,9 @@ public class CommonServiceImpl implements CommonService { if (null == jsonObj || null == jsonObj.getLong("companyId")) { return new SimpleDataResponse(ResponseCode.MSG_ERROR, "No companyId"); } - + dataSourceAdminConfig.updateTargetDataSources(dynamicDataSource, jsonObj.getLongValue("companyId")); - + return SimpleDataResponse.success(); } catch (Exception e) { logger.error("updateTargetDataSources error", e); @@ -1611,9 +1570,9 @@ public class CommonServiceImpl implements CommonService { if (StringUtils.isBlank(getFileTemporaryParams.getS3FileKey())) { return new SimpleDataResponse(ResponseCode.MSG_ERROR, "s3FileKey required"); } - + String s3FileKey = getFileTemporaryParams.getS3FileKey(); - + String bucketName = awsBucketAdress; String accessKey = awsAccessKeyId; String secretKey = awsAccessSecret; @@ -1628,7 +1587,7 @@ public class CommonServiceImpl implements CommonService { // default: // return new SimpleDataResponse(ResponseCode.MSG_ERROR, "invalid source"); // } - + S3FileMappingExample s3FileMappingExample = new S3FileMappingExample(); S3FileMappingExample.Criteria criteria = s3FileMappingExample.createCriteria(); criteria.andFileKeyEqualTo(getFileTemporaryParams.getS3FileKey()); @@ -1636,9 +1595,9 @@ public class CommonServiceImpl implements CommonService { if (CollectionUtils.isEmpty(mappings)){ return new SimpleDataResponse(ResponseCode.MSG_ERROR, "invalid source"); } - + String objectKey = mappings.get(0).getS3Path(); - + //Temporary link validity period, 2 hours for images, 24 hours for PDF int expire = 6;//hours if (isPdf(s3FileKey)) { @@ -1647,12 +1606,12 @@ public class CommonServiceImpl implements CommonService { if (isImage(s3FileKey)) { expire = 2; } - + URL url = null; S3Presigner presigner = null; try { AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(accessKey, secretKey); - + // Try to get the object metadata HeadObjectRequest headObjectRequest = HeadObjectRequest.builder() .bucket(bucketName) @@ -1665,33 +1624,33 @@ public class CommonServiceImpl implements CommonService { .build(); HeadObjectResponse headObjectResponse = s3Client.headObject(headObjectRequest); if (headObjectResponse != null) { - + // Generate a presigned URL using S3Prestige presigner = S3Presigner.builder() .region(Region.AP_NORTHEAST_1) .credentialsProvider(StaticCredentialsProvider.create(awsCredentials)) .build(); - + software.amazon.awssdk.services.s3.model.GetObjectRequest getObjectRequest = software.amazon.awssdk.services.s3.model.GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); - + GetObjectPresignRequest presignRequest = GetObjectPresignRequest.builder() .signatureDuration(Duration.ofHours(expire)) // expiration time (e.g. 12 hour) .getObjectRequest(getObjectRequest) .build(); - + // presigned URL PresignedGetObjectRequest presignedRequest = presigner.presignGetObject(presignRequest); url = presignedRequest.url(); - + // 输出临时链接 // System.out.println("临时链接为: " + url); } return SimpleDataResponse.success(url); - + } catch (NoSuchKeyException e) { logger.error("getTemporaryLink NoSuchKeyException", e); return new SimpleDataResponse(ResponseCode.MSG_ERROR, "File not found"); @@ -1704,7 +1663,7 @@ public class CommonServiceImpl implements CommonService { } } } - + public static boolean isPdf(String fileName) { return fileName != null && fileName.toLowerCase().endsWith(".pdf"); } @@ -1721,8 +1680,8 @@ public class CommonServiceImpl implements CommonService { lower.endsWith(".raw") || lower.endsWith(".cr2") || lower.endsWith(".nef"); } - - + + private void parsePdfInfoToList(String str, List targetList) { if (StringUtils.isNotBlank(str)) { Map map = JSON.parseObject(str, new TypeReference>() {}); @@ -1734,7 +1693,7 @@ public class CommonServiceImpl implements CommonService { } } } - + private void parsePdfInfoToList(String str, List targetList, String attribute) { if (StringUtils.isNotBlank(str)) { Map map = JSON.parseObject(str, new TypeReference>() {}); @@ -1747,7 +1706,7 @@ public class CommonServiceImpl implements CommonService { } } } - + private void parseImageInfoToList(String str, List targetList) { if (StringUtils.isNotBlank(str)) { Map imagesIntroduction = JSON.parseObject(str, new TypeReference>() {}); @@ -1759,7 +1718,7 @@ public class CommonServiceImpl implements CommonService { } } } - + @Override public SimpleDataResponse querySpaceInfo(String apikey, ApiSpaceSearchParams searchParams) { @@ -1768,7 +1727,7 @@ public class CommonServiceImpl implements CommonService { } return getSpaceInfo(apikey, searchParams); } - + private SimpleDataResponse getSpaceInfo(String apikey, ApiSpaceSearchParams searchParams) { try { Map paramMap = new HashMap<>(); @@ -1781,10 +1740,10 @@ public class CommonServiceImpl implements CommonService { ApikeyInfo2 companyInfo = basicCompanyMapperExt.getAuroraInfoByApikey(paramMap); long companyId = companyInfo.getId(); - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + searchParams.setCompanyIdList(Arrays.asList(companyId)); List resultList = basicSpaceMapperExt.getApiQuerySpaceInfo(searchParams); @@ -1835,7 +1794,7 @@ public class CommonServiceImpl implements CommonService { if (CollectionUtils.isEmpty(batchGetFileTemporaryParams.getKeys())) { return new SimpleDataResponse(ResponseCode.MSG_ERROR, "keys required"); } - + Map paramMap = new HashMap<>(); paramMap.put("apikey", apikey); ApikeyInfo2 apikeyInfo = commonOpt.getAuroraInfoByApikey(paramMap); @@ -1843,7 +1802,7 @@ public class CommonServiceImpl implements CommonService { if (null == apikeyInfo) { return SimpleDataResponse.fail(ResponseCode.MSG_ERROR, "Apikey is invalid"); } - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); @@ -1853,10 +1812,10 @@ public class CommonServiceImpl implements CommonService { AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(accessKey, secretKey); Region region = Region.AP_NORTHEAST_1; - + S3Presigner presigner = null; List result = new ArrayList<>(); - + try { presigner = S3Presigner.builder() .region(region) @@ -1914,11 +1873,11 @@ public class CommonServiceImpl implements CommonService { PresignedGetObjectRequest presignedRequest = presigner.presignGetObject(presignRequest); temporaryInfo.setUrl(presignedRequest.url().toString());; - + } catch (NoSuchKeyException e) { logger.error("Key not found: {}", s3FileKey); } - + result.add(temporaryInfo); } @@ -1944,19 +1903,19 @@ public class CommonServiceImpl implements CommonService { if (null == apikeyInfo) { return SimpleDataResponse.fail(ResponseCode.MSG_ERROR, "Apikey is invalid"); } - + ApikeyInfo2 companyInfo = basicCompanyMapperExt.getAuroraInfoByApikey(paramMap); long companyId = companyInfo.getId(); - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + OptBuildingParams optBuildingParams = new OptBuildingParams(); optBuildingParams.setBuildingBucket(addBuildingParams.getBuildingBucket()); optBuildingParams.setBuildingName(addBuildingParams.getBuildingName()); optBuildingParams.setUdfBuildingId(addBuildingParams.getUdfBuildingId()); // optBuildingParams.setFloorInfoList(addBuildingParams.getFloorInfoList()); - + return buildingService.add(optBuildingParams, null, null, companyId, 2); } @@ -1971,10 +1930,10 @@ public class CommonServiceImpl implements CommonService { if (null == apikeyInfo) { return SimpleDataResponse.fail(ResponseCode.MSG_ERROR, "Apikey is invalid"); } - + DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.setCurrentDataSourceKey(Constants.DATASOURCE_PREFIX+ apikeyInfo.getId()); - + BasicBuildingExample example = new BasicBuildingExample(); BasicBuildingExample.Criteria criteria = example.createCriteria(); criteria.andUdfBuildingIdEqualTo(queryBuilding.getUdfBuildingId()); @@ -1993,7 +1952,7 @@ public class CommonServiceImpl implements CommonService { // buildingInfo.setFloorInfoList(floorInfos); // } buildingInfo.setFloorInfoList(basicFloorMapperExt.getFloorInfo(buildingList.get(0).getBuildingId())); - + return SimpleDataResponse.success(Collections.singletonList(buildingInfo)); } } @@ -2041,7 +2000,7 @@ public class CommonServiceImpl implements CommonService { Long removedCount = redisUtil.zRemRangeByScore(redisKey, 0, oneWeekAgoTimestamp); logger.info("Deleted " + removedCount + " elements in " + redisKey); Set cancelAlarmDeviceIdList = redisUtil.zSetOperations.rangeByScore(redisKey, 0, Double.POSITIVE_INFINITY); - + if (CollectionUtils.isNotEmpty(cancelAlarmDeviceIdList)) { Map paramMap2 = new HashMap<>(); paramMap2.put("deviceIdList", cancelAlarmDeviceIdList); @@ -2054,73 +2013,42 @@ public class CommonServiceImpl implements CommonService { paramMap2.put("assetSymbol", apiAlarmDeviceSearchParams.getAssetSymbol()); paramMap2.put("udfBuildingId", apiAlarmDeviceSearchParams.getUdfBuildingId()); paramMap2.put("companyId", companyId); - + deviceInfoVOs = deviceInfoMapperExt.getCancelAlarmDeviceInfo(paramMap2); //Query latest data from Aurora if configured and devices are found - if (StringUtils.isNotBlank(apikeyInfo.getAuroraUrl()) && CollectionUtils.isNotEmpty(deviceInfoVOs)) { - Class.forName("com.mysql.cj.jdbc.Driver"); - - String regex = "(jdbc:mysql://)([^/]+)(/data_center_aeon_admin.*)"; - Pattern pattern = Pattern.compile(regex); - Matcher matcher = pattern.matcher(dbUrl); - String newJdbcUrl = ""; - if (matcher.find()) { - newJdbcUrl = matcher.replaceAll("$1" + apikeyInfo.getAuroraUrl() + "$3"); - } - - try (Connection conn = DriverManager.getConnection( - newJdbcUrl.replace("data_center_aeon_admin", "aeon") + "&allowPublicKeyRetrieval=true", - DESUtil.decrypt(apikeyInfo.getAuroraUsername(), Constants.DES_SALT), - DESUtil.decrypt(apikeyInfo.getAuroraPassword(), Constants.DES_SALT))) { - - for (ApiCancelAlarmDeviceInfoVO apiCancelAlarmDeviceInfoVO : deviceInfoVOs) { -// String sql = " select rawData, receive_ts, alertTitle, alertLevel,alertLevelName,alertTypeName from "+formatRawDataWithDate()+" where deviceId = '" + apiAlarmDeviceInfoVO.getDeviceId() + "' order by receive_ts desc limit 1" ; -// String sql = " select rawData, receive_ts, alertTitle, alertLevel, alertTypeName from "+formatRawDataWithDate()+" where deviceId = '" + apiAlarmDeviceInfoVO.getDeviceId() + "' order by receive_ts desc limit 1" ; - String sql = "select rawData, receive_ts, alertCancelTitle, alertLevel, alertLevelName, alertTypeName from rawData_realtime where deviceId = ? limit 1"; - logger.info("queryAlarmDevice aurora sql: " + sql); - - try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { - preparedStatement.setString(1, apiCancelAlarmDeviceInfoVO.getDeviceId()); - - ResultSet retult = preparedStatement.executeQuery(); - - while (retult.next()) { - String rawData = retult.getString("rawData"); - Long receiveTs = retult.getLong("receive_ts"); - String alertLevel = retult.getString("alertLevel"); - String alertLevelName = retult.getString("alertLevelName"); - String alertCancelTitle = retult.getString("alertCancelTitle"); - String alertTypeName = retult.getString("alertTypeName"); - apiCancelAlarmDeviceInfoVO.setLatestDataTime(receiveTs); - apiCancelAlarmDeviceInfoVO.setLatestRawData(rawData); - apiCancelAlarmDeviceInfoVO.setAlertLevel(alertLevel); - apiCancelAlarmDeviceInfoVO.setAlertCancelTitle(alertCancelTitle); - apiCancelAlarmDeviceInfoVO.setAlertTypeName(alertTypeName); - apiCancelAlarmDeviceInfoVO.setAlertLevelName(alertLevelName); - - JSONObject jsonObject = JSON.parseObject(rawData,Feature.OrderedField); - // 遍历所有键值对 - List valueList = new ArrayList<>(); - for (String key : jsonObject.keySet()) { - valueList.add(jsonObject.get(key)); - } - apiCancelAlarmDeviceInfoVO.setDataValue(StringUtils.join(valueList, ",")); - } - - preparedStatement.close(); - } - } - conn.close(); - } catch (Exception e) { - logger.error("queryAlarmDevice processing aurora error", e); - return SimpleDataResponse.fail(ResponseCode.SERVER_ERROR, ResponseCode.SERVER_ERROR_MSG); - } finally { - DataSourceContextHolder.clearCurrentDataSourceKey(); - if (jedis != null) { - // Close Jedis connection - jedis.close(); + if (CollectionUtils.isNotEmpty(deviceInfoVOs)) { + for (ApiCancelAlarmDeviceInfoVO apiCancelAlarmDeviceInfoVO : deviceInfoVOs) { + String deviceId = apiCancelAlarmDeviceInfoVO.getDeviceId(); + // 计算分片 + int partitionIndex = Math.abs(deviceId.hashCode()) % Constants.REDIS_PARTITION_NUM; + String realtimeRedisKey = Constants.REDIS_RAW_DATA_REALTIME + partitionIndex; + + // 从 Redis Hash 获取 JSON + Object redisData = redisUtil.HGet(realtimeRedisKey, deviceId); + if (redisData != null) { + JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(redisData), Feature.OrderedField); + Long receiveTs = jsonObject.getLong("receive_ts"); + String rawData = jsonObject.getString("rawData"); + String alertLevel = jsonObject.getString("alertLevel"); + String alertLevelName = jsonObject.getString("alertLevelName"); + String alertCancelTitle = jsonObject.getString("alertCancelTitle"); + String alertTypeName = jsonObject.getString("alertTypeName"); + apiCancelAlarmDeviceInfoVO.setLatestDataTime(receiveTs); + apiCancelAlarmDeviceInfoVO.setLatestRawData(rawData); + apiCancelAlarmDeviceInfoVO.setAlertLevel(alertLevel); + apiCancelAlarmDeviceInfoVO.setAlertCancelTitle(alertCancelTitle); + apiCancelAlarmDeviceInfoVO.setAlertTypeName(alertTypeName); + apiCancelAlarmDeviceInfoVO.setAlertLevelName(alertLevelName); + + // 遍历所有键值对 + JSONObject rawDataObj = JSON.parseObject(rawData, Feature.OrderedField); + List valueList = new ArrayList<>(); + for (String key : rawDataObj.keySet()) { + valueList.add(rawDataObj.get(key)); + } + apiCancelAlarmDeviceInfoVO.setDataValue(StringUtils.join(valueList, ",")); } } } diff --git a/data-center-business-util/src/main/java/com/techsor/datacenter/business/util/redis/RedisUtil.java b/data-center-business-util/src/main/java/com/techsor/datacenter/business/util/redis/RedisUtil.java index b794191..52894c3 100644 --- a/data-center-business-util/src/main/java/com/techsor/datacenter/business/util/redis/RedisUtil.java +++ b/data-center-business-util/src/main/java/com/techsor/datacenter/business/util/redis/RedisUtil.java @@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.core.*; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -40,8 +41,12 @@ public class RedisUtil { Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); - om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); +// om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); + + redisTemplate.setKeySerializer(new StringRedisSerializer()); + redisTemplate.setHashKeySerializer(new StringRedisSerializer()); + redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); this.redisTemplate = redisTemplate;