Browse Source

Merge remote-tracking branch 'remotes/origin/master'

master
xiaowuler 3 years ago
parent
commit
a0ba8c50fb
  1. 4
      04.系统编码/01.xxl-job/xxl-job-admin/src/main/resources/application.properties
  2. 2
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/IDataRepository.java
  3. 10
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/impl/DataRepository.java
  4. 2
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarBlock.java
  5. 15
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadialBlock.java
  6. 22
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/RadarService.java
  7. 12
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/SchemaService.java
  8. 2
      04.系统编码/03.radar-resolver/radar-core/src/main/java/com/xiaowuler/radar/core/custom/impl/CustomRadarInputStream.java

4
04.系统编码/01.xxl-job/xxl-job-admin/src/main/resources/application.properties

@ -23,9 +23,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://localhost:3306/ahpmsdp_xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.url=jdbc:mysql://112.124.40.88:33306/ahpmsdp_xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.password=3cqscbr@only1
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
### datasource-pool

2
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/IDataRepository.java

@ -14,5 +14,5 @@ import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.DealInfo;
*/
public interface IDataRepository {
void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, int timeEffect, Map<String, List<List<BigDecimal>>> columns);
void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, String station, Map<String, byte[]> columns, int maxCount);
void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, String station, String productType, Map<String, List<Integer>> columns, int maxCount);
}

10
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/dao/impl/DataRepository.java

@ -42,9 +42,9 @@ public class DataRepository implements IDataRepository {
}
@Override
public void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, String station, Map<String, byte[]> columns, int maxCount) {
public void insert(String keyspace, String tableName, Date initialTime, Date forecastTime, String station, String productType, Map<String, List<Integer>> columns, int maxCount) {
List<String> targetColumns = prepareRadarColumnNames(columns.keySet());
List<Object> targetValues = prepareColumnValues(initialTime, forecastTime, station, columns.values());
List<Object> targetValues = prepareColumnValues(initialTime, forecastTime, station, productType, columns.values());
if (targetColumns.size() <= maxCount){
Insert builder = QueryBuilder.insertInto(keyspace, tableName)
@ -53,7 +53,7 @@ public class DataRepository implements IDataRepository {
return;
}
groupInsert(keyspace, tableName, targetColumns, targetValues, maxCount, 3);
groupInsert(keyspace, tableName, targetColumns, targetValues, maxCount, 4);
}
private void groupInsert(String keyspace, String tableName, List<String> columns, List<Object> values, int maxCount, int limit){
@ -85,6 +85,7 @@ public class DataRepository implements IDataRepository {
columns.add("initial_time");
columns.add("forecast_time");
columns.add("station");
columns.add("product_type");
columns.add("create_time");
columns.addAll(columnSet);
return columns;
@ -100,11 +101,12 @@ public class DataRepository implements IDataRepository {
return columns;
}
private List<Object> prepareColumnValues(Date initialTime, Date forecastTime, String station, Collection<byte[]> values){
private List<Object> prepareColumnValues(Date initialTime, Date forecastTime, String station, String productType, Collection<List<Integer>> values){
List<Object> columns = new ArrayList<>();
columns.add(initialTime);
columns.add(forecastTime);
columns.add(station);
columns.add(productType);
columns.add(new Date());
columns.addAll(values);
return columns;

2
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarBlock.java

@ -20,5 +20,5 @@ import com.xiaowuler.radar.core.weather.domain.RadialHeader;
@AllArgsConstructor
public class RadarBlock {
private CutConfiguration cutConfiguration;
private List<Radial> radials;
private List<RadialBlock> radials;
}

15
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadialBlock.java

@ -0,0 +1,15 @@
package com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain;
import com.xiaowuler.radar.core.weather.domain.MomentBlock;
import com.xiaowuler.radar.core.weather.domain.RadialHeader;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RadialBlock {
private RadialHeader radialHeader;
private MomentBlock momentBlock;
}

22
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/RadarService.java

@ -8,9 +8,11 @@ import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.DealInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.MetaInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.Coordinate;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.RadarBlock;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.RadialBlock;
import com.ping.chuan.ahpmsdp.xxljobexecutor.service.IRadarService;
import com.ping.chuan.ahpmsdp.xxljobexecutor.util.GZipUtils;
import com.xiaowuler.radar.core.weather.domain.MomentBlock;
import org.springframework.stereotype.Service;
import java.io.IOException;
@ -49,9 +51,9 @@ public class RadarService implements IRadarService {
RadarReader radarReader = new RadarReader();
radarReader.read(entry.getKey().getLocalPath());
Map<String, byte[]> columns = toColumns(radarReader.getRadarBlock().getCommonBlock());
Map<String, List<Integer>> columns = toColumns(radarReader.getRadarBlock().getCommonBlock());
toColumns(columns, radarReader.getRadarBlock().getCommonBlock().getCutConfigurations(), radarReader.getRadarBlock().getRadialBlock().getRadials());
dataRepository.insert(keyspace, entry.getValue().getKey().toTableName(), entry.getKey().getInitialTime(), entry.getKey().getInitialTime(), radarReader.getRadarBlock().getCommonBlock().getSiteConfiguration().getSiteCode(), columns, 2);
dataRepository.insert(keyspace, entry.getValue().getKey().toTableName(), entry.getKey().getInitialTime(), entry.getKey().getInitialTime(), radarReader.getRadarBlock().getCommonBlock().getSiteConfiguration().getSiteCode(), "dBT", columns, 1);
entry.getKey().setState(StateConstant.SUCCESS);
} catch (RadarReadException | IOException e) {
entry.getKey().setState(StateConstant.FAILED);
@ -62,19 +64,23 @@ public class RadarService implements IRadarService {
return Objects.isNull(error) ? "文件解析成功" : "文件解析失败,%s".formatted(error);
}
private Map<String, byte[]> toColumns(CommonBlock commonBlock) throws IOException {
private Map<String, List<Integer>> toColumns(CommonBlock commonBlock) throws IOException {
return new LinkedHashMap<>(){{
put("site_configuration", GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getSiteConfiguration())));
put("task_configuration", GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getTaskConfiguration())));
put("generic_header", GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getGenericHeader())));
put("site_configuration", GZipUtils.toList(GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getSiteConfiguration()))));
put("task_configuration", GZipUtils.toList(GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getTaskConfiguration()))));
put("generic_header", GZipUtils.toList(GZipUtils.compress(mapper.writeValueAsBytes(commonBlock.getGenericHeader()))));
}};
}
private void toColumns(Map<String, byte[]> columns, List<CutConfiguration> cutConfigurations, List<Radial> radials) throws IOException {
private void toColumns(Map<String, List<Integer>> columns, List<CutConfiguration> cutConfigurations, List<Radial> radials) throws IOException {
for(int index = 0, len = cutConfigurations.size(); index < len; index++){
int num = index + 1;
List<Radial> targetRadial = radials.stream().filter(r -> r.getRadialHeader().getElevationNumber() == num).collect(Collectors.toList());
columns.put("col_%s".formatted(index), GZipUtils.compress(mapper.writeValueAsBytes(new RadarBlock(cutConfigurations.get(index), targetRadial))));
List<RadialBlock> radialBlocks = targetRadial.stream().map(tr -> {
MomentBlock momentBlock = tr.getMomentBlocks().stream().filter(mb -> mb.getMomentHeader().getDataType() == 1).findAny().orElse(null);
return new RadialBlock(tr.getRadialHeader(), momentBlock);
}).collect(Collectors.toList());
columns.put("col_%s".formatted(index), GZipUtils.toList(GZipUtils.compress(mapper.writeValueAsBytes(new RadarBlock(cutConfigurations.get(index), radialBlocks)))));
}
}
}

