zephyr-backend/radar/plot_prod.py
2025-01-23 23:31:52 +08:00

235 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import glob
from io import BytesIO
import os
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
import seaborn as sns
# 解决绘图中中文不能显示的问题
import matplotlib
matplotlib.rcParams['font.sans-serif'] = ['SimHei']
matplotlib.rcParams['axes.unicode_minus'] = False
# ------------------------------ 模型定义 ------------------------------
def tidal_model(t, v0, a1, b1, a2, b2, a3, b3, a4, b4):
"""潮汐波模型"""
T1, T2, T3, T4 = 6, 8, 12, 24 # 周期以小时为单位
return (v0 + a1 * np.sin((2 * np.pi / T1) * t + b1)
+ a2 * np.sin((2 * np.pi / T2) * t + b2)
+ a3 * np.sin((2 * np.pi / T3) * t + b3)
+ a4 * np.sin((2 * np.pi / T4) * t + b4))
def planetary_model(t, v0, a, b, period):
"""行星波模型"""
return v0 + a * np.sin((2 * np.pi / period) * t + b)
# ------------------------------ 数据处理工具函数 ------------------------------
def preprocess_data(file_list, year, month_range=(1,12)):
def filter_given_month(month_range, file_list):
begin_month, end_month = month_range
get_date_pattern = r'_(\d{8})\.txt'
filtered_file_list = []
for file in file_list:
date = re.search(get_date_pattern, file).group(1)
month = int(date[4:6])
if month >= begin_month and month <= end_month:
filtered_file_list.append(file)
return filtered_file_list
if file_list == []:
raise ValueError("No data files found.")
file_list = filter_given_month(month_range, file_list)
"""读取数据文件并合并成完整的 DataFrame"""
# file_list = [f for f in os.listdir(file_dir) if f.endswith('.txt')]
final_df = pd.DataFrame()
for file_path in file_list:
file_name = file_path.split('/')[-1]
df = pd.read_csv(file_path, sep='\s+')
# 替换异常值为 NaN
df.loc[df['uwind'] == 1000000, 'uwind'] = np.nan
df.loc[df['vwind'] == 1000000, 'vwind'] = np.nan
# 筛选所需列并重命名
date_part = os.path.splitext(file_name)[0][-8:]
df = df[['height', 'time', 'uwind', 'vwind']].rename(columns={
'uwind': f'{date_part}_uwind',
'vwind': f'{date_part}_vwind'
})
if final_df.empty:
final_df = df
else:
final_df = pd.merge(
final_df, df, on=['height', 'time'], how='outer')
# 补全缺失列并排序
begin_month, end_month = month_range
# calculate begin and end date
begin_date = f'{year}-{begin_month:02d}-01'
end_date = f'{year}-{end_month+1:02d}-01' if end_month < 12 else f'{year}-12-31'
all_dates = pd.date_range(
begin_date, end_date).strftime('%Y%m%d')
expected_columns = [f'{date}_uwind' for date in all_dates] + \
[f'{date}_vwind' for date in all_dates]
for col in expected_columns:
if col not in final_df.columns:
final_df[col] = np.nan
final_df = final_df[['height', 'time'] + sorted(expected_columns)]
return final_df
# ------------------------------ 拟合工具函数 ------------------------------
def fit_wind_speed(data, model_func, initial_guess, bounds, window, num_params):
"""对风速数据进行滑动窗口拟合"""
# replace "_uwind" and "_vwind" in date column
data["date"] = data["date"].str.replace(
"_uwind", "").str.replace("_vwind", "")
data["date"] = pd.to_datetime(data["date"])
data.set_index("date", inplace=True)
unique_dates = np.unique(data.index.date)
params = []
for date in unique_dates:
start_date = pd.to_datetime(date) - pd.Timedelta(days=(window - 1) / 2)
end_date = pd.to_datetime(date) + pd.Timedelta(days=(window - 1) / 2)
window_data = data[start_date:end_date]
t = np.arange(len(window_data)) + 1
y = window_data['wind_speed'].values
valid_mask = ~np.isnan(y)
t = t[valid_mask]
y = y[valid_mask]
if len(y) < num_params * 2:
params.append([None] * len(initial_guess))
continue
try:
popt, _ = curve_fit(
model_func, t, y, p0=initial_guess, bounds=bounds)
params.append(popt)
except RuntimeError:
params.append([None] * len(initial_guess))
params_df = pd.DataFrame(params, columns=[
"v0"] + [f"param_{i}" for i in range(1, len(initial_guess))], index=unique_dates)
return params_df
# ------------------------------ 热力图绘制函数 ------------------------------
def plot_heatmaps(result_dict, param, title, vmax):
"""绘制热力图"""
fig, axes = plt.subplots(1, 2, figsize=(18, 8))
fig.suptitle(title, fontsize=20)
for i, (wind_type, data) in enumerate(result_dict.items()):
heatmap_data = data[param].unstack(level=0)
heatmap_data.index = pd.to_datetime(heatmap_data.index)
heatmap_data = heatmap_data.iloc[:, ::-1]
heatmap_data.index = heatmap_data.index.strftime('%Y-%m-%d')
sns.heatmap(
heatmap_data.T,
cmap="viridis",
vmax=vmax,
cbar_kws={'label': '振幅 (m/s)'},
yticklabels=[
f'{height / 1000:.1f}' for height in heatmap_data.columns],
ax=axes[i]
)
axes[i].set_title(f'{wind_type}振幅变化', fontsize=14)
axes[i].set_xlabel('日期', fontsize=12)
axes[i].set_ylabel('高度 (km)', fontsize=12)
plt.tight_layout(rect=[0, 0, 1, 0.95])
# plt.show()
# ------------------------------ 主逻辑 ------------------------------
def final_plot_v2(year, station, model_name, month_range=(1,12)):
"""
主逻辑函数,用于处理数据并绘制选定模型的热力图。
参数:
- year: int年份
- station: str站点名称
- model_name: str模型名称'潮汐波', '2日行星波', '5日行星波', '10日行星波', '16日行星波'
"""
# 配置文件路径
file_dir = rf'./radar/data/{station}/{year}'
# output_dir = rf'./radar/tmp\{station}\{year}\时空'
# os.makedirs(output_dir, exist_ok=True)
# 数据预处理
file_list = glob.glob(file_dir + '/**/*.txt', recursive=True)
final_df = preprocess_data(file_list, year, month_range)
uwind_data = final_df[["height", "time"] +
[col for col in final_df.columns if "_uwind" in col]]
vwind_data = final_df[["height", "time"] +
[col for col in final_df.columns if "_vwind" in col]]
heights = uwind_data["height"].unique()
# 模型配置
models = {
"潮汐波": {"func": tidal_model, "params": 9, "initial": [0, 1, 0, 1, 0, 1, 0, 1, 0], "bounds": ([-np.inf, 0, -np.inf, 0, -np.inf, 0, -np.inf, 0, -np.inf], [np.inf] * 9), "window": 5},
"2日行星波": {"func": lambda t, v0, a, b: planetary_model(t, v0, a, b, 48), "params": 3, "initial": [0, 1, 0], "bounds": ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), "window": 5},
"5日行星波": {"func": lambda t, v0, a, b: planetary_model(t, v0, a, b, 120), "params": 3, "initial": [0, 1, 0], "bounds": ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), "window": 15},
"10日行星波": {"func": lambda t, v0, a, b: planetary_model(t, v0, a, b, 240), "params": 3, "initial": [0, 1, 0], "bounds": ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), "window": 29},
"16日行星波": {"func": lambda t, v0, a, b: planetary_model(t, v0, a, b, 384), "params": 3, "initial": [0, 1, 0], "bounds": ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), "window": 49}
}
# 检查模型名称是否有效
if model_name not in models:
raise ValueError(
f"无效的模型名称: '{model_name}'。可选模型为: {list(models.keys())}")
# 获取模型配置
config = models[model_name]
# 拟合纬向风和经向风
u_result_dict, v_result_dict = {}, {}
for height in heights:
# 纬向风数据
height_data_u = uwind_data[uwind_data["height"] == height].melt(
id_vars=["time", "height"], var_name="date", value_name="wind_speed")
u_result_dict[height] = fit_wind_speed(
height_data_u, config["func"], config["initial"], config["bounds"], config["window"], config["params"])
# 经向风数据
height_data_v = vwind_data[vwind_data["height"] == height].melt(
id_vars=["time", "height"], var_name="date", value_name="wind_speed")
v_result_dict[height] = fit_wind_speed(
height_data_v, config["func"], config["initial"], config["bounds"], config["window"], config["params"])
# 汇总结果
u_final_result = pd.concat(u_result_dict, names=["height", "date"])
v_final_result = pd.concat(v_result_dict, names=["height", "date"])
# 绘制热力图
plot_heatmaps(
{"纬向风": u_final_result, "经向风": v_final_result},
param="param_1", # 振幅参数
title=f'{year}{month_range[0]}-{month_range[1]}{model_name}振幅时空变化',
vmax=100,
)
buffer = BytesIO()
plt.savefig(buffer, format='png')
buffer.seek(0)
plt.close()
return buffer