247 lines
9.1 KiB
Python
247 lines
9.1 KiB
Python
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple
|
|
import numpy as np
|
|
from CONSTANT import DATA_BASEPATH
|
|
from modules.saber.utils import *
|
|
|
|
# lat_range=(latitude_min, latitude_max),
|
|
# alt_range=(altitude_min, altitude_max),
|
|
# lambda_range=(lamda_low, lamda_high),
|
|
|
|
|
|
@dataclass
class WaveData:
    """Container for the wave-analysis products of a single processed day.

    Every field defaults to ``None`` so an instance can be created empty and
    filled in step by step (this is how ``from_dict`` and the processor in
    this file populate it).
    """

    cycles: Optional[np.ndarray] = None
    wn0: Optional[np.ndarray] = None
    # Fit results for wavenumbers 1-5 (index 0 corresponds to wavenumber 1).
    fit_wn: Optional[List[Optional[np.ndarray]]] = None
    # wn1-wn5 (index 0 corresponds to wavenumber 1).
    wn: Optional[List[Optional[np.ndarray]]] = None
    fft: Optional[np.ndarray] = None
    fft_filtered: Optional[np.ndarray] = None
    ifft: Optional[np.ndarray] = None
    Nz: Optional[float] = None
    Ptz: Optional[float] = None
    lat_range: Optional[Tuple[float, float]] = None
    alt_range: Optional[Tuple[float, float]] = None
    lambda_range: Optional[Tuple[float, float]] = None

    # dump to dict
    def to_dict(self) -> Dict:
        """Convert the data to a dict, staying compatible with the original code.

        The legacy ``ktemp_*_mon_list`` key names and their insertion order are
        preserved. When ``fit_wn`` or ``wn`` has never been assigned (still
        ``None``), the five corresponding entries are emitted as ``None``
        instead of raising ``TypeError``.

        Returns:
            A dict with 20 entries: cycles, wn0, the interleaved
            fit_wn{k}/wn{k} pairs for k = 1..5, the FFT products, Nz, Ptz and
            the three range tuples.

        Raises:
            IndexError: If ``fit_wn`` or ``wn`` is set but holds fewer than
                five items.
        """
        no_waves = [None] * 5
        fit_wn = no_waves if self.fit_wn is None else self.fit_wn
        wn = no_waves if self.wn is None else self.wn

        result = {
            "ktemp_cycles_mon_list": self.cycles,
            "ktemp_wn0_mon_list": self.wn0,
        }
        # Interleave fit/residual per wavenumber to keep the legacy key order.
        for k in range(5):
            result[f"ktemp_fit_wn{k + 1}_mon_list"] = fit_wn[k]
            result[f"ktemp_wn{k + 1}_mon_list"] = wn[k]
        result.update({
            "ktemp_fft_mon_list": self.fft,
            "ktemp_fft_lvbo_mon_list": self.fft_filtered,
            "ktemp_ifft_mon_list": self.ifft,
            "ktemp_Nz_mon_list": self.Nz,
            "ktemp_Ptz_mon_list": self.Ptz,
            "lat_range": self.lat_range,
            "alt_range": self.alt_range,
            "lambda_range": self.lambda_range,
        })
        return result

    # load from dict
    @classmethod
    def from_dict(cls, data: Dict):
        """Build an instance from a dict using the key layout of ``to_dict``.

        Args:
            data: Mapping that contains every key written by ``to_dict``.

        Returns:
            A new instance populated from ``data``.

        Raises:
            KeyError: If any expected key is missing from ``data``.
        """
        return cls(
            cycles=data["ktemp_cycles_mon_list"],
            wn0=data["ktemp_wn0_mon_list"],
            fit_wn=[data[f"ktemp_fit_wn{k}_mon_list"] for k in range(1, 6)],
            wn=[data[f"ktemp_wn{k}_mon_list"] for k in range(1, 6)],
            fft=data["ktemp_fft_mon_list"],
            fft_filtered=data["ktemp_fft_lvbo_mon_list"],
            ifft=data["ktemp_ifft_mon_list"],
            Nz=data["ktemp_Nz_mon_list"],
            Ptz=data["ktemp_Ptz_mon_list"],
            lat_range=data["lat_range"],
            alt_range=data["alt_range"],
            lambda_range=data["lambda_range"],
        )
