Browse Source

modify some codes

master
xiaowuler 3 years ago
parent
commit
8a90ed58d2
  1. 223
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/jobhandler/TianQingRadarDataHandler.java
  2. 43
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarRead.java
  3. 175
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarReader.java
  4. 42
      04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/util/ByteUtil.java
  5. 11
      04.系统编码/01.xxl-job/xxl-job-executor/src/test/java/com/ping/chuan/ahpmsdp/xxljobexecutor/XxlJobExecutorApplicationTests.java

223
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/jobhandler/TianQingRadarDataHandler.java

@ -0,0 +1,223 @@
package com.ping.chuan.ahpmsdp.xxljobexecutor.jobhandler;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.constant.SplitConstant;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.constant.StateConstant;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.constant.TimeConstant;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.BaseInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.DealInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.MetaInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.ApplicationCommon;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.Coordinate;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.Grb2Reader;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.vo.ParamInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.vo.TianQingFileVO;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.vo.TianQingResponse;
import com.ping.chuan.ahpmsdp.xxljobexecutor.service.IDataService;
import com.ping.chuan.ahpmsdp.xxljobexecutor.service.IDealInfoService;
import com.ping.chuan.ahpmsdp.xxljobexecutor.service.ITianQingSCMOCDataService;
import com.ping.chuan.ahpmsdp.xxljobexecutor.service.impl.BaseInfoService;
import com.ping.chuan.ahpmsdp.xxljobexecutor.template.CacheTemplate;
import com.ping.chuan.ahpmsdp.xxljobexecutor.util.FileUtil;
import com.ping.chuan.ahpmsdp.xxljobexecutor.util.WebUtil;
/**
* @describe: 天擎 spcc数据读取
* @author: xiaowuler
* @createTime: 2021-11-10 09:32
*/
@Component
public class TianQingRadarDataHandler {
private final ObjectMapper objectMapper = new ObjectMapper();
private Map<String, TianQingResponse<List<TianQingFileVO>>> results = new HashMap<>();
private final ApplicationCommon applicationCommon;
private final BaseInfoService baseInfoService;
private final ITianQingSCMOCDataService tianQingSCMOCDataService;
private final IDealInfoService dealInfoService;
private final IDataService dataService;
public TianQingRadarDataHandler(ApplicationCommon applicationCommon, BaseInfoService baseInfoService, ITianQingSCMOCDataService tianQingSCMOCDataService, IDealInfoService dealInfoService, IDataService dataService){
this.applicationCommon = applicationCommon;
this.baseInfoService = baseInfoService;
this.tianQingSCMOCDataService = tianQingSCMOCDataService;
this.dealInfoService = dealInfoService;
this.dataService = dataService;
}
@XxlJob("readRadar")
public void readRadar() throws IOException {
XxlJobHelper.log("read job handler start");
String param = XxlJobHelper.getJobParam();
ParamInfo paramInfo = objectMapper.readValue(param, ParamInfo.class);
Map<String, Object> params = applicationCommon.prepareParams(paramInfo.getModeCode().toLowerCase());
LocalDateTime startTime = LocalDateTime.now().minusHours(8).minusHours(paramInfo.getTimeLength());
LocalDateTime endTime = LocalDateTime.now().minusHours(8);
TianQingResponse<List<TianQingFileVO>> tianQingResponse = request(String.format("[%s,%s]", startTime.format(TimeConstant.YYYYMMDDHHMM00), endTime.format(TimeConstant.YYYYMMDDHHMM00)), params);
if (Objects.isNull(tianQingResponse)){
XxlJobHelper.log("未从天擎查询到数据");
return;
}
List<DealInfo> dealInfos = dealInfoService.findAllByTimeRange(paramInfo.getModeCode(), paramInfo.getMemberCode(), paramInfo.getElementCode(), startTime, endTime);
Map<DealInfo, Map.Entry<MetaInfo, Coordinate>> results = download(tianQingResponse.getDs(), paramInfo.getElementCode(), dealInfos);
XxlJobHelper.log("总共需要解析文件{}个", results.size());
for(Map.Entry<DealInfo, Map.Entry<MetaInfo, Coordinate>> entry : results.entrySet()){
XxlJobHelper.log("文件:{} 开始解析", entry.getKey().getFileName());
String result;
if(paramInfo.isWind()){
result = dataService.readWindElement(entry);
}else {
result = dataService.readRoutineElement(entry, paramInfo.isTemp());
}
XxlJobHelper.log("文件:{} 解析完成, 结果:{}", entry.getKey().getFileName(), result);
}
XxlJobHelper.log("文件解析完成");
XxlJobHelper.log("read job end");
}
public TianQingResponse<List<TianQingFileVO>> request(String timeRang, Map<String, Object> params) throws JsonProcessingException {
synchronized (this){
if (results.containsKey(timeRang)){
return results.get(timeRang);
}
results.clear();
params.put("timeRange", timeRang);
int index = 0;
while (index < 3){
String result = WebUtil.get(applicationCommon.getUrl(), params);
TianQingResponse<List<TianQingFileVO>> tianQingResponse = objectMapper.readValue(result, new TypeReference<>() {});
if (tianQingResponse.getReturnCode() == 0){
results.put(timeRang, tianQingResponse);
return tianQingResponse;
}
index++;
}
return null;
}
}
private Map<DealInfo, Map.Entry<MetaInfo, Coordinate>> download(List<TianQingFileVO> tianQingFileVOs, String elementCode, List<DealInfo> dealInfos){
Date afterTime = Date.from(LocalDateTime.now().minusMinutes(10).atZone(ZoneId.systemDefault()).toInstant());
Map<DealInfo, Map.Entry<MetaInfo, Coordinate>> targetDealInfos = new HashMap<>(tianQingFileVOs.size());
for(TianQingFileVO tianQingFileVO: tianQingFileVOs){
if (!tianQingFileVO.getProdCont().equals(elementCode)){
continue;
}
DealInfo dealInfo = toDealInfo(tianQingFileVO.getFileName());
if (Objects.isNull(dealInfo)){
continue;
}
Map.Entry<MetaInfo, Coordinate> entry = CacheTemplate.findOne(dealInfo);
if (Objects.isNull(entry)){
XxlJobHelper.log("meta_infos 未录入相关文件信息, {}", dealInfo.getFileName());
continue;
}
DealInfo sourceDealInfo = dealInfos.stream().filter(d -> d.getFileName().equals(dealInfo.getFileName())).findAny().orElse(null);
if (Objects.isNull(sourceDealInfo)){
sourceDealInfo = dealInfo.toPerfect(entry.getKey(), tianQingFileVO.getFileUrl());
sourceDealInfo.setForecastTime(new Date());
dealInfoService.insert(dealInfo.toKeyspace(), sourceDealInfo);
download(sourceDealInfo, entry, targetDealInfos);
continue;
}
// 判断文件是否需要处理
if (sourceDealInfo.getTryCount() >= 3 || (sourceDealInfo.getState() == StateConstant.PROCESSING && sourceDealInfo.getUpdateTime().before(afterTime)) || sourceDealInfo.getState() == StateConstant.SUCCESS){
continue;
}
sourceDealInfo.setState(StateConstant.PROCESSING);
sourceDealInfo.setTryCount(sourceDealInfo.getTryCount() + 1);
sourceDealInfo.setUpdateTime(new Date());
if (Files.exists(Paths.get(sourceDealInfo.getLocalPath()))){
dealInfoService.insert(sourceDealInfo.toKeyspace(), sourceDealInfo);
targetDealInfos.put(sourceDealInfo, entry);
continue;
}
download(sourceDealInfo, entry, targetDealInfos);
}
return targetDealInfos;
}
private void download(DealInfo dealInfo, Map.Entry<MetaInfo, Coordinate> entry, Map<DealInfo, Map.Entry<MetaInfo, Coordinate>> targetDealInfos){
XxlJobHelper.log("开始下载文件:{}", dealInfo.getFileName());
boolean result = FileUtil.downloadByNetwork(dealInfo.getUrl(), dealInfo.getLocalPath());
if (result){
XxlJobHelper.log("文件:{} 下载完成", dealInfo.getFileName());
targetDealInfos.put(dealInfo, entry);
return;
}
dealInfo.setState(StateConstant.FAILED);
XxlJobHelper.log("文件:{} 下载失败", dealInfo.getFileName());
dealInfo.setMessage("下载文件:%s失败".formatted(dealInfo.getFileName()));
dealInfoService.insert(dealInfo.toKeyspace(), dealInfo);
return;
}
private DealInfo toDealInfo(String fileName){
String[] names = fileName.split(SplitConstant.FILENAME_CUT_RUNG);
if (names.length != 10){
XxlJobHelper.log("文件名:{} 格式不规范,无法解析", fileName);
return null;
}
DealInfo dealInfo = new DealInfo();
dealInfo.setMemberCode(names[3]);
dealInfo.setCreateTime(Date.from(LocalDateTime.parse(names[4], TimeConstant.YYYYMMDDHHMMSS).atZone(ZoneId.systemDefault()).toInstant()));
String[] modes = names[7].split(SplitConstant.FILENAME_CUT_NEXT_RUNG);
dealInfo.setModeCode(modes[0]);
dealInfo.setElementCode(modes[1]);
dealInfo.setInitialTime(Date.from(LocalDateTime.parse(names[8], TimeConstant.YYYYMMDDHHMM).atZone(ZoneId.systemDefault()).toInstant()));
dealInfo.setHeight(BigDecimal.valueOf(1010));
dealInfo.setTimeLength(Integer.parseInt(names[9].substring(0, 3)));
dealInfo.setTimeInterval(Integer.parseInt(names[9].substring(3, 5)));
dealInfo.setLocalPath(String.format("%s\\%s\\%s\\%s\\%s\\%s", "C:\\SaveFile\\Source", names[8].substring(0, 6), dealInfo.getModeCode(), dealInfo.getMemberCode(), dealInfo.getElementCode(), fileName));
dealInfo.setFileName(fileName);
return dealInfo;
}
private void convertFile(Map<String, Boolean> results, String variable, List<BaseInfo> baseInfos, boolean isWind) throws IOException {
for(Map.Entry<String, Boolean> entry : results.entrySet()){
String filepath = entry.getKey();
boolean result = entry.getValue();
if (!result){
continue;
}
if (baseInfos.stream().filter(b -> b.getSourceFilePath().equals(filepath)).count() > 0){
continue;
}
BaseInfo baseInfo = Grb2Reader.readCLDASInfo(filepath);
if (isWind) {
tianQingSCMOCDataService.readWindElement(baseInfo);
} else {
tianQingSCMOCDataService.readRoutineElement(baseInfo, variable);
}
}
}
}

