zephyr-backend/saber/process.py
2025-02-13 20:03:12 +08:00

247 lines
9.1 KiB
Python

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import numpy as np
from CONSTANT import DATA_BASEPATH
from saber.utils import *
# lat_range=(latitude_min, latitude_max),
# alt_range=(altitude_min, altitude_max),
# lambda_range=(lamda_low, lamda_high),
@dataclass
class WaveData:
cycles: Optional[np.ndarray] = None
wn0: Optional[np.ndarray] = None
fit_wn: List[Optional[np.ndarray]] = None # 存储wn1-wn5的拟合结果
wn: List[Optional[np.ndarray]] = None # 存储wn1-wn5
fft: Optional[np.ndarray] = None
fft_filtered: Optional[np.ndarray] = None
ifft: Optional[np.ndarray] = None
Nz: Optional[float] = None
Ptz: Optional[float] = None
lat_range: Tuple[float, float] = None
alt_range: Tuple[float, float] = None
lambda_range: Tuple[float, float] = None
# dump to dict
def to_dict(self) -> Dict:
"""将数据转换为字典格式,与原代码保持兼容"""
result = {
"ktemp_cycles_mon_list": self.cycles,
"ktemp_wn0_mon_list": self.wn0,
"ktemp_fit_wn1_mon_list": self.fit_wn[0],
"ktemp_wn1_mon_list": self.wn[0],
"ktemp_fit_wn2_mon_list": self.fit_wn[1],
"ktemp_wn2_mon_list": self.wn[1],
"ktemp_fit_wn3_mon_list": self.fit_wn[2],
"ktemp_wn3_mon_list": self.wn[2],
"ktemp_fit_wn4_mon_list": self.fit_wn[3],
"ktemp_wn4_mon_list": self.wn[3],
"ktemp_fit_wn5_mon_list": self.fit_wn[4],
"ktemp_wn5_mon_list": self.wn[4],
"ktemp_fft_mon_list": self.fft,
"ktemp_fft_lvbo_mon_list": self.fft_filtered,
"ktemp_ifft_mon_list": self.ifft,
"ktemp_Nz_mon_list": self.Nz,
"ktemp_Ptz_mon_list": self.Ptz,
"lat_range": self.lat_range,
"alt_range": self.alt_range,
"lambda_range": self.lambda_range
}
return result
# load from dict
@classmethod
def from_dict(cls, data: Dict):
wave_data = cls()
wave_data.cycles = data["ktemp_cycles_mon_list"]
wave_data.wn0 = data["ktemp_wn0_mon_list"]
wave_data.fit_wn = [
data[f"ktemp_fit_wn{i+1}_mon_list"] for i in range(5)]
wave_data.wn = [data[f"ktemp_wn{i+1}_mon_list"] for i in range(5)]
wave_data.fft = data["ktemp_fft_mon_list"]
wave_data.fft_filtered = data["ktemp_fft_lvbo_mon_list"]
wave_data.ifft = data["ktemp_ifft_mon_list"]
wave_data.Nz = data["ktemp_Nz_mon_list"]
wave_data.Ptz = data["ktemp_Ptz_mon_list"]
wave_data.lat_range = data["lat_range"]
wave_data.alt_range = data["alt_range"]
wave_data.lambda_range = data["lambda_range"]
return wave_data
@dataclass
class YearlyData:
date_times: List[datetime]
wave_data: List[WaveData]
def to_dict(self) -> Dict:
"""将数据转换为字典格式,与原代码保持兼容"""
result = {
"date_time_list": self.date_times,
"ktemp_cycles_mon_list": [],
"altitude_cycles_mon_list": [],
"ktemp_wn0_mon_list": [],
"ktemp_fit_wn1_mon_list": [],
"ktemp_wn1_mon_list": [],
"ktemp_fit_wn2_mon_list": [],
"ktemp_wn2_mon_list": [],
"ktemp_fit_wn3_mon_list": [],
"ktemp_wn3_mon_list": [],
"ktemp_fit_wn4_mon_list": [],
"ktemp_wn4_mon_list": [],
"ktemp_fit_wn5_mon_list": [],
"ktemp_wn5_mon_list": [],
"ktemp_fft_mon_list": [],
"ktemp_fft_lvbo_mon_list": [],
"ktemp_ifft_mon_list": [],
"ktemp_Nz_mon_list": [],
"ktemp_Ptz_mon_list": [],
"alt_range": [],
}
for data in self.wave_data:
result["ktemp_cycles_mon_list"].append(data.cycles)
result["ktemp_wn0_mon_list"].append(data.wn0)
for i in range(5):
result[f"ktemp_fit_wn{i+1}_mon_list"].append(data.fit_wn[i])
result[f"ktemp_wn{i+1}_mon_list"].append(data.wn[i])
result["ktemp_fft_mon_list"].append(data.fft)
result["ktemp_fft_lvbo_mon_list"].append(data.fft_filtered)
result["ktemp_ifft_mon_list"].append(data.ifft)
result["ktemp_Nz_mon_list"].append(data.Nz)
result["ktemp_Ptz_mon_list"].append(data.Ptz)
result["alt_range"].append(data.alt_range)
return result
# load from dict
@classmethod
def from_dict(cls, data: Dict):
wave_data = []
for i in range(len(data["date_time_list"])):
wave_data.append(WaveData())
wave_data[-1].cycles = data["ktemp_cycles_mon_list"][i]
wave_data[-1].wn0 = data["ktemp_wn0_mon_list"][i]
wave_data[-1].fit_wn = [
data[f"ktemp_fit_wn{i+1}_mon_list"][i] for i in range(5)]
wave_data[-1].wn = [data[f"ktemp_wn{i+1}_mon_list"][i]
for i in range(5)]
wave_data[-1].fft = data["ktemp_fft_mon_list"][i]
wave_data[-1].fft_filtered = data["ktemp_fft_lvbo_mon_list"][i]
wave_data[-1].ifft = data["ktemp_ifft_mon_list"][i]
wave_data[-1].Nz = data["ktemp_Nz_mon_list"][i]
wave_data[-1].Ptz = data["ktemp_Ptz_mon_list"][i]
return cls(date_times=data["date_time_list"], wave_data=wave_data)
class DataProcessor:
def __init__(self, lat_range: Tuple[float, float],
alt_range: Tuple[float, float],
lambda_range: Tuple[float, float],
lvboin: float):
self.lat_min, self.lat_max = lat_range
self.alt_min, self.alt_max = alt_range
self.lambda_low, self.lambda_high = lambda_range
self.lvboin = lvboin
def process_day(self, ncdata, day_read) -> Optional[WaveData]:
# 读取数据
dataset, tplatitude, tplongitude, tpaltitude, ktemp, time, date, date_time = ncdata
df = day_data_read(date, day_read, tplatitude)
cycles = data_cycle_identify(df, self.lat_min, self.lat_max)
if not cycles:
return None
# 生成周期数据
ktemp_cycles, alt_cycles = data_cycle_generate(
cycles, ktemp, tpaltitude, self.alt_min, self.alt_max)
if ktemp_cycles is None or alt_cycles is None:
return None
wave_data = WaveData()
wave_data.cycles = ktemp_cycles
# 计算波数分析
wave_data.wn0 = ktemp_cycles - np.mean(ktemp_cycles, axis=0)
wave_data.fit_wn = []
wave_data.wn = []
temp = wave_data.wn0
# 循环计算波数1-5
for i in range(1, 6):
fit, residual = fit_wave(temp, i)
wave_data.fit_wn.append(fit)
wave_data.wn.append(residual)
temp = residual
# FFT分析
wave_data.fft, wave_data.fft_filtered, wave_data.ifft = fft_ifft_wave(
wave_data.wn[-1], self.lambda_low, self.lambda_high,
self.alt_min, self.alt_max, self.lvboin)
# 计算指数
wave_data.Nz, wave_data.Ptz = power_indices(
wave_data.cycles, wave_data.wn[-1], wave_data.ifft,
self.alt_min, self.alt_max)
# ranges for
wave_data.lat_range = (self.lat_min, self.lat_max)
wave_data.alt_range = (self.alt_min, self.alt_max)
wave_data.lambda_range = (self.lambda_low, self.lambda_high)
return wave_data
def process_month(self, ncdata: NcData) -> List[Optional[WaveData]]:
monthly_data = []
for day_read in ncdata.date_time:
print(f"Processing date: {day_read}")
day_result = self.process_day(
ncdata, day_read)
if day_result is not None:
monthly_data.append(day_result)
return monthly_data
def process_year(self, ncdata: List[NcData]) -> YearlyData:
date_times = []
wave_data = []
for ncfile in ncdata:
print(f"Processing file: {ncfile.path}")
monthly_results = self.process_month(ncfile)
date_times.extend(ncfile.date_time)
wave_data.extend(monthly_results)
return YearlyData(date_times=date_times, wave_data=wave_data)
if __name__ == "__main__":
year = 2018
path = f"{DATA_BASEPATH.saber}/2012/SABER_Temp_O3_April2012_v2.0.nc"
# 初始化某一天、某个纬度、高度范围等参数
latitude_min = 30.0
latitude_max = 40.0
altitude_min = 20.0
altitude_max = 105.0
lamda_low = 2
lamda_high = 15
lvboin = True
processor = DataProcessor(
lat_range=(latitude_min, latitude_max),
alt_range=(altitude_min, altitude_max),
lambda_range=(lamda_low, lamda_high),
lvboin=lvboin
)
# 处理月度数据
monthly_results = processor.process_month(path)
print(monthly_results)