From c29445bb72e3a642310e741730f78fce20154b96 Mon Sep 17 00:00:00 2001 From: xiaowuler Date: Mon, 21 Mar 2022 17:47:17 +0800 Subject: [PATCH] commit some codes --- .../src/main/resources/application.properties | 4 ++-- .../xxljobexecutor/dao/IDataRepository.java | 2 +- .../dao/impl/DataRepository.java | 10 +++++---- .../model/domain/RadarBlock.java | 2 +- .../model/domain/RadialBlock.java | 15 +++++++++++++ .../service/impl/RadarService.java | 22 ++++++++++++------- .../service/impl/SchemaService.java | 12 +++++----- .../custom/impl/CustomRadarInputStream.java | 2 +- 8 files changed, 47 insertions(+), 22 deletions(-) create mode 100644 04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadialBlock.java diff --git a/04.系统编码/01.xxl-job/xxl-job-admin/src/main/resources/application.properties b/04.系统编码/01.xxl-job/xxl-job-admin/src/main/resources/application.properties index 5588b71..4af2151 100644 --- a/04.系统编码/01.xxl-job/xxl-job-admin/src/main/resources/application.properties +++ b/04.系统编码/01.xxl-job/xxl-job-admin/src/main/resources/application.properties @@ -23,9 +23,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource -spring.datasource.url=jdbc:mysql://localhost:3306/ahpmsdp_xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai +spring.datasource.url=jdbc:mysql://112.124.40.88:33306/ahpmsdp_xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root -spring.datasource.password=root +spring.datasource.password=3cqscbr@only1 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool diff --git a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/IDataRepository.java b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/IDataRepository.java index f929007..b6054ad 100644 --- a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/IDataRepository.java +++ b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/IDataRepository.java @@ -14,5 +14,5 @@ import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.DealInfo; */ public interface IDataRepository { void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, int timeEffect, Map>> columns); - void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, String station, Map columns, int maxCount); + void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, String station, String productType, Map> columns, int maxCount); } diff --git a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/impl/DataRepository.java b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/impl/DataRepository.java index 1fd7a2a..a8c9b6f 100644 --- a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/impl/DataRepository.java +++ b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/impl/DataRepository.java @@ -42,9 +42,9 @@ public class DataRepository implements IDataRepository { } @Override - public void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, String station, Map columns, int maxCount) { + public void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, String station, String productType, Map> columns, int maxCount) { List targetColumns = prepareRadarColumnNames(columns.keySet()); - List targetValues = prepareColumnValues(initialTime, forecastTime, station, columns.values()); + List targetValues = prepareColumnValues(initialTime, forecastTime, station, productType, columns.values()); if (targetColumns.size() <= maxCount){ Insert builder = QueryBuilder.insertInto(keyspace, tableName) @@ -53,7 +53,7 @@ public class DataRepository implements IDataRepository { return; } - groupInsert(keyspace, tableName, targetColumns, targetValues, maxCount, 3); + groupInsert(keyspace, tableName, targetColumns, targetValues, maxCount, 4); } private void groupInsert(String keyspace, String tableName, List columns, List values, int maxCount, int limit){ @@ -85,6 +85,7 @@ public class DataRepository implements IDataRepository { columns.add("initial_time"); columns.add("forecast_time"); columns.add("station"); + columns.add("product_type"); columns.add("create_time"); columns.addAll(columnSet); return columns; @@ -100,11 +101,12 @@ public class DataRepository implements IDataRepository { return columns; } - private List prepareColumnValues(Date initialTime, Date forecastTime, String station, Collection values){ + private List prepareColumnValues(Date initialTime, Date forecastTime, String station, String productType, Collection> values){ List columns = new ArrayList<>(); columns.add(initialTime); columns.add(forecastTime); columns.add(station); + columns.add(productType); columns.add(new Date()); columns.addAll(values); return columns; diff --git a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarBlock.java b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarBlock.java index 4c99adf..dbd4c68 100644 --- a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarBlock.java +++ b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarBlock.java @@ -20,5 +20,5 @@ import com.xiaowuler.radar.core.weather.domain.RadialHeader; @AllArgsConstructor public class RadarBlock { private CutConfiguration cutConfiguration; - private List radials; + private List radials; } diff --git a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadialBlock.java b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadialBlock.java new file mode 100644 index 0000000..0e8ee44 --- /dev/null +++ b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadialBlock.java @@ -0,0 +1,15 @@ +package com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain; + +import com.xiaowuler.radar.core.weather.domain.MomentBlock; +import com.xiaowuler.radar.core.weather.domain.RadialHeader; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class RadialBlock { + private RadialHeader radialHeader; + private MomentBlock momentBlock; +} diff --git a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/RadarService.java b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/RadarService.java index 6d15831..6f79cd5 100644 --- a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/RadarService.java +++ b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/RadarService.java @@ -8,9 +8,11 @@ import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.DealInfo; import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.MetaInfo; import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.Coordinate; import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.RadarBlock; +import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.RadialBlock; import com.ping.chuan.ahpmsdp.xxljobexecutor.service.IRadarService; import com.ping.chuan.ahpmsdp.xxljobexecutor.util.GZipUtils; +import com.xiaowuler.radar.core.weather.domain.MomentBlock; import org.springframework.stereotype.Service; import java.io.IOException; @@ -49,9 +51,9 @@ public class RadarService implements IRadarService { RadarReader radarReader = new RadarReader(); radarReader.read(entry.getKey().getLocalPath()); - Map columns = toColumns(radarReader.getRadarBlock().getCommonBlock()); + Map> columns = toColumns(radarReader.getRadarBlock().getCommonBlock()); toColumns(columns, radarReader.getRadarBlock().getCommonBlock().getCutConfigurations(), radarReader.getRadarBlock().getRadialBlock().getRadials()); - dataRepository.insert(keyspace, entry.getValue().getKey().toTableName(), entry.getKey().getInitialTime(), entry.getKey().getInitialTime(), radarReader.getRadarBlock().getCommonBlock().getSiteConfiguration().getSiteCode(), columns, 2); + dataRepository.insert(keyspace, entry.getValue().getKey().toTableName(), entry.getKey().getInitialTime(), entry.getKey().getInitialTime(), radarReader.getRadarBlock().getCommonBlock().getSiteConfiguration().getSiteCode(), "dBT", columns, 1); entry.getKey().setState(StateConstant.SUCCESS); } catch (RadarReadException | IOException e) { entry.getKey().setState(StateConstant.FAILED); @@ -62,19 +64,23 @@ public class RadarService implements IRadarService { return Objects.isNull(error) ? "文件解析成功" : "文件解析失败,%s".formatted(error); } - private Map toColumns(CommonBlock commonBlock) throws IOException { + private Map> toColumns(CommonBlock commonBlock) throws IOException { return new LinkedHashMap<>(){{ - put("site_configuration", GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getSiteConfiguration()))); - put("task_configuration", GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getTaskConfiguration()))); - put("generic_header", GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getGenericHeader()))); + put("site_configuration", GZipUtils.toList(GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getSiteConfiguration())))); + put("task_configuration", GZipUtils.toList(GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getTaskConfiguration())))); + put("generic_header", GZipUtils.toList(GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getGenericHeader())))); }}; } - private void toColumns(Map columns, List cutConfigurations, List radials) throws IOException { + private void toColumns(Map> columns, List cutConfigurations, List radials) throws IOException { for(int index = 0, len = cutConfigurations.size(); index < len; index++){ int num = index + 1; List targetRadial = radials.stream().filter(r -> r.getRadialHeader().getElevationNumber() == num).collect(Collectors.toList()); - columns.put("col_%s".formatted(index), GZipUtils.compress(mapper.writeValueAsBytes(new RadarBlock(cutConfigurations.get(index), targetRadial)))); + List radialBlocks = targetRadial.stream().map(tr -> { + MomentBlock momentBlock = tr.getMomentBlocks().stream().filter(mb -> mb.getMomentHeader().getDataType() == 1).findAny().orElse(null); + return new RadialBlock(tr.getRadialHeader(), momentBlock); + }).collect(Collectors.toList()); + columns.put("col_%s".formatted(index), GZipUtils.toList(GZipUtils.compress(mapper.writeValueAsBytes(new RadarBlock(cutConfigurations.get(index), radialBlocks))))); } } } diff --git a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/SchemaService.java b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/SchemaService.java index d29046a..f0233e4 100644 --- a/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/SchemaService.java +++ b/04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/SchemaService.java @@ -54,6 +54,7 @@ public class SchemaService implements ISchemaService { private static Map radarPartitionKey = new LinkedHashMap<>(partitionKey){{ put("station", DataType.varchar()); + put("product_type", DataType.varchar()); }}; private final ISchemaRepository schemaRepository; @@ -92,15 +93,16 @@ public class SchemaService implements ISchemaService { } private Map toColumn(int length){ - Map columns = new LinkedHashMap<>(length + 3){{ - put("site_configuration", DataType.blob()); - put("task_configuration", DataType.blob()); - put("generic_header", DataType.blob()); + Map columns = new LinkedHashMap<>(length + 4){{ + put("site_configuration", DataType.frozenList(DataType.cint())); + put("task_configuration", DataType.frozenList(DataType.cint())); + put("generic_header", DataType.frozenList(DataType.cint())); }}; for(int index = 0; index < length; index++){ - columns.put("col_%s".formatted(index), DataType.blob()); + columns.put("col_%s".formatted(index), DataType.frozenList(DataType.cint())); } + columns.put("create_time", DataType.timestamp()); return columns; } diff --git a/04.系统编码/03.radar-resolver/radar-core/src/main/java/com/xiaowuler/radar/core/custom/impl/CustomRadarInputStream.java b/04.系统编码/03.radar-resolver/radar-core/src/main/java/com/xiaowuler/radar/core/custom/impl/CustomRadarInputStream.java index c276fb3..1d2fdbc 100644 --- a/04.系统编码/03.radar-resolver/radar-core/src/main/java/com/xiaowuler/radar/core/custom/impl/CustomRadarInputStream.java +++ b/04.系统编码/03.radar-resolver/radar-core/src/main/java/com/xiaowuler/radar/core/custom/impl/CustomRadarInputStream.java @@ -47,7 +47,7 @@ public class CustomRadarInputStream implements ICustomInputStream, Closeable{ private void customRadarInputStream(String filepath, Class clazz) throws NoSuchMethodException, IOException, InvocationTargetException, InstantiationException, IllegalAccessException { fileInputStream = new FileInputStream(filepath); - available = new File(filepath).length(); +// available = new File(filepath).length(); inputStream = clazz.getConstructor(InputStream.class).newInstance(fileInputStream); }