43
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarRead.java

@ -0,0 +1,43 @@
package com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain;
import java.util.Date;
import lombok.Data;
/**
* @describe: 雷达 实体类
* @author: xiaowuler
* @createTime: 2022-02-28 09:20
*/
@Data
public class RadarRead {
private short day = 0;
private short hour = 0;
private short year = 0;
private short month = 0;
private short minute = 0;
private int interval = 0;
private int radarCount = 0;
private short zNumGrids = 0;
private short xNumGrids = 0;
private short yNumGrids = 0;
private Date time;
public float yReso;
public float xReso;
private float startLon;
private float startLat;
private float centerLat;
private float centerLon;
private String flag = null;
private String version = null;
private char[] reserved = null;
private String dataName = null;
private String zoneName = null;
private byte[] mosaicFlag = null;
private float[] zHighGrids = null;
private float[] radarAltitude = null;
private float[] radarLatitude = null;
private float[] radarLongitude = null;
private String[] radarStationName = null;
private float[] observerValue = null;
}

175
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/model/domain/RadarReader.java

@ -0,0 +1,175 @@
package com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.xxl.job.core.util.DateUtil;
import com.ping.chuan.ahpmsdp.xxljobexecutor.util.ByteUtil;
/**
* @describe: 雷达读取类
* @author: xiaowuler
* @createTime: 2022-02-28 09:18
*/
public class RadarReader {
public RadarRead readBinaryFile(String filePath, String time) {
RadarRead radar = new RadarRead();
try{
InputStream inputStream = new FileInputStream(filePath);
BufferedInputStream bi = new BufferedInputStream(inputStream);
radar.setZoneName(readToString(bi, 12));
radar.setDataName(readToString(bi, 38));
radar.setFlag(readToString(bi, 8));
radar.setVersion(readToString(bi, 8));
radar.setYear(readToShort(bi));
radar.setMonth(readToShort(bi));
radar.setDay(readToShort(bi));
radar.setHour(readToShort(bi));
radar.setMinute(readToShort(bi));
radar.setInterval(readToShort(bi));
radar.setXNumGrids(readToShort(bi));
radar.setYNumGrids(readToShort(bi));
radar.setZNumGrids(readToShort(bi));
LocalDateTime initialTime = LocalDateTime.parse(time, DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
Date date = Date.from(initialTime.atZone(ZoneOffset.systemDefault()).toInstant());
radar.setTime(DateUtil.addHours(date, 8));
radar.setRadarCount(readToInt(bi));
radar.setStartLon(readToFloat(bi));
radar.setStartLat(readToFloat(bi));
radar.setCenterLon(readToFloat(bi));
radar.setCenterLat(readToFloat(bi));
radar.setXReso(readToFloat(bi));
radar.setYReso(readToFloat(bi));
radar.setZHighGrids(setMultipleValue(bi, new float[40]));
radar.setRadarStationName(setMultipleValue(bi, new String[20], 16));
radar.setRadarLongitude(setMultipleValue(bi, new float[20]));
radar.setRadarLatitude(setMultipleValue(bi, new float[20]));
radar.setRadarAltitude(setMultipleValue(bi, new float[20]));
byte[] bytes = new byte[20];
bi.read(bytes);
radar.setMosaicFlag(bytes);
radar.setReserved(setMultipleValue(bi, 172));
radar.setObserverValue(new float[radar.getZNumGrids() * radar.getYNumGrids() * radar.getXNumGrids()]);
readValue(bi, radar, true);
List<Float> targetValues = new ArrayList<>();
List<Float> values = IntStream.range(0, radar.getObserverValue().length)
.mapToDouble(i -> radar.getObserverValue()[i]).boxed().map(i -> i.floatValue()).collect(Collectors.toList());
values = values.subList(305 * radar.getXNumGrids(), 700 * radar.getYNumGrids());
for(int index = 0; index < 395; index++){
int tempIndex = index * radar.getXNumGrids();
targetValues.addAll(values.subList(tempIndex + 365, tempIndex + 700));
}
float[] result = new float[132325];
for(int index = 0, len = targetValues.size(); index < len; index++){
result[index] = targetValues.get(index);
}
radar.setObserverValue(result);
}catch (Exception e) {
throw new RuntimeException(e);
}
return radar;
}
private void readValue(BufferedInputStream bis, RadarRead radar, boolean flag) throws IOException {
if (flag){
for(int i = 0, len = radar.getXNumGrids() * radar.getYNumGrids() * radar.getZNumGrids(); i < len; i++) {
float value = bis.read();
if (value != 0){
radar.getObserverValue()[i] = (value - 66) / 2;
continue;
}
radar.getObserverValue()[i] = value;
}
}else {
for(int i = 0; i < radar.getXNumGrids() * radar.getYNumGrids() * radar.getZNumGrids(); i++) {
int value = readToInt(bis);
radar.getObserverValue()[i] = value / 1;
}
}
}
private String readToString(BufferedInputStream bi, Integer... infos) throws IOException {
return convert(bi, (result, info) -> new String(result, info[1], info[2]), infos);
}
private short readToShort(BufferedInputStream bi) throws IOException{
return convert(bi, (result, infos) -> ByteUtil.toShort(result), 2);
}
private int readToInt(BufferedInputStream bi) throws IOException {
return convert(bi, (result, infos) -> ByteUtil.toInt(result), 4);
}
private float readToFloat(BufferedInputStream bi) throws IOException {
return convert(bi, (result, infos) -> ByteUtil.toFloat(result), 4);
}
private static <T> T convert(BufferedInputStream bi, Template<T> template, Integer... infos) throws IOException {
if (infos.length == 1){
infos = Arrays.copyOf(infos,3);
infos[1] = 0;
infos[2] = infos[0];
}
byte[] result = new byte[infos[0]];
bi.read(result);
return template.readBytes(result, infos);
}
interface Template<T>{
T readBytes(byte[] result, Integer... infos) throws IOException;
}
private String[] setMultipleValue(BufferedInputStream bi, String[] result, Integer... infos) throws IOException {
for (int index = 0, len = result.length; index < len; index++){
result[index] = readToString(bi, infos);
}
return result;
}
private float[] setMultipleValue(BufferedInputStream bi, float[] result) throws IOException {
for (int index = 0, len = result.length; index < len; index++){
result[index] = readToFloat(bi);
}
return result;
}
private char[] setMultipleValue(BufferedInputStream bis, int length) throws IOException {
byte[] bytes = new byte[length];
bis.read(bytes);
Charset cs = Charset.forName ("UTF-8");
ByteBuffer bb = ByteBuffer.allocate (bytes.length);
bb.put (bytes);
bb.flip ();
CharBuffer cb = cs.decode (bb);
return cb.array();
}
}

42
04.系统编码/01.xxl-job/xxl-job-executor/src/main/java/com/ping/chuan/ahpmsdp/xxljobexecutor/util/ByteUtil.java

@ -0,0 +1,42 @@
package com.ping.chuan.ahpmsdp.xxljobexecutor.util;
/**
* @describe: 字节工具类
* @author: xiaowuler
* @createTime: 2022-02-28 09:23
*/
public class ByteUtil {
private static <T> T convert(byte[] result, DefaultTemplate<T> defaultTemplate){
return defaultTemplate.convert(result);
}
public static float toFloat(byte[] result){
return convert(result, (bytes) -> {
int l;
l = bytes[0];
l &= 0xff;
l |= ((long) bytes[1] << 8);
l &= 0xffff;
l |= ((long) bytes[2] << 16);
l &= 0xffffff;
l |= ((long) bytes[3] << 24);
return Float.intBitsToFloat(l);
});
}
public static int toInt(byte[] result){
return convert(result, (bytes) -> ((((result[3] & 0xff) << 24)
| ((result[2] & 0xff) << 16)
| ((result[1] & 0xff) << 8) | ((result[0] & 0xff) << 0))));
}
public static short toShort(byte[] result) {
return convert(result, (bytes) -> (short) (((result[1] << 8) | result[0] & 0xff)));
}
}
interface DefaultTemplate<T> {
T convert(byte[] result);
}

11
04.系统编码/01.xxl-job/xxl-job-executor/src/test/java/com/ping/chuan/ahpmsdp/xxljobexecutor/XxlJobExecutorApplicationTests.java

@ -9,6 +9,8 @@ import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.BaseInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.dao.DealInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.ElementInfo;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.Grb2Reader;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.RadarRead;
import com.ping.chuan.ahpmsdp.xxljobexecutor.model.domain.RadarReader;
import com.ping.chuan.ahpmsdp.xxljobexecutor.service.IDealInfoService;
import com.ping.chuan.ahpmsdp.xxljobexecutor.service.ISchemaService;
import com.ping.chuan.ahpmsdp.xxljobexecutor.service.ITianQingCLDASDataService;
@ -285,4 +287,13 @@ class XxlJobExecutorApplicationTests {
schemaService.buildKeySpace();
}
@Test
void readRadar() {
RadarReader reader = new RadarReader();
RadarRead read = reader.readBinaryFile("C:\\Users\\xiaowuler\\Desktop\\test\\Z_SURF_I_58224_20220228011000_O_AWS-RSD-MM_FTM.BIN", "20220228000927");
System.out.println(read);
new HashMap<>()
}
}

Loading…
Cancel
Save