from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import numpy as np

from CONSTANT import DATA_BASEPATH
from modules.saber.utils import *

# The three range tuples used throughout this module are built as:
#   lat_range=(latitude_min, latitude_max),
#   alt_range=(altitude_min, altitude_max),
#   lambda_range=(lamda_low, lamda_high),

# Number of wave-number components (wn1..wn5) stored per WaveData.
_WAVE_NUMBER_COUNT = 5


def _pad_wave_numbers(values) -> list:
    """Return exactly ``_WAVE_NUMBER_COUNT`` entries from *values*.

    ``None`` (the dataclass default) or a short list is padded with ``None``
    so that dict export never fails on an unpopulated ``WaveData``; extra
    entries beyond the fifth are ignored.
    """
    entries = [] if values is None else list(values)[:_WAVE_NUMBER_COUNT]
    return entries + [None] * (_WAVE_NUMBER_COUNT - len(entries))


@dataclass
class WaveData:
    """Per-day analysis results plus the ranges they were computed with.

    Every field defaults to ``None`` so an empty instance can be created and
    filled in step by step (see ``SaberGravitywProcessor.process_day``).
    """

    cycles: Optional[np.ndarray] = None
    wn0: Optional[np.ndarray] = None
    # Fitted results for wn1-wn5.
    fit_wn: Optional[List[Optional[np.ndarray]]] = None
    # wn1-wn5 themselves (the residual returned by each fit step).
    wn: Optional[List[Optional[np.ndarray]]] = None
    fft: Optional[np.ndarray] = None
    fft_filtered: Optional[np.ndarray] = None
    ifft: Optional[np.ndarray] = None
    Nz: Optional[float] = None
    Ptz: Optional[float] = None
    lat_range: Optional[Tuple[float, float]] = None
    alt_range: Optional[Tuple[float, float]] = None
    lambda_range: Optional[Tuple[float, float]] = None

    # dump to dict
    def to_dict(self) -> Dict:
        """Convert to the legacy dict layout (compatible with the old code).

        Unset ``fit_wn`` / ``wn`` lists are exported as ``None`` entries
        instead of raising ``TypeError``.
        """
        fit_wn = _pad_wave_numbers(self.fit_wn)
        wn = _pad_wave_numbers(self.wn)

        result = {
            "ktemp_cycles_mon_list": self.cycles,
            "ktemp_wn0_mon_list": self.wn0,
        }
        # Key order matches the legacy layout: fit_wn1, wn1, fit_wn2, wn2, ...
        for k in range(_WAVE_NUMBER_COUNT):
            result[f"ktemp_fit_wn{k + 1}_mon_list"] = fit_wn[k]
            result[f"ktemp_wn{k + 1}_mon_list"] = wn[k]
        result.update({
            "ktemp_fft_mon_list": self.fft,
            "ktemp_fft_lvbo_mon_list": self.fft_filtered,
            "ktemp_ifft_mon_list": self.ifft,
            "ktemp_Nz_mon_list": self.Nz,
            "ktemp_Ptz_mon_list": self.Ptz,
            "lat_range": self.lat_range,
            "alt_range": self.alt_range,
            "lambda_range": self.lambda_range,
        })
        return result

    # load from dict
    @classmethod
    def from_dict(cls, data: Dict):
        """Build a ``WaveData`` from the dict layout written by ``to_dict``.

        Raises:
            KeyError: if any of the expected keys is missing from *data*.
        """
        return cls(
            cycles=data["ktemp_cycles_mon_list"],
            wn0=data["ktemp_wn0_mon_list"],
            fit_wn=[data[f"ktemp_fit_wn{k + 1}_mon_list"]
                    for k in range(_WAVE_NUMBER_COUNT)],
            wn=[data[f"ktemp_wn{k + 1}_mon_list"]
                for k in range(_WAVE_NUMBER_COUNT)],
            fft=data["ktemp_fft_mon_list"],
            fft_filtered=data["ktemp_fft_lvbo_mon_list"],
            ifft=data["ktemp_ifft_mon_list"],
            Nz=data["ktemp_Nz_mon_list"],
            Ptz=data["ktemp_Ptz_mon_list"],
            lat_range=data["lat_range"],
            alt_range=data["alt_range"],
            lambda_range=data["lambda_range"],
        )


