zephyr-backend/modules/radar/allw_amplitude.py

526 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 此代码可以绘制某年、某高度的纬向风/经向风的,潮汐波和行星波的
# 年振幅图、月振幅图、日拟合正弦波图
# 基本思想是1-先处理某高度全年观测数据得到final_df
# 2-再选择风的类型,得到此高度全年每一天的纬向风/经向风大气长波拟合参数函数process_wind_data_with_models
# 3-下一步就是根据参数绘制图形,可选择年振幅图、月振幅图、日拟合正弦波图(#todo:上一步风的类型已确定)
# plot_yearly_params、plot_month_params、plot_sine_wave_for_day
import glob
import pandas as pd
import numpy as np
import os
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
from datetime import datetime
# 解决绘图中中文不能显示的问题
import matplotlib
from CONSTANT import DATA_BASEPATH
# 设置中文显示和负号正常显示
matplotlib.rcParams['font.sans-serif'] = ['SimHei'] # 显示中文
matplotlib.rcParams['axes.unicode_minus'] = False # 正常显示负号
# 定义所有模型
def tidal_model(t, v0, a1, b1, a2, b2, a3, b3, a4, b4):
"""潮汐波模型"""
T1, T2, T3, T4 = 6, 8, 12, 24 # 周期以小时为单位
return (v0 + a1 * np.sin((2 * np.pi / T1) * t + b1)
+ a2 * np.sin((2 * np.pi / T2) * t + b2)
+ a3 * np.sin((2 * np.pi / T3) * t + b3)
+ a4 * np.sin((2 * np.pi / T4) * t + b4))
def planetary_model_2day(t, v0, a, b):
"""2日行星波模型"""
T = 48 # 周期为2天48小时
return v0 + a * np.sin((2 * np.pi / T) * t + b)
def planetary_model_5day(t, v0, a, b):
"""5日行星波模型"""
T = 120 # 周期为5天120小时
return v0 + a * np.sin((2 * np.pi / T) * t + b)
def planetary_model_10day(t, v0, a, b):
"""10日行星波模型"""
T = 240 # 周期为10天240小时
return v0 + a * np.sin((2 * np.pi / T) * t + b)
def planetary_model_16day(t, v0, a, b):
"""16日行星波模型"""
T = 384 # 周期为16天384小时
return v0 + a * np.sin((2 * np.pi / T) * t + b)
# 统一管理模型及其参数
MODELS = {
'潮汐波': {
'function': tidal_model,
'param_names': ['v0', 'a1', 'b1', 'a2', 'b2', 'a3', 'b3', 'a4', 'b4'],
'initial_guess': [0, 1, 0, 1, 0, 1, 0, 1, 0],
'bounds': ([-np.inf, 0, -np.inf, 0, -np.inf, 0, -np.inf, 0, -np.inf],
[np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf]),
'window': 5 # 窗口为5天
},
'2日行星波': {
'function': planetary_model_2day,
'param_names': ['v0', 'a', 'b'],
'initial_guess': [0, 1, 0],
'bounds': ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]),
'window': 5 # 窗口为5天
},
'5日行星波': {
'function': planetary_model_5day,
'param_names': ['v0', 'a', 'b'],
'initial_guess': [0, 1, 0],
'bounds': ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]),
'window': 15 # 窗口为15天
},
'10日行星波': {
'function': planetary_model_10day,
'param_names': ['v0', 'a', 'b'],
'initial_guess': [0, 1, 0],
'bounds': ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]),
'window': 29 # 窗口为29天
},
'16日行星波': {
'function': planetary_model_16day,
'param_names': ['v0', 'a', 'b'],
'initial_guess': [0, 1, 0],
'bounds': ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]),
'window': 49 # 窗口为49天
},
}
# 选定风的类型,得到整年的每一天拟合参数
def process_wind_data_with_models(final_df, wind_type, year, H, selected_models=None):
"""
处理风速数据并针对多种模型进行拟合和绘图。
:param selected_models: 可选,指定使用的模型类型列表。如果为 None则处理所有模型。
:return: 整年所有模型的拟合参数字典
"""
wind_data = final_df[[col for col in final_df.columns if wind_type in col]]
wind_data.columns = [col[:8] for col in wind_data.columns]
result = wind_data.to_numpy().T.flatten()
column_names = np.repeat(wind_data.columns.to_numpy(), 24)
combined = pd.DataFrame(np.vstack((column_names, result)))
dates = combined.iloc[0].values
wind_speed = combined.iloc[1].values
date_series = pd.to_datetime(dates)
data = pd.DataFrame({'date': date_series, 'wind_speed': wind_speed})
data.set_index('date', inplace=True)
unique_dates = np.unique(data.index.date)
# 如果指定了模型,则只处理指定的模型
if selected_models is None:
models_to_process = MODELS.keys() # 默认处理所有模型
else:
models_to_process = selected_models # 使用指定的模型列表
all_params = {} # 存储所有模型的参数数据框
for model_name in models_to_process:
if model_name not in MODELS:
print(f"Model {model_name} is not valid.")
continue
model_info = MODELS[model_name]
params = []
model_func = model_info['function']
param_names = model_info['param_names']
initial_guess = model_info['initial_guess']
bounds = model_info['bounds']
window_days = model_info['window'] # 获取窗口长度
for date in unique_dates:
# 根据模型窗口调整时间范围
start_date = pd.to_datetime(
date) - pd.Timedelta(days=(window_days - 1) / 2)
end_date = pd.to_datetime(
date) + pd.Timedelta(days=(window_days - 1) / 2)
window_data = data[start_date:end_date]
t = np.arange(len(window_data)) + 1
y = pd.to_numeric(window_data['wind_speed'], errors='coerce')
valid_mask = ~np.isnan(y)
t = t[valid_mask]
y = y[valid_mask]
if len(y) < len(initial_guess) * 2: # todo认为如果数据点个数少于参数的2倍则认为数据不够拟合之前是6倍在这里放宽了
print(
f"Not enough valid data after filtering for date: {date}")
params.append([None] * len(param_names))
continue
try:
popt, _ = curve_fit(
model_func, t, y, p0=initial_guess, bounds=bounds)
params.append(popt)
except RuntimeError:
print(f"Fitting failed for date: {date}")
params.append([None] * len(param_names))
# 保存整年参数
params_df = pd.DataFrame(
params, columns=param_names, index=unique_dates)
# file_name = f'{year}年_{wind_type}_{H // 1000}km_{model_name}_强度.txt'
# params_df.to_csv(os.path.join(output_dir, file_name), sep='\t', index=True, header=True)
all_params[model_name] = params_df # 将每个模型的参数数据框存入字典
return all_params # 返回包含所有模型参数的数据字典
# 外部绘制全年图形
def plot_yearly_params(params_dict, model_name, year, wind_type, H):
"""
绘制全年的参数图形。
:param params_dict: 存储所有模型的参数数据框字典
:param model_name: 模型名称
"""
params_df = params_dict[model_name]
# 绘图
if 'a1' in params_df.columns: # 处理潮汐波多分量
plt.figure(figsize=(12, 6))
for i, T in enumerate([6, 8, 12, 24], start=1):
plt.plot(params_df[f'a{i}'], label=f'{T}h', linewidth=2)
else: # 处理行星波单分量
plt.figure(figsize=(12, 6))
plt.plot(params_df['a'],
label=f'{model_name}', color='orange', linewidth=2)
plt.title(f'{year}{wind_type}_{H // 1000}km_{model_name}_强度', fontsize=16)
plt.xlabel('日期', fontsize=14)
plt.ylabel('幅度 (m/s)', fontsize=14)
plt.xticks(rotation=45)
plt.legend()
plt.tick_params(axis='both', labelsize=12) # 调整刻度字体大小
plt.tight_layout()
# 绘制月振幅图
def plot_month_params(params_dict, model_name, wind_type, year, month, H):
"""
绘制指定月份的参数图形。
:param params_dict: 存储所有模型的参数数据框字典
:param model_name: 模型名称
:param wind_type: 风速类型
:param year: 需要绘制的年份
:param month: 需要绘制的月份
:param output_dir: 输出目录
:param H: 水平尺度
"""
# 获取该年份的数据框
params_df = params_dict[model_name]
# 确保索引是 datetime 类型
if not pd.api.types.is_datetime64_any_dtype(params_df.index):
# print("Index is not datetime. Converting to datetime...")
params_df.index = pd.to_datetime(params_df.index)
# 筛选出指定年份和月份的数据
params_df_month = params_df[(params_df.index.year == year) & (
params_df.index.month == month)]
# 检查筛选后的数据是否为空
if params_df_month.empty:
print(f"No data available for {year}-{month}.")
return
# 绘图
plt.figure(figsize=(12, 6))
# 判断是潮汐波还是行星波
if model_name == '潮汐波': # 处理潮汐波多分量
for i, T in enumerate([6, 8, 12, 24], start=1):
col_name = f'a{i}'
if col_name in params_df_month.columns:
plt.plot(params_df_month[col_name], label=f'{T}h', linewidth=2)
else:
print(
f"Warning: Column '{col_name}' not found in the dataframe.")
else: # 处理行星波单分量
if 'a' in params_df_month.columns:
plt.plot(
params_df_month['a'], label=f'{model_name}', color='orange', linewidth=2)
else:
print(f"Warning: Column 'a' not found in the dataframe.")
# 设置标题和标签
plt.title(
f'{year}{month}{wind_type}_{H // 1000}km_{model_name}_强度', fontsize=16)
plt.xlabel('日期', fontsize=14)
plt.ylabel('幅度 (m/s)', fontsize=14)
plt.xticks(rotation=45)
plt.legend()
plt.tick_params(axis='both', labelsize=12) # 调整刻度字体大小
plt.tight_layout()
# 绘制日拟合正弦波图
def plot_sine_wave_for_day(params_df, date, wind_type, H, model_name):
"""
根据指定日期的参数,绘制标准的正弦曲线图。
:param params_df: 存储所有模型的参数数据框
:param date: 指定日期(例如 '2024-03-17'
:param wind_type: 风速类型(例如 'uwind'
:param H: 水平尺度
:param model_name: 模型名称(例如 '5日行星波', '10日行星波', '潮汐波'
"""
# 将字符串日期转换为 datetime 格式
try:
date = datetime.strptime(date, '%Y-%m-%d')
except ValueError:
print(f"Error: Invalid date format '{date}', expected 'YYYY-MM-DD'.")
return
# 检查 params_df.index 是否为 datetime 类型,若不是,则将其转换
if not isinstance(params_df.index, pd.DatetimeIndex):
print("Warning: params_df index is not of type datetime. Converting index to datetime...")
params_df.index = pd.to_datetime(params_df.index)
# 获取指定日期的参数
if date not in params_df.index:
print(
f"Warning: {date.strftime('%Y-%m-%d')} not found in the parameter dataframe.")
return
params = params_df.loc[date]
# 检查参数是否为 NaN 或 Inf
if params.isnull().any() or np.isinf(params).any():
print(
f"Warning: Parameters for {date.strftime('%Y-%m-%d')} contain NaN or Inf values. Skipping the plot.")
return
# 判断是否是潮汐波模型
if 'a1' in params: # 如果是潮汐波模型
T1, T2, T3, T4 = 6, 8, 12, 24 # 潮汐波的周期
# 生成时间数据
t = np.linspace(0, 48, 1000) # 生成0到48小时的1000个点
# 从参数中提取值
v0 = params['v0']
a1 = params['a1']
b1 = params['b1']
a2 = params['a2']
b2 = params['b2']
a3 = params['a3']
b3 = params['b3']
a4 = params['a4']
b4 = params['b4']
# 计算正弦波的值
y1 = v0 * np.ones_like(t) # 常量项
y2 = a1 * np.sin((2 * np.pi / T1) * t + b1)
y3 = a2 * np.sin((2 * np.pi / T2) * t + b2)
y4 = a3 * np.sin((2 * np.pi / T3) * t + b3)
y5 = a4 * np.sin((2 * np.pi / T4) * t + b4)
# 绘制图形
plt.figure(figsize=(12, 8))
plt.plot(t, y1, label='常量', color='purple')
plt.plot(t, y2, label='6h', color='blue')
plt.plot(t, y3, label='8h', color='orange')
plt.plot(t, y4, label='12h', color='green')
plt.plot(t, y5, label='24h', color='red')
# 添加图例和标签
plt.title(
f'{date.strftime("%Y-%m-%d")} {wind_type}风速拟合潮汐波', fontsize=16)
plt.xlabel('时间 (小时)', fontsize=14)
plt.ylabel('幅度 (m/s)', fontsize=14)
plt.legend()
plt.grid(True)
plt.xlim(0, 48) # 0到48小时
plt.ylim(min(y1.min(), y2.min(), y3.min(), y4.min(), y5.min()-1),
max(y1.max(), y2.max(), y3.max(), y4.max(), y5.max())+1)
plt.tight_layout()
else: # 处理行星波模型
# 根据传入的模型名称选择周期
if model_name == '2日行星波':
T = 48 # 2日行星波的周期为48小时
elif model_name == '5日行星波':
T = 120 # 5日行星波的周期为120小时
elif model_name == '10日行星波':
T = 240 # 10日行星波的周期为240小时
elif model_name == '16日行星波':
T = 384 # 16日行星波的周期为384小时
else:
print(f"Error: Unsupported planetary wave model '{model_name}'.")
return
# 生成时间数据
t = np.linspace(0, T*2, 1000) # 根据周期生成对应的时间范围
# 从参数中提取值
v0 = params['v0']
a = params['a']
b = params['b']
# 计算正弦波的值
y1 = v0 * np.ones_like(t) # 常量项
y2 = a * np.sin((2 * np.pi / T) * t + b)
# 绘制图形
plt.figure(figsize=(12, 8))
plt.plot(t, y1, label='常量', color='purple')
plt.plot(t, y2, label=f'{model_name} ({T}小时)', color='orange')
# 添加图例和标签
plt.title(
f'{date.strftime("%Y-%m-%d")} {wind_type}风速拟合 {model_name}', fontsize=16)
plt.xlabel('时间 (小时)', fontsize=14)
plt.ylabel('幅度 (m/s)', fontsize=14)
plt.legend()
plt.grid(True)
plt.xlim(0, T*2) # 根据周期调整时间范围
plt.ylim(min(y1.min(), y2.min()-1), max(y1.max(), y2.max())+1)
plt.tight_layout()
def get_final_df(H, year, station):
# 下边处理,绘图
# 参数设置,选择高度、时间、站点
# # station = '黑龙江漠河站'
# 文件路径设置
file_dir = rf'{DATA_BASEPATH.radar}/{station}/{year}' # 数据文件路径
# output_dir = rf'./out\{station}\{year}' # 参数txt、图片输出路径
# os.makedirs(output_dir, exist_ok=True) # 确保输出路径存在
# 1------------------------提取整个文件夹的数据
# 获取所有 txt 文件
file_list = [f for f in os.listdir(file_dir) if f.endswith('.txt')]
# 初始化空的 DataFrame用于合并所有文件的数据
final_df = pd.DataFrame()
# 批量处理每个 txt 文件
for file_name in file_list:
file_path = os.path.join(file_dir, file_name) # 拼接文件路径
df = pd.read_csv(file_path, delim_whitespace=True) # 读取文件
date_part = os.path.splitext(file_name)[0][-8:] # 提取日期部分
# 处理缺失值,将 1000000 替换为 NaN
df.loc[df['uwind'] == 1000000, 'uwind'] = np.nan
df.loc[df['vwind'] == 1000000, 'vwind'] = np.nan
# 筛选出 height 列为 90km 的行
filtered_df = df[df['height'] == H]
# 重命名列,包含日期信息
filtered_df = filtered_df.rename(columns={
'uwind': f'{date_part}_uwind',
'vwind': f'{date_part}_vwind'
})
# 只保留需要的列
filtered_df = filtered_df[['height', 'time',
f'{date_part}_uwind', f'{date_part}_vwind']]
# 如果 final_df 为空,则直接赋值
if final_df.empty:
final_df = filtered_df
else:
# 按 'height' 和 'time' 列对齐合并
final_df = pd.merge(final_df, filtered_df, on=[
'height', 'time'], how='outer')
# 生成完整日期范围的列名补全缺失的天使得平年365闰年366
all_dates = pd.date_range(
f'{year}-01-01', f'{year}-12-31').strftime('%Y%m%d')
expected_columns = [f'{date}_uwind' for date in all_dates] + \
[f'{date}_vwind' for date in all_dates]
# 确保 final_df 包含所有日期的列,缺失列补为 NaN
for col in expected_columns:
if col not in final_df.columns:
final_df[col] = np.nan
# 按列名排序(确保日期顺序)
final_df = final_df[['height', 'time'] + sorted(expected_columns)]
return final_df
# 此时final_df已经是90km包含完整天的观测数据---------------------------------------------------------
# 2---------------------------整年纬向风/经向风拟合参数
# params_dict = process_wind_data_with_models(final_df,'uwind', output_dir, year, H,selected_models=['潮汐波','2日行星波'])
# params_dict = process_wind_data_with_models(final_df,
# 'uwind', output_dir, year, H,
# selected_models=None)
class radar_allw_amplitude():
def __init__(self, H, year, station, wind_type):
self.H, self.year, self.station, self.wind_type = H, year, station, wind_type
final_df = get_final_df(H, year, station)
self.params_dict = process_wind_data_with_models(
final_df, wind_type, year, H, selected_models=None)
def render_day(self, date, model_name):
plot_sine_wave_for_day(
self.params_dict[model_name], date, self.wind_type, self.H, model_name)
def render_month(self, month, model_name):
plot_month_params(self.params_dict, model_name,
self.wind_type, self.year, month, self.H)
def render_year(self, model_name):
plot_yearly_params(self.params_dict, model_name,
self.year, self.wind_type, self.H)
@staticmethod
def get_all_models():
return list(MODELS.keys())
@staticmethod
def get_all_pathes():
#
result = glob.glob(f"{DATA_BASEPATH.radar}/**/*.txt", recursive=True)
# normalize path
result = [os.path.normpath(path).replace("\\", "/") for path in result]
return result
# todo:此时得到的参数,年份、高度、风的类型已经固定
# 3---------------------------绘图
# 使用示例:绘制全年图形
if __name__ == '__main__':
# H = 94000 # 高度为 90km
# year = 2022
import matplotlib.font_manager as fm
fm.fontManager.addfont("./SimHei.ttf")
# station = '武汉左岭镇站'
renderer = radar_allw_amplitude(94000, 2022, '武汉左岭镇站', 'uwind')
renderer.render_year('潮汐波')
plt.show()
renderer.render_month(3, '潮汐波')
plt.show()
renderer.render_day('2022-03-17', '潮汐波')
plt.show()