# 给重力波用的 from dataclasses import dataclass from os import path import numpy as np import netCDF4 as nc import pandas as pd from scipy.optimize import curve_fit # ---------------------------------------------------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------------------------------------------------- # 5---同周期下不同高度数据的BT_z背景位等指标计算 - # ---------------------------------------------------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------- def power_indices(ktemp_cycles, ktemp_wn5, ktemp_ifft, altitude_min, altitude_max): # 定义Brunt-Väisälä频率计算函数 def brunt_vaisala_frequency(g, BT_z, c_p, height): # 计算位温随高度的变化率 dBT_z_dz = np.gradient(BT_z, height) # 计算 Brunt-Väisälä 频率 return np.sqrt((g / BT_z) * ((g / c_p) + dBT_z_dz)) # 定义势能计算函数 def calculate_gravitational_potential_energy(g, BT_z, N_z, PT_z): return 0.5 * ((g / N_z) ** 2) * ((PT_z / BT_z) ** 2) # 重力加速度 g = 9.81 # 空气比热容 c_p = 1004.5 height = np.linspace(altitude_min, altitude_max, ktemp_cycles.shape[1]) * 1000 # 高度 background = ktemp_cycles - ktemp_wn5 # 初始化结果矩阵 N_z_matrix = [] # 初始化结果矩阵 PT_z_matrix = [] # 循环处理background和filtered_perturbation所有行 for i in range(background.shape[0]): BT_z = np.array(background[i]) # 滤波后的扰动 PT_z = np.array(ktemp_ifft[i]) # 调用Brunt-Väisälä频率函数 N_z = brunt_vaisala_frequency(g, BT_z, c_p, height) PT_z = calculate_gravitational_potential_energy( g, BT_z, N_z, PT_z) # 调用势能函数 N_z_matrix.append(N_z) PT_z_matrix.append(PT_z) ktemp_Nz = np.vstack(N_z_matrix) ktemp_Ptz = np.vstack(PT_z_matrix) return ktemp_Nz, ktemp_Ptz # ---------------------------------------------------------------------------------------------------------------------------- # 1---打开文件并读取不同变量数据 - # ---------------------------------------------------------------------------------------------------------------------------- @dataclass class NcData: dataset: nc.Dataset tplatitude: np.ndarray tplongitude: np.ndarray tpaltitude: np.ndarray ktemp: np.ndarray time: np.ndarray date: np.ndarray date_time: np.ndarray path: str = None # 兼容旧代码,老的解构方式也能用 def __iter__(self): return iter([self.dataset, self.tplatitude, self.tplongitude, self.tpaltitude, self.ktemp, self.time, self.date, self.date_time]) ENABLE_SABER_FILTERING = True # 是否启用SABER数据的过滤 def data_nc_load(file_path): dataset = nc.Dataset(file_path, 'r') # do filtering if ENABLE_SABER_FILTERING: # lon_valid = (ds.tplongitude >= 0) & (ds.tplongitude <= 360) lon_valid = (dataset.variables['tplongitude'][:] >= 0) & ( dataset.variables['tplongitude'][:] <= 360) dataset = dataset.where(lon_valid, drop=True) # 第二步:纬度筛选 (-90-90) lat_valid = (dataset.variables['tplatitude'][:] >= - 90) & (dataset.variables['tplatitude'][:] <= 90) dataset = dataset.where(lat_valid, drop=True) # 第三步:温度筛选 (80-1000 或 -999) temp_valid = ((dataset.variables['ktemp'][:] >= 80) & ( dataset.variables['ktemp'][:] <= 1000)) | (dataset.variables['ktemp'][:] == -999) dataset = dataset.where(temp_valid, drop=True) # 纬度数据,二维数组形状为(42820,379) 42820为事件,379则为不同高度 tplatitude = dataset.variables['tplatitude'][:, :] tplongitude = dataset.variables['tplongitude'][:, :] # 经度数据 tpaltitude = dataset.variables['tpaltitude'][:, :] # 高度,二维数组形状为(42820,379) time = dataset.variables['time'][:, :] # 二维数组形状为(42820,379) date = dataset.variables['date'][:] date_time = np.unique(date) # 输出数据时间信息 ktemp = dataset.variables['ktemp'][:, :] # 温度数据,二维数组形状为(42820,379) return NcData(dataset, tplatitude, tplongitude, tpaltitude, ktemp, time, date, date_time, path=file_path) # return dataset, tplatitude, tplongitude, tpaltitude, ktemp, time, date, date_time # ---------------------------------------------------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------------------------------------------------- # 2---筛选某一天、某个纬度和高度范围15个不同cycle的温度数据 - # ---------------------------------------------------------------------------------------------------------------------------- # 2-1 读取某一天的所有事件及其对应的纬度数据 def day_data_read(date, day_read, tplatitude): events = np.where(date == day_read)[0] # 读取筛选天的事件编号 4294-5714位置,从0开始编号 time_events = date[date == day_read] # 读取筛选天的事件编号 4294-5714位置,从0开始编号 latitudes = tplatitude[events, 189] # 输出每个事件中间位置 即第189个经纬度 df = pd.DataFrame([ # 创建一个包含事件编号、纬度的列表,共1421个事件 {'time': time, 'event': event, 'latitude': lat} for time, event, lat in zip(time_events, events, latitudes)]) # print(df.head()) # 打印前几行数据以检查 return df # ---------------------------------------------------------------------------------------------------------------------------- # 2-2 将事件按照卫星轨迹周期进行输出处理,并输出落在纬度范围内的每个周期的事件的行号 def data_cycle_identify(df, latitude_min, latitude_max): cycles = [] # 存储每个周期的事件编号列表 # 存储当前周期的事件编号 current_cycle_events = [] prev_latitude = None # 遍历DataFrame中的每一行以识别周期和筛选事件 for index, row in df.iterrows(): current_event = int(row['event']) current_latitude = row['latitude'] if prev_latitude is not None and prev_latitude < 0 and current_latitude >= 0: # 检查是否是新周期的开始(纬度从负变正,且首次变正) # 重置当前周期的事件编号列表 current_cycle_events = [] if latitude_min <= current_latitude <= latitude_max: # 如果事件的纬度在指定范围内,添加到当前周期的事件编号列表 current_cycle_events.append(current_event) if prev_latitude is not None and prev_latitude >= 0 and current_latitude < 0: # 检查是否是周期的结束(纬度从正变负) # 添加当前周期的事件编号列表到周期列表 if current_cycle_events: # 确保当前周期有事件编号 cycles.append(current_cycle_events) current_cycle_events = [] # 重置当前周期的事件编号列表 prev_latitude = current_latitude if current_cycle_events: # 处理最后一个周期,如果存在的话 cycles.append(current_cycle_events) print(f"一天周期为 {len(cycles)}") for cycle_index, cycle in enumerate(cycles, start=0): # 屏幕显示每个循环周期的事件 print(f"周期 {cycle_index} 包含事件个数: {len(cycle)} 具体事件为: {cycle} ") return cycles # ---------------------------------------------------------------------------------------------------------------------------- # 2-3---按照循环周期合并同周期数据,并输出处理后的温度数据、对应的高度数据 def data_cycle_generate(cycles, ktemp, tpaltitude, altitude_min, altitude_max): if not cycles: # 如果周期列表为空,跳过当前迭代 return None, None ktemp_cycles = [] # 初始化列表存储每个周期的温度 altitude_cycles = [] # 初始化每个循环周期的高度数据 for event in cycles: ktemp_cycles_events = np.array(ktemp[event, :]) # 获取每个周期各个事件的ktemp数据 ktemp_cycles_events[ np.logical_or(ktemp_cycles_events == -999, ktemp_cycles_events > 999)] = np.nan # 缺失值处理,避免影响结果 ktemp_cycles_mean = np.nanmean( ktemp_cycles_events, axis=0) # 对所有周期的 ktemp 数据取均值 altitude_cycles_mean = tpaltitude[event[0], :] # 使用第一个的高度来表征所有的 altitude_indices = np.where((altitude_cycles_mean >= altitude_min) & ( altitude_cycles_mean <= altitude_max))[0] ktemp_cycles.append(np.array(ktemp_cycles_mean[altitude_indices])) altitude_cycles.append( np.array(altitude_cycles_mean[altitude_indices])) min_length = 157 # min(len(arr) for arr in ktemp_cycles) # 找到最短列表的长度 ktemp_cycles = np.vstack([arr[:min_length] for arr in ktemp_cycles]) # 创建新的列表,将每个子列表截断为最短长度 altitude_cycles = np.vstack([arr[:min_length] for arr in altitude_cycles]) return ktemp_cycles, altitude_cycles # ---------------------------------------------------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------------------------------------------------- # 4---高度相同下不同循环周期数据的波拟合和滤波处理 - # ---------------------------------------------------------------------------------------------------------------------------- # 对输入数据按照行(即纬向)进行波数为k的滤波,数据为15*157 def fit_wave(ktemp_wn0, k): wave_fit = [] def single_harmonic(x, A, phi, C, k): return A * np.sin(2 * np.pi / (15/k) * x + phi) + C # 数据转置并对每行进行操作,按照同高度数据进行处理 for rtemp in ktemp_wn0.T: # 为当前高度层创建索引数组 indices = np.arange(rtemp.size) def fit_temp(x, A, phi, C): return single_harmonic( x, A, phi, C, k) # 定义只拟合 A, phi, C 的 lambda 函数,k 固定 params, params_covariance = curve_fit( fit_temp, indices, rtemp) # 使用 curve_fit 进行拟合 # 提取拟合参数 A, phi, C A, phi, C = params # 使用拟合参数计算 wn3 rtemp1 = single_harmonic(indices, A, phi, C, k) # 存储拟合参数和拟合曲线 wave_fit.append(np.array(rtemp1)) wave_fit = np.vstack(wave_fit) wave_fit = wave_fit.T wave_wn = ktemp_wn0-wave_fit return wave_fit, wave_wn # ---------------------------------------------------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------------------------------------------------- # 4---同周期下不同高度数据的波拟合和滤波处理 - # ---------------------------------------------------------------------------------------------------------------------------- # 对输入数据按照列(即纬向)进行滤波,滤除波长2和15km以内的波, 数据为15*157 def fft_ifft_wave(ktemp_wn5, lamda_low, lamda_high, altitude_min, altitude_max, lvboin): ktemp_fft = [] ktemp_ifft = [] ktemp_fft_lvbo = [] for rtemp in ktemp_wn5: # 采样点数或长度 N = len(rtemp) # 采样时间间隔,其倒数等于采用频率,以1km为标准尺度等同于1s,假设波的速度为1km/s dt = (altitude_max-altitude_min)/(N-1) # 时间序列索引 n = np.arange(N) # # t = altitude_min + n * dt # 时间向量 # t = np.round(np.linspace(altitude_min, altitude_max, N),2) # 频率索引向量 f = n / (N * dt) # 对输入信号进行傅里叶变换 y = np.fft.fft(rtemp) # 定义波长滤波范围(以频率计算) # 高频截止频率 f_low = 2*np.pi / lamda_high f_high = 2*np.pi / lamda_low # f_low = 1 / lamda_high # 定义波长滤波范围(以频率计算) # 高频截止频率 # f_high = 1 / lamda_low # 低频截止频率 # 创建滤波后的频域信号 yy = y.copy() # 使用逻辑索引过滤特定频段(未确定) if lvboin: freq_filter = (f > f_low) & (f < f_high) # 创建逻辑掩码 else: freq_filter = (f < f_low) | (f > f_high) # 创建逻辑掩码 yy[freq_filter] = 0 # 过滤掉指定频段 yy_ifft = np.real(np.fft.ifft(yy)) ktemp_fft.append(y) # 存储拟合参数和拟合曲线 ktemp_ifft.append(np.array(yy_ifft)) ktemp_fft_lvbo.append(yy) ktemp_fft = np.vstack(ktemp_fft) ktemp_ifft = np.vstack(ktemp_ifft) ktemp_fft_lvbo = np.vstack(ktemp_fft_lvbo) return ktemp_fft, ktemp_fft_lvbo, ktemp_ifft