12
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/service/impl/SchemaService.java

@ -54,6 +54,7 @@ public class SchemaService implements ISchemaService {
private static Map<String, DataType> radarPartitionKey = new LinkedHashMap<>(partitionKey){{
put("station", DataType.varchar());
put("product_type", DataType.varchar());
}};
private final ISchemaRepository schemaRepository;
@ -92,15 +93,16 @@ public class SchemaService implements ISchemaService {
}
private Map<String, DataType> toColumn(int length){
Map<String, DataType> columns = new LinkedHashMap<>(length + 3){{
put("site_configuration", DataType.blob());
put("task_configuration", DataType.blob());
put("generic_header", DataType.blob());
Map<String, DataType> columns = new LinkedHashMap<>(length + 4){{
put("site_configuration", DataType.frozenList(DataType.cint()));
put("task_configuration", DataType.frozenList(DataType.cint()));
put("generic_header", DataType.frozenList(DataType.cint()));
}};
for(int index = 0; index < length; index++){
columns.put("col_%s".formatted(index), DataType.blob());
columns.put("col_%s".formatted(index), DataType.frozenList(DataType.cint()));
}
columns.put("create_time", DataType.timestamp());
return columns;
}

2
04.系统编码/03.radar-resolver/radar-core/src/main/java/com/xiaowuler/radar/core/custom/impl/CustomRadarInputStream.java

@ -47,7 +47,7 @@ public class CustomRadarInputStream implements ICustomInputStream, Closeable{
private <T extends InputStream> void customRadarInputStream(String filepath, Class<T> clazz) throws NoSuchMethodException, IOException, InvocationTargetException, InstantiationException, IllegalAccessException {
fileInputStream = new FileInputStream(filepath);
available = new File(filepath).length();
// available = new File(filepath).length();
inputStream = clazz.getConstructor(InputStream.class).newInstance(fileInputStream);
System.out.println(inputStream.readAllBytes());
}

Loading…
Cancel
Save