|
|
|
|
|
|
@dataclass
class YearlyData:
    """Parallel lists of dates and per-day ``WaveData`` results."""

    date_times: List[datetime]
    wave_data: List[WaveData]

    def to_dict(self) -> Dict:
        """Convert the data to a dict, staying compatible with the original code.

        Each ``ktemp_*_mon_list`` entry is a list with one item per element of
        ``wave_data``. ``altitude_cycles_mon_list`` is kept as a key for
        compatibility, but nothing in this class fills it, so it is always
        an empty list.

        Returns:
            A dict of lists keyed by the legacy names, plus ``date_time_list``
            and ``alt_range``.
        """
        result = {
            "date_time_list": self.date_times,
            "ktemp_cycles_mon_list": [],
            "altitude_cycles_mon_list": [],
            "ktemp_wn0_mon_list": [],
        }
        # Same interleaved fit/residual key order as the legacy layout.
        for k in range(1, 6):
            result[f"ktemp_fit_wn{k}_mon_list"] = []
            result[f"ktemp_wn{k}_mon_list"] = []
        for key in (
            "ktemp_fft_mon_list",
            "ktemp_fft_lvbo_mon_list",
            "ktemp_ifft_mon_list",
            "ktemp_Nz_mon_list",
            "ktemp_Ptz_mon_list",
            "alt_range",
        ):
            result[key] = []

        no_waves = [None] * 5
        for entry in self.wave_data:
            # An entry whose wave lists were never assigned contributes None
            # placeholders instead of raising TypeError.
            fit_wn = no_waves if entry.fit_wn is None else entry.fit_wn
            wn = no_waves if entry.wn is None else entry.wn

            result["ktemp_cycles_mon_list"].append(entry.cycles)
            result["ktemp_wn0_mon_list"].append(entry.wn0)
            for k in range(5):
                result[f"ktemp_fit_wn{k + 1}_mon_list"].append(fit_wn[k])
                result[f"ktemp_wn{k + 1}_mon_list"].append(wn[k])
            result["ktemp_fft_mon_list"].append(entry.fft)
            result["ktemp_fft_lvbo_mon_list"].append(entry.fft_filtered)
            result["ktemp_ifft_mon_list"].append(entry.ifft)
            result["ktemp_Nz_mon_list"].append(entry.Nz)
            result["ktemp_Ptz_mon_list"].append(entry.Ptz)
            result["alt_range"].append(entry.alt_range)

        return result

    # load from dict
    @classmethod
    def from_dict(cls, data: Dict):
        """Rebuild an instance from a dict using the layout of ``to_dict``.

        One ``WaveData`` is created per element of ``data["date_time_list"]``.
        The day index and the wavenumber index use distinct names: reusing a
        single name inside the list comprehensions would make the
        comprehension's own variable shadow the day index, so every day would
        read element ``k - 1`` of list ``k`` instead of its own element.

        Args:
            data: Mapping with ``date_time_list`` and the per-day
                ``ktemp_*_mon_list`` lists. ``alt_range`` is optional and is
                restored when present.

        Returns:
            A new instance with ``date_times`` taken from ``date_time_list``.

        Raises:
            KeyError: If a required key is missing.
            IndexError: If a per-day list is shorter than ``date_time_list``.
        """
        alt_ranges = data.get("alt_range")
        wave_data = []
        for day_idx in range(len(data["date_time_list"])):
            entry = WaveData()
            entry.cycles = data["ktemp_cycles_mon_list"][day_idx]
            entry.wn0 = data["ktemp_wn0_mon_list"][day_idx]
            entry.fit_wn = [
                data[f"ktemp_fit_wn{k}_mon_list"][day_idx]
                for k in range(1, 6)
            ]
            entry.wn = [
                data[f"ktemp_wn{k}_mon_list"][day_idx]
                for k in range(1, 6)
            ]
            entry.fft = data["ktemp_fft_mon_list"][day_idx]
            entry.fft_filtered = data["ktemp_fft_lvbo_mon_list"][day_idx]
            entry.ifft = data["ktemp_ifft_mon_list"][day_idx]
            entry.Nz = data["ktemp_Nz_mon_list"][day_idx]
            entry.Ptz = data["ktemp_Ptz_mon_list"][day_idx]
            if alt_ranges is not None:
                entry.alt_range = alt_ranges[day_idx]
            wave_data.append(entry)

        return cls(date_times=data["date_time_list"], wave_data=wave_data)