@dataclass
class YearlyData:
    """A sequence of per-day ``WaveData`` with their matching dates.

    ``date_times[k]`` is expected to describe ``wave_data[k]``; both
    ``to_dict`` and ``from_dict`` rely on that index alignment.
    """

    date_times: List[datetime]
    wave_data: List[WaveData]

    def to_dict(self) -> Dict:
        """Convert to the legacy dict-of-lists layout (old-code compatible).

        Each per-day list has one entry per element of ``wave_data``.
        ``"altitude_cycles_mon_list"`` is kept as an (always empty) key
        because the legacy layout contains it.
        """
        result = {
            "date_time_list": self.date_times,
            "ktemp_cycles_mon_list": [],
            "altitude_cycles_mon_list": [],
            "ktemp_wn0_mon_list": [],
            "ktemp_fit_wn1_mon_list": [],
            "ktemp_wn1_mon_list": [],
            "ktemp_fit_wn2_mon_list": [],
            "ktemp_wn2_mon_list": [],
            "ktemp_fit_wn3_mon_list": [],
            "ktemp_wn3_mon_list": [],
            "ktemp_fit_wn4_mon_list": [],
            "ktemp_wn4_mon_list": [],
            "ktemp_fit_wn5_mon_list": [],
            "ktemp_wn5_mon_list": [],
            "ktemp_fft_mon_list": [],
            "ktemp_fft_lvbo_mon_list": [],
            "ktemp_ifft_mon_list": [],
            "ktemp_Nz_mon_list": [],
            "ktemp_Ptz_mon_list": [],
            "alt_range": [],
        }
        for entry in self.wave_data:
            fit_wn = _pad_wave_numbers(entry.fit_wn)
            wn = _pad_wave_numbers(entry.wn)
            result["ktemp_cycles_mon_list"].append(entry.cycles)
            result["ktemp_wn0_mon_list"].append(entry.wn0)
            for k in range(_WAVE_NUMBER_COUNT):
                result[f"ktemp_fit_wn{k + 1}_mon_list"].append(fit_wn[k])
                result[f"ktemp_wn{k + 1}_mon_list"].append(wn[k])
            result["ktemp_fft_mon_list"].append(entry.fft)
            result["ktemp_fft_lvbo_mon_list"].append(entry.fft_filtered)
            result["ktemp_ifft_mon_list"].append(entry.ifft)
            result["ktemp_Nz_mon_list"].append(entry.Nz)
            result["ktemp_Ptz_mon_list"].append(entry.Ptz)
            result["alt_range"].append(entry.alt_range)
        return result

    # load from dict
    @classmethod
    def from_dict(cls, data: Dict):
        """Rebuild a ``YearlyData`` from the layout written by ``to_dict``.

        One ``WaveData`` is created per entry of ``data["date_time_list"]``.
        ``"alt_range"`` is restored when present and long enough; dicts
        without it still load (``alt_range`` stays ``None``).

        Raises:
            KeyError: if a required key is missing.
            IndexError: if a per-day list is shorter than the date list.
        """
        alt_ranges = data.get("alt_range")
        wave_data = []
        for day in range(len(data["date_time_list"])):
            # The wave-number index must be distinct from the day index:
            # reusing one name made the comprehension index the day axis
            # with the wave number (0..4) instead of the current day.
            has_alt_range = alt_ranges is not None and day < len(alt_ranges)
            wave_data.append(WaveData(
                cycles=data["ktemp_cycles_mon_list"][day],
                wn0=data["ktemp_wn0_mon_list"][day],
                fit_wn=[data[f"ktemp_fit_wn{k + 1}_mon_list"][day]
                        for k in range(_WAVE_NUMBER_COUNT)],
                wn=[data[f"ktemp_wn{k + 1}_mon_list"][day]
                    for k in range(_WAVE_NUMBER_COUNT)],
                fft=data["ktemp_fft_mon_list"][day],
                fft_filtered=data["ktemp_fft_lvbo_mon_list"][day],
                ifft=data["ktemp_ifft_mon_list"][day],
                Nz=data["ktemp_Nz_mon_list"][day],
                Ptz=data["ktemp_Ptz_mon_list"][day],
                alt_range=alt_ranges[day] if has_alt_range else None,
            ))
        return cls(date_times=data["date_time_list"], wave_data=wave_data)


