from io import BytesIO import os import pandas as pd import numpy as np import matplotlib.pyplot as plt from scipy.optimize import curve_fit import seaborn as sns # 解决绘图中中文不能显示的问题 import matplotlib matplotlib.rcParams['font.sans-serif'] = ['SimHei'] matplotlib.rcParams['axes.unicode_minus'] = False # ------------------------------ 模型定义 ------------------------------ def tidal_model(t, v0, a1, b1, a2, b2, a3, b3, a4, b4): """潮汐波模型""" T1, T2, T3, T4 = 6, 8, 12, 24 # 周期以小时为单位 return (v0 + a1 * np.sin((2 * np.pi / T1) * t + b1) + a2 * np.sin((2 * np.pi / T2) * t + b2) + a3 * np.sin((2 * np.pi / T3) * t + b3) + a4 * np.sin((2 * np.pi / T4) * t + b4)) def planetary_model(t, v0, a, b, period): """行星波模型""" return v0 + a * np.sin((2 * np.pi / period) * t + b) # ------------------------------ 数据处理工具函数 ------------------------------ def preprocess_data(file_dir, year): """读取数据文件并合并成完整的 DataFrame""" file_list = [f for f in os.listdir(file_dir) if f.endswith('.txt')] final_df = pd.DataFrame() for file_name in file_list: file_path = os.path.join(file_dir, file_name) df = pd.read_csv(file_path, sep='\s+') # 替换异常值为 NaN df.loc[df['uwind'] == 1000000, 'uwind'] = np.nan df.loc[df['vwind'] == 1000000, 'vwind'] = np.nan # 筛选所需列并重命名 date_part = os.path.splitext(file_name)[0][-8:] df = df[['height', 'time', 'uwind', 'vwind']].rename(columns={ 'uwind': f'{date_part}_uwind', 'vwind': f'{date_part}_vwind' }) if final_df.empty: final_df = df else: final_df = pd.merge( final_df, df, on=['height', 'time'], how='outer') # 补全缺失列并排序 all_dates = pd.date_range( f'{year}-01-01', f'{year}-12-31').strftime('%Y%m%d') expected_columns = [f'{date}_uwind' for date in all_dates] + \ [f'{date}_vwind' for date in all_dates] for col in expected_columns: if col not in final_df.columns: final_df[col] = np.nan final_df = final_df[['height', 'time'] + sorted(expected_columns)] return final_df # ------------------------------ 拟合工具函数 ------------------------------ def fit_wind_speed(data, model_func, initial_guess, bounds, window, num_params): """对风速数据进行滑动窗口拟合""" # replace "_uwind" and "_vwind" in date column data["date"] = data["date"].str.replace( "_uwind", "").str.replace("_vwind", "") data["date"] = pd.to_datetime(data["date"]) data.set_index("date", inplace=True) unique_dates = np.unique(data.index.date) params = [] for date in unique_dates: start_date = pd.to_datetime(date) - pd.Timedelta(days=(window - 1) / 2) end_date = pd.to_datetime(date) + pd.Timedelta(days=(window - 1) / 2) window_data = data[start_date:end_date] t = np.arange(len(window_data)) + 1 y = window_data['wind_speed'].values valid_mask = ~np.isnan(y) t = t[valid_mask] y = y[valid_mask] if len(y) < num_params * 6: params.append([None] * len(initial_guess)) continue try: popt, _ = curve_fit( model_func, t, y, p0=initial_guess, bounds=bounds) params.append(popt) except RuntimeError: params.append([None] * len(initial_guess)) params_df = pd.DataFrame(params, columns=[ "v0"] + [f"param_{i}" for i in range(1, len(initial_guess))], index=unique_dates) return params_df # ------------------------------ 热力图绘制函数 ------------------------------ def plot_heatmaps(result_dict, param, title, vmax): """绘制热力图""" fig, axes = plt.subplots(1, 2, figsize=(18, 8)) fig.suptitle(title, fontsize=20) for i, (wind_type, data) in enumerate(result_dict.items()): heatmap_data = data[param].unstack(level=0) heatmap_data.index = pd.to_datetime(heatmap_data.index) heatmap_data = heatmap_data.iloc[:, ::-1] sns.heatmap( heatmap_data.T, cmap="viridis", vmax=vmax, cbar_kws={'label': '振幅 (m/s)'}, yticklabels=[ f'{height / 1000:.1f}' for height in heatmap_data.columns], ax=axes[i] ) axes[i].set_title(f'{wind_type}振幅变化', fontsize=14) axes[i].set_xlabel('日期', fontsize=12) axes[i].set_ylabel('高度 (km)', fontsize=12) plt.tight_layout(rect=[0, 0, 1, 0.95]) # plt.show() # ------------------------------ 主逻辑 ------------------------------ def final_plot_v2(year, station, model_name): """ 主逻辑函数,用于处理数据并绘制选定模型的热力图。 参数: - year: int,年份 - station: str,站点名称 - model_name: str,模型名称(如 '潮汐波', '2日行星波', '5日行星波', '10日行星波', '16日行星波') """ # 配置文件路径 file_dir = rf'./radar/data\{station}\{year}' output_dir = rf'./radar/tmp\{station}\{year}\时空' os.makedirs(output_dir, exist_ok=True) # 数据预处理 final_df = preprocess_data(file_dir, year) uwind_data = final_df[["height", "time"] + [col for col in final_df.columns if "_uwind" in col]] vwind_data = final_df[["height", "time"] + [col for col in final_df.columns if "_vwind" in col]] heights = uwind_data["height"].unique() # 模型配置 models = { "潮汐波": {"func": tidal_model, "params": 9, "initial": [0, 1, 0, 1, 0, 1, 0, 1, 0], "bounds": ([-np.inf, 0, -np.inf, 0, -np.inf, 0, -np.inf, 0, -np.inf], [np.inf] * 9), "window": 5}, "2日行星波": {"func": lambda t, v0, a, b: planetary_model(t, v0, a, b, 48), "params": 3, "initial": [0, 1, 0], "bounds": ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), "window": 5}, "5日行星波": {"func": lambda t, v0, a, b: planetary_model(t, v0, a, b, 120), "params": 3, "initial": [0, 1, 0], "bounds": ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), "window": 15}, "10日行星波": {"func": lambda t, v0, a, b: planetary_model(t, v0, a, b, 240), "params": 3, "initial": [0, 1, 0], "bounds": ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), "window": 29}, "16日行星波": {"func": lambda t, v0, a, b: planetary_model(t, v0, a, b, 384), "params": 3, "initial": [0, 1, 0], "bounds": ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), "window": 49} } # 检查模型名称是否有效 if model_name not in models: raise ValueError( f"无效的模型名称: '{model_name}'。可选模型为: {list(models.keys())}") # 获取模型配置 config = models[model_name] # 拟合纬向风和经向风 u_result_dict, v_result_dict = {}, {} for height in heights: # 纬向风数据 height_data_u = uwind_data[uwind_data["height"] == height].melt( id_vars=["time", "height"], var_name="date", value_name="wind_speed") u_result_dict[height] = fit_wind_speed( height_data_u, config["func"], config["initial"], config["bounds"], config["window"], config["params"]) # 经向风数据 height_data_v = vwind_data[vwind_data["height"] == height].melt( id_vars=["time", "height"], var_name="date", value_name="wind_speed") v_result_dict[height] = fit_wind_speed( height_data_v, config["func"], config["initial"], config["bounds"], config["window"], config["params"]) # 汇总结果 u_final_result = pd.concat(u_result_dict, names=["height", "date"]) v_final_result = pd.concat(v_result_dict, names=["height", "date"]) # 绘制热力图 plot_heatmaps( {"纬向风": u_final_result, "经向风": v_final_result}, param="param_1", # 振幅参数 title=f'{year}年{model_name}振幅时空变化', vmax=100, ) buffer = BytesIO() plt.savefig(buffer, format='png') buffer.seek(0) plt.close() return buffer