2025-05-07 23:17:08 +08:00

301 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 给重力波用的
from dataclasses import dataclass
from os import path
import numpy as np
import netCDF4 as nc
import pandas as pd
from scipy.optimize import curve_fit
# ----------------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------------
# 5---同周期下不同高度数据的BT_z背景位等指标计算 -
# ----------------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------
def power_indices(ktemp_cycles, ktemp_wn5, ktemp_ifft, altitude_min, altitude_max):
# 定义Brunt-Väisälä频率计算函数
def brunt_vaisala_frequency(g, BT_z, c_p, height):
# 计算位温随高度的变化率
dBT_z_dz = np.gradient(BT_z, height)
# 计算 Brunt-Väisälä 频率
return np.sqrt((g / BT_z) * ((g / c_p) + dBT_z_dz))
# 定义势能计算函数
def calculate_gravitational_potential_energy(g, BT_z, N_z, PT_z):
return 0.5 * ((g / N_z) ** 2) * ((PT_z / BT_z) ** 2)
# 重力加速度
g = 9.81
# 空气比热容
c_p = 1004.5
height = np.linspace(altitude_min, altitude_max,
ktemp_cycles.shape[1]) * 1000 # 高度
background = ktemp_cycles - ktemp_wn5
# 初始化结果矩阵
N_z_matrix = []
# 初始化结果矩阵
PT_z_matrix = []
# 循环处理background和filtered_perturbation所有行
for i in range(background.shape[0]):
BT_z = np.array(background[i])
# 滤波后的扰动
PT_z = np.array(ktemp_ifft[i])
# 调用Brunt-Väisälä频率函数
N_z = brunt_vaisala_frequency(g, BT_z, c_p, height)
PT_z = calculate_gravitational_potential_energy(
g, BT_z, N_z, PT_z) # 调用势能函数
N_z_matrix.append(N_z)
PT_z_matrix.append(PT_z)
ktemp_Nz = np.vstack(N_z_matrix)
ktemp_Ptz = np.vstack(PT_z_matrix)
return ktemp_Nz, ktemp_Ptz
# ----------------------------------------------------------------------------------------------------------------------------
# 1---打开文件并读取不同变量数据 -
# ----------------------------------------------------------------------------------------------------------------------------
@dataclass
class NcData:
dataset: nc.Dataset
tplatitude: np.ndarray
tplongitude: np.ndarray
tpaltitude: np.ndarray
ktemp: np.ndarray
time: np.ndarray
date: np.ndarray
date_time: np.ndarray
path: str = None
# 兼容旧代码,老的解构方式也能用
def __iter__(self):
return iter([self.dataset, self.tplatitude, self.tplongitude, self.tpaltitude, self.ktemp, self.time, self.date, self.date_time])
ENABLE_SABER_FILTERING = False # 是否启用SABER数据的过滤
def data_nc_load(file_path):
dataset = nc.Dataset(file_path, 'r')
# do filtering
if ENABLE_SABER_FILTERING:
# lon_valid = (ds.tplongitude >= 0) & (ds.tplongitude <= 360)
lon_valid = (dataset.variables['tplongitude'][:] >= 0) & (
dataset.variables['tplongitude'][:] <= 360)
dataset = dataset.where(lon_valid, drop=True)
# 第二步:纬度筛选 (-90-90)
lat_valid = (dataset.variables['tplatitude'][:] >= -
90) & (dataset.variables['tplatitude'][:] <= 90)
dataset = dataset.where(lat_valid, drop=True)
# 第三步:温度筛选 (80-1000 或 -999)
temp_valid = ((dataset.variables['ktemp'][:] >= 80) & (
dataset.variables['ktemp'][:] <= 1000)) | (dataset.variables['ktemp'][:] == -999)
dataset = dataset.where(temp_valid, drop=True)
# 纬度数据,二维数组形状为(42820,379) 42820为事件379则为不同高度
tplatitude = dataset.variables['tplatitude'][:, :]
tplongitude = dataset.variables['tplongitude'][:, :] # 经度数据
tpaltitude = dataset.variables['tpaltitude'][:, :] # 高度,二维数组形状为(42820,379)
time = dataset.variables['time'][:, :] # 二维数组形状为(42820,379)
date = dataset.variables['date'][:]
date_time = np.unique(date) # 输出数据时间信息
ktemp = dataset.variables['ktemp'][:, :] # 温度数据,二维数组形状为(42820,379)
return NcData(dataset, tplatitude, tplongitude, tpaltitude, ktemp, time, date, date_time, path=file_path)
# return dataset, tplatitude, tplongitude, tpaltitude, ktemp, time, date, date_time
# ----------------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------------
# 2---筛选某一天、某个纬度和高度范围15个不同cycle的温度数据 -
# ----------------------------------------------------------------------------------------------------------------------------
# 2-1 读取某一天的所有事件及其对应的纬度数据
def day_data_read(date, day_read, tplatitude):
events = np.where(date == day_read)[0] # 读取筛选天的事件编号 4294-5714位置从0开始编号
time_events = date[date == day_read] # 读取筛选天的事件编号 4294-5714位置从0开始编号
latitudes = tplatitude[events, 189] # 输出每个事件中间位置 即第189个经纬度
df = pd.DataFrame([ # 创建一个包含事件编号、纬度的列表共1421个事件
{'time': time, 'event': event, 'latitude': lat}
for time, event, lat in zip(time_events, events, latitudes)])
# print(df.head()) # 打印前几行数据以检查
return df
# ----------------------------------------------------------------------------------------------------------------------------
# 2-2 将事件按照卫星轨迹周期进行输出处理,并输出落在纬度范围内的每个周期的事件的行号
def data_cycle_identify(df, latitude_min, latitude_max):
cycles = [] # 存储每个周期的事件编号列表
# 存储当前周期的事件编号
current_cycle_events = []
prev_latitude = None
# 遍历DataFrame中的每一行以识别周期和筛选事件
for index, row in df.iterrows():
current_event = int(row['event'])
current_latitude = row['latitude']
if prev_latitude is not None and prev_latitude < 0 and current_latitude >= 0: # 检查是否是新周期的开始(纬度从负变正,且首次变正)
# 重置当前周期的事件编号列表
current_cycle_events = []
if latitude_min <= current_latitude <= latitude_max: # 如果事件的纬度在指定范围内,添加到当前周期的事件编号列表
current_cycle_events.append(current_event)
if prev_latitude is not None and prev_latitude >= 0 and current_latitude < 0: # 检查是否是周期的结束(纬度从正变负)
# 添加当前周期的事件编号列表到周期列表
if current_cycle_events: # 确保当前周期有事件编号
cycles.append(current_cycle_events)
current_cycle_events = [] # 重置当前周期的事件编号列表
prev_latitude = current_latitude
if current_cycle_events: # 处理最后一个周期,如果存在的话
cycles.append(current_cycle_events)
print(f"一天周期为 {len(cycles)}")
for cycle_index, cycle in enumerate(cycles, start=0):
# 屏幕显示每个循环周期的事件
print(f"周期 {cycle_index} 包含事件个数: {len(cycle)} 具体事件为: {cycle} ")
return cycles
# ----------------------------------------------------------------------------------------------------------------------------
# 2-3---按照循环周期合并同周期数据,并输出处理后的温度数据、对应的高度数据
def data_cycle_generate(cycles, ktemp, tpaltitude, altitude_min, altitude_max):
if not cycles: # 如果周期列表为空,跳过当前迭代
return None, None
ktemp_cycles = [] # 初始化列表存储每个周期的温度
altitude_cycles = [] # 初始化每个循环周期的高度数据
for event in cycles:
ktemp_cycles_events = np.array(ktemp[event, :]) # 获取每个周期各个事件的ktemp数据
ktemp_cycles_events[
np.logical_or(ktemp_cycles_events == -999, ktemp_cycles_events > 999)] = np.nan # 缺失值处理,避免影响结果
ktemp_cycles_mean = np.nanmean(
ktemp_cycles_events, axis=0) # 对所有周期的 ktemp 数据取均值
altitude_cycles_mean = tpaltitude[event[0], :] # 使用第一个的高度来表征所有的
altitude_indices = np.where((altitude_cycles_mean >= altitude_min) & (
altitude_cycles_mean <= altitude_max))[0]
ktemp_cycles.append(np.array(ktemp_cycles_mean[altitude_indices]))
altitude_cycles.append(
np.array(altitude_cycles_mean[altitude_indices]))
min_length = 157 # min(len(arr) for arr in ktemp_cycles) # 找到最短列表的长度
ktemp_cycles = np.vstack([arr[:min_length]
for arr in ktemp_cycles]) # 创建新的列表,将每个子列表截断为最短长度
altitude_cycles = np.vstack([arr[:min_length] for arr in altitude_cycles])
return ktemp_cycles, altitude_cycles
# ----------------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------------
# 4---高度相同下不同循环周期数据的波拟合和滤波处理 -
# ----------------------------------------------------------------------------------------------------------------------------
# 对输入数据按照行即纬向进行波数为k的滤波,数据为15*157
def fit_wave(ktemp_wn0, k):
wave_fit = []
def single_harmonic(x, A, phi, C, k):
return A * np.sin(2 * np.pi / (15/k) * x + phi) + C
# 数据转置并对每行进行操作,按照同高度数据进行处理
for rtemp in ktemp_wn0.T:
# 为当前高度层创建索引数组
indices = np.arange(rtemp.size)
def fit_temp(x, A, phi, C): return single_harmonic(
x, A, phi, C, k) # 定义只拟合 A, phi, C 的 lambda 函数k 固定
params, params_covariance = curve_fit(
fit_temp, indices, rtemp) # 使用 curve_fit 进行拟合
# 提取拟合参数 A, phi, C
A, phi, C = params
# 使用拟合参数计算 wn3
rtemp1 = single_harmonic(indices, A, phi, C, k)
# 存储拟合参数和拟合曲线
wave_fit.append(np.array(rtemp1))
wave_fit = np.vstack(wave_fit)
wave_fit = wave_fit.T
wave_wn = ktemp_wn0-wave_fit
return wave_fit, wave_wn
# ----------------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------------
# 4---同周期下不同高度数据的波拟合和滤波处理 -
# ----------------------------------------------------------------------------------------------------------------------------
# 对输入数据按照列即纬向进行滤波滤除波长2和15km以内的波, 数据为15*157
def fft_ifft_wave(ktemp_wn5, lamda_low, lamda_high, altitude_min, altitude_max, lvboin):
ktemp_fft = []
ktemp_ifft = []
ktemp_fft_lvbo = []
for rtemp in ktemp_wn5:
# 采样点数或长度
N = len(rtemp)
# 采样时间间隔其倒数等于采用频率以1km为标准尺度等同于1s假设波的速度为1km/s
dt = (altitude_max-altitude_min)/(N-1)
# 时间序列索引
n = np.arange(N)
# # t = altitude_min + n * dt # 时间向量
# t = np.round(np.linspace(altitude_min, altitude_max, N),2)
# 频率索引向量
f = n / (N * dt)
# 对输入信号进行傅里叶变换
y = np.fft.fft(rtemp)
# 定义波长滤波范围(以频率计算) # 高频截止频率
f_low = 2*np.pi / lamda_high
f_high = 2*np.pi / lamda_low
# f_low = 1 / lamda_high # 定义波长滤波范围(以频率计算) # 高频截止频率
# f_high = 1 / lamda_low # 低频截止频率
# 创建滤波后的频域信号
yy = y.copy()
# 使用逻辑索引过滤特定频段(未确定)
if lvboin:
freq_filter = (f > f_low) & (f < f_high) # 创建逻辑掩码
else:
freq_filter = (f < f_low) | (f > f_high) # 创建逻辑掩码
yy[freq_filter] = 0 # 过滤掉指定频段
yy_ifft = np.real(np.fft.ifft(yy))
ktemp_fft.append(y)
# 存储拟合参数和拟合曲线
ktemp_ifft.append(np.array(yy_ifft))
ktemp_fft_lvbo.append(yy)
ktemp_fft = np.vstack(ktemp_fft)
ktemp_ifft = np.vstack(ktemp_ifft)
ktemp_fft_lvbo = np.vstack(ktemp_fft_lvbo)
return ktemp_fft, ktemp_fft_lvbo, ktemp_ifft