class SaberGravitywProcessor:
    """Runs the per-day wave analysis for fixed latitude/altitude/lambda ranges.

    The numerical work is delegated to helpers that come from
    ``modules.saber.utils`` (``day_data_read``, ``data_cycle_identify``,
    ``data_cycle_generate``, ``fit_wave``, ``fft_ifft_wave``,
    ``power_indices``); this class only wires them together.
    """

    def __init__(self, lat_range: Tuple[float, float],
                 alt_range: Tuple[float, float],
                 lambda_range: Tuple[float, float],
                 lvboin: Optional[float] = None):
        """Store the analysis ranges.

        Args:
            lat_range: ``(latitude_min, latitude_max)``.
            alt_range: ``(altitude_min, altitude_max)``.
            lambda_range: ``(lamda_low, lamda_high)``.
            lvboin: forwarded unchanged to ``fft_ifft_wave``.
                NOTE(review): annotated as float but the script below passes
                ``True`` -- confirm the intended type against the helper.
        """
        self.lat_min, self.lat_max = lat_range
        self.alt_min, self.alt_max = alt_range
        self.lambda_low, self.lambda_high = lambda_range
        self.lvboin = lvboin

    def process_day(self, ncdata: NcData, day_read) -> Optional[WaveData]:
        """Analyse a single day and return its ``WaveData``.

        Args:
            ncdata: must unpack into eight values (dataset, tplatitude,
                tplongitude, tpaltitude, ktemp, time, date, date_time).
            day_read: the day to analyse, passed to ``day_data_read``.

        Returns:
            The populated ``WaveData``, or ``None`` when no cycles are
            identified or the cycle data cannot be generated.
        """
        # Read the data; only some of the unpacked fields are used here.
        (_dataset, tplatitude, _tplongitude, tpaltitude,
         ktemp, _time, date, _date_time) = ncdata
        df = day_data_read(date, day_read, tplatitude)
        cycles = data_cycle_identify(df, self.lat_min, self.lat_max)
        if not cycles:
            return None

        # Generate the cycle data.
        ktemp_cycles, alt_cycles = data_cycle_generate(
            cycles, ktemp, tpaltitude, self.alt_min, self.alt_max)
        if ktemp_cycles is None or alt_cycles is None:
            return None

        wave_data = WaveData()
        wave_data.cycles = ktemp_cycles

        # Wave-number analysis: wn0 is the data minus its mean along axis 0.
        wave_data.wn0 = ktemp_cycles - np.mean(ktemp_cycles, axis=0)
        wave_data.fit_wn = []
        wave_data.wn = []

        # Wave numbers 1-5: each fit is applied to the previous residual.
        remaining = wave_data.wn0
        for wave_number in range(1, _WAVE_NUMBER_COUNT + 1):
            fit, residual = fit_wave(remaining, wave_number)
            wave_data.fit_wn.append(fit)
            wave_data.wn.append(residual)
            remaining = residual

        # FFT analysis on the last residual (wn5).
        wave_data.fft, wave_data.fft_filtered, wave_data.ifft = fft_ifft_wave(
            wave_data.wn[-1], self.lambda_low, self.lambda_high,
            self.alt_min, self.alt_max, self.lvboin)

        # Compute the indices.
        wave_data.Nz, wave_data.Ptz = power_indices(
            wave_data.cycles, wave_data.wn[-1], wave_data.ifft,
            self.alt_min, self.alt_max)

        # Record the ranges this result was computed with.
        wave_data.lat_range = (self.lat_min, self.lat_max)
        wave_data.alt_range = (self.alt_min, self.alt_max)
        wave_data.lambda_range = (self.lambda_low, self.lambda_high)
        return wave_data

    def _process_month_dated(self, ncdata: NcData) -> List[Tuple[datetime, WaveData]]:
        """Return ``(day, result)`` pairs for every day that yields a result."""
        dated_results = []
        for day_read in ncdata.date_time:
            print(f"Processing date: {day_read}")
            day_result = self.process_day(ncdata, day_read)
            if day_result is not None:
                dated_results.append((day_read, day_result))
        return dated_results

    def process_month(self, ncdata: NcData) -> List[WaveData]:
        """Process every day in ``ncdata.date_time``.

        Days for which ``process_day`` returns ``None`` are skipped, so the
        returned list may be shorter than ``ncdata.date_time``.
        """
        return [result for _, result in self._process_month_dated(ncdata)]

    def process_year(self, ncdata: List[NcData]) -> YearlyData:
        """Process several files and collect the results into a ``YearlyData``.

        Only the dates of days that produced a result are recorded, keeping
        ``date_times`` and ``wave_data`` the same length and index-aligned
        (``YearlyData.from_dict`` indexes both with the same counter).
        """
        date_times = []
        wave_data = []
        for ncfile in ncdata:
            print(f"Processing file: {ncfile.path}")
            for day_read, day_result in self._process_month_dated(ncfile):
                date_times.append(day_read)
                wave_data.append(day_result)
        return YearlyData(date_times=date_times, wave_data=wave_data)


if __name__ == "__main__":
    # NOTE(review): `year` is never used and disagrees with the 2012 path.
    year = 2018
    path = f"{DATA_BASEPATH.saber}/2012/SABER_Temp_O3_April2012_v2.0.nc"

    # Initialise the latitude, altitude and lambda range parameters.
    latitude_min = 30.0
    latitude_max = 40.0
    altitude_min = 20.0
    altitude_max = 105.0
    lamda_low = 2
    lamda_high = 15
    lvboin = True

    processor = SaberGravitywProcessor(
        lat_range=(latitude_min, latitude_max),
        alt_range=(altitude_min, altitude_max),
        lambda_range=(lamda_low, lamda_high),
        lvboin=lvboin
    )

    # Process the monthly data.
    # NOTE(review): process_month reads `ncdata.date_time`, but `path` is a
    # plain str, so this call raises AttributeError. The file has to be
    # loaded into an NcData first; the loader is not visible in this module
    # (presumably provided by modules.saber.utils -- confirm and wire it in).
    monthly_results = processor.process_month(path)
    print(monthly_results)