|
|
|
|
|
|
class SaberGravitywProcessor:
    """Run the per-day wave analysis for a fixed latitude/altitude band.

    The numerical work is delegated to helpers that this file does not define
    (``day_data_read``, ``data_cycle_identify``, ``data_cycle_generate``,
    ``fit_wave``, ``fft_ifft_wave``, ``power_indices``); this class wires them
    together and collects the results into ``WaveData`` / ``YearlyData``.
    """

    def __init__(self, lat_range: Tuple[float, float],
                 alt_range: Tuple[float, float],
                 lambda_range: Tuple[float, float],
                 lvboin: float):
        """Store the analysis configuration.

        Args:
            lat_range: ``(min, max)`` latitude bounds.
            alt_range: ``(min, max)`` altitude bounds.
            lambda_range: ``(low, high)`` bounds forwarded to ``fft_ifft_wave``.
            lvboin: Forwarded unchanged to ``fft_ifft_wave``.
                NOTE(review): annotated ``float`` but the script at the bottom
                of this file passes ``True`` - confirm the intended type.
        """
        self.lat_min, self.lat_max = lat_range
        self.alt_min, self.alt_max = alt_range
        self.lambda_low, self.lambda_high = lambda_range
        self.lvboin = lvboin

    def process_day(self, ncdata, day_read) -> Optional[WaveData]:
        """Analyse one day and return its ``WaveData``.

        Args:
            ncdata: Must unpack into exactly eight items in the order
                dataset, tplatitude, tplongitude, tpaltitude, ktemp, time,
                date, date_time.
            day_read: The day to process, passed on to ``day_data_read``.

        Returns:
            The populated ``WaveData``, or ``None`` when no cycles are found
            or the cycle generation yields ``None``.
        """
        # Read the data; only some of the unpacked items are used below.
        (_dataset, tplatitude, _tplongitude, tpaltitude,
         ktemp, _time, date, _date_time) = ncdata
        df = day_data_read(date, day_read, tplatitude)
        cycles = data_cycle_identify(df, self.lat_min, self.lat_max)

        if not cycles:
            return None

        # Generate the per-cycle data.
        ktemp_cycles, alt_cycles = data_cycle_generate(
            cycles, ktemp, tpaltitude, self.alt_min, self.alt_max)

        if ktemp_cycles is None or alt_cycles is None:
            return None

        wave_data = WaveData()
        wave_data.cycles = ktemp_cycles

        # Wavenumber analysis: start from the deviation from the axis-0 mean.
        wave_data.wn0 = ktemp_cycles - np.mean(ktemp_cycles, axis=0)

        wave_data.fit_wn = []
        wave_data.wn = []
        temp = wave_data.wn0

        # Wavenumbers 1-5 in sequence: each fit is applied to the residual
        # left by the previous wavenumber.
        for i in range(1, 6):
            fit, residual = fit_wave(temp, i)
            wave_data.fit_wn.append(fit)
            wave_data.wn.append(residual)
            temp = residual

        # FFT analysis on the residual left after wavenumber 5.
        wave_data.fft, wave_data.fft_filtered, wave_data.ifft = fft_ifft_wave(
            wave_data.wn[-1], self.lambda_low, self.lambda_high,
            self.alt_min, self.alt_max, self.lvboin)

        # Compute the indices.
        wave_data.Nz, wave_data.Ptz = power_indices(
            wave_data.cycles, wave_data.wn[-1], wave_data.ifft,
            self.alt_min, self.alt_max)

        # Record the ranges this result was computed for.
        wave_data.lat_range = (self.lat_min, self.lat_max)
        wave_data.alt_range = (self.alt_min, self.alt_max)
        wave_data.lambda_range = (self.lambda_low, self.lambda_high)

        return wave_data

    def _iter_day_results(self, ncdata):
        """Yield ``(day, WaveData)`` for each day that produced a result."""
        for day_read in ncdata.date_time:
            print(f"Processing date: {day_read}")
            day_result = self.process_day(ncdata, day_read)
            if day_result is not None:
                yield day_read, day_result

    def process_month(self, ncdata: NcData) -> List[Optional[WaveData]]:
        """Process every day listed in ``ncdata.date_time``.

        Returns:
            The results of the days that could be processed, in order. Days
            for which ``process_day`` returns ``None`` are omitted, so the
            list never actually contains ``None``.
        """
        return [day_result for _, day_result in self._iter_day_results(ncdata)]

    def process_year(self, ncdata: List[NcData]) -> YearlyData:
        """Process a sequence of files and collect the results.

        Only days that produced a result are recorded, so ``date_times`` and
        ``wave_data`` in the returned object always have the same length and
        line up index by index. Recording every day of every file while
        dropping the failed results would leave the two lists misaligned
        whenever a day is skipped.

        Args:
            ncdata: The files to process; each must expose ``path`` and
                ``date_time``.

        Returns:
            ``YearlyData`` holding the processed days and their results.
        """
        date_times = []
        wave_data = []
        for ncfile in ncdata:
            print(f"Processing file: {ncfile.path}")
            for day_read, day_result in self._iter_day_results(ncfile):
                date_times.append(day_read)
                wave_data.append(day_result)

        return YearlyData(date_times=date_times, wave_data=wave_data)
|
|
|
|
|
|
if __name__ == "__main__":
    year = 2018
    path = f"{DATA_BASEPATH.saber}/2012/SABER_Temp_O3_April2012_v2.0.nc"

    # Parameters for the run: latitude band, altitude band, lambda bounds
    # and the lvboin switch.
    settings = {
        "lat_range": (30.0, 40.0),
        "alt_range": (20.0, 105.0),
        "lambda_range": (2, 15),
        "lvboin": True,
    }
    processor = SaberGravitywProcessor(**settings)

    # Process the monthly data.
    # NOTE(review): `path` is a plain string, while process_month reads
    # `ncdata.date_time` from its argument - it looks like the file has to be
    # loaded into an NcData first; confirm against modules.saber.utils.
    monthly_results = processor.process_month(path)
    print(monthly_results)
|