# 此代码可以绘制某年、某高度的纬向风/经向风的,潮汐波和行星波的 # 年振幅图、月振幅图、日拟合正弦波图 # 基本思想是,1-先处理某高度全年观测数据,得到final_df # 2-再选择风的类型,得到此高度全年每一天的纬向风/经向风大气长波拟合参数,函数process_wind_data_with_models # 3-下一步就是根据参数绘制图形,可选择年振幅图、月振幅图、日拟合正弦波图(#todo:上一步风的类型已确定) # plot_yearly_params、plot_month_params、plot_sine_wave_for_day import glob import pandas as pd import numpy as np import os from scipy.optimize import curve_fit import matplotlib.pyplot as plt from datetime import datetime # 解决绘图中中文不能显示的问题 import matplotlib from CONSTANT import DATA_BASEPATH # 设置中文显示和负号正常显示 matplotlib.rcParams['font.sans-serif'] = ['SimHei'] # 显示中文 matplotlib.rcParams['axes.unicode_minus'] = False # 正常显示负号 # 定义所有模型 def tidal_model(t, v0, a1, b1, a2, b2, a3, b3, a4, b4): """潮汐波模型""" T1, T2, T3, T4 = 6, 8, 12, 24 # 周期以小时为单位 return (v0 + a1 * np.sin((2 * np.pi / T1) * t + b1) + a2 * np.sin((2 * np.pi / T2) * t + b2) + a3 * np.sin((2 * np.pi / T3) * t + b3) + a4 * np.sin((2 * np.pi / T4) * t + b4)) def planetary_model_2day(t, v0, a, b): """2日行星波模型""" T = 48 # 周期为2天(48小时) return v0 + a * np.sin((2 * np.pi / T) * t + b) def planetary_model_5day(t, v0, a, b): """5日行星波模型""" T = 120 # 周期为5天(120小时) return v0 + a * np.sin((2 * np.pi / T) * t + b) def planetary_model_10day(t, v0, a, b): """10日行星波模型""" T = 240 # 周期为10天(240小时) return v0 + a * np.sin((2 * np.pi / T) * t + b) def planetary_model_16day(t, v0, a, b): """16日行星波模型""" T = 384 # 周期为16天(384小时) return v0 + a * np.sin((2 * np.pi / T) * t + b) # 统一管理模型及其参数 MODELS = { '潮汐波': { 'function': tidal_model, 'param_names': ['v0', 'a1', 'b1', 'a2', 'b2', 'a3', 'b3', 'a4', 'b4'], 'initial_guess': [0, 1, 0, 1, 0, 1, 0, 1, 0], 'bounds': ([-np.inf, 0, -np.inf, 0, -np.inf, 0, -np.inf, 0, -np.inf], [np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf]), 'window': 5 # 窗口为5天 }, '2日行星波': { 'function': planetary_model_2day, 'param_names': ['v0', 'a', 'b'], 'initial_guess': [0, 1, 0], 'bounds': ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), 'window': 5 # 窗口为5天 }, '5日行星波': { 'function': planetary_model_5day, 'param_names': ['v0', 'a', 'b'], 'initial_guess': [0, 1, 0], 'bounds': ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), 'window': 15 # 窗口为15天 }, '10日行星波': { 'function': planetary_model_10day, 'param_names': ['v0', 'a', 'b'], 'initial_guess': [0, 1, 0], 'bounds': ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), 'window': 29 # 窗口为29天 }, '16日行星波': { 'function': planetary_model_16day, 'param_names': ['v0', 'a', 'b'], 'initial_guess': [0, 1, 0], 'bounds': ([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]), 'window': 49 # 窗口为49天 }, } ################################ 选定风的类型,得到整年的每一天拟合参数 def process_wind_data_with_models(final_df, wind_type, year, H, selected_models=None): """ 处理风速数据并针对多种模型进行拟合和绘图。 :param selected_models: 可选,指定使用的模型类型列表。如果为 None,则处理所有模型。 :return: 整年所有模型的拟合参数字典 """ wind_data = final_df[[col for col in final_df.columns if wind_type in col]] wind_data.columns = [col[:8] for col in wind_data.columns] result = wind_data.to_numpy().T.flatten() column_names = np.repeat(wind_data.columns.to_numpy(), 24) combined = pd.DataFrame(np.vstack((column_names, result))) dates = combined.iloc[0].values wind_speed = combined.iloc[1].values date_series = pd.to_datetime(dates) data = pd.DataFrame({'date': date_series, 'wind_speed': wind_speed}) data.set_index('date', inplace=True) unique_dates = np.unique(data.index.date) # 如果指定了模型,则只处理指定的模型 if selected_models is None: models_to_process = MODELS.keys() # 默认处理所有模型 else: models_to_process = selected_models # 使用指定的模型列表 all_params = {} # 存储所有模型的参数数据框 for model_name in models_to_process: if model_name not in MODELS: print(f"Model {model_name} is not valid.") continue model_info = MODELS[model_name] params = [] model_func = model_info['function'] param_names = model_info['param_names'] initial_guess = model_info['initial_guess'] bounds = model_info['bounds'] window_days = model_info['window'] # 获取窗口长度 for date in unique_dates: # 根据模型窗口调整时间范围 start_date = pd.to_datetime(date) - pd.Timedelta(days=(window_days - 1) / 2) end_date = pd.to_datetime(date) + pd.Timedelta(days=(window_days - 1) / 2) window_data = data[start_date:end_date] t = np.arange(len(window_data)) + 1 y = pd.to_numeric(window_data['wind_speed'], errors='coerce') valid_mask = ~np.isnan(y) t = t[valid_mask] y = y[valid_mask] if len(y) < len(initial_guess) * 2: # todo:认为如果数据点个数少于参数的2倍,则认为数据不够拟合,之前是6倍,在这里放宽了 print(f"Not enough valid data after filtering for date: {date}") params.append([None] * len(param_names)) continue try: popt, _ = curve_fit(model_func, t, y, p0=initial_guess, bounds=bounds) params.append(popt) except RuntimeError: print(f"Fitting failed for date: {date}") params.append([None] * len(param_names)) # 保存整年参数 params_df = pd.DataFrame(params, columns=param_names, index=unique_dates) # file_name = f'{year}年_{wind_type}_{H // 1000}km_{model_name}_强度.txt' # params_df.to_csv(os.path.join(output_dir, file_name), sep='\t', index=True, header=True) all_params[model_name] = params_df # 将每个模型的参数数据框存入字典 return all_params # 返回包含所有模型参数的数据字典 # 外部绘制全年图形 def plot_yearly_params(params_dict, model_name,year, wind_type, H): """ 绘制全年的参数图形。 :param params_dict: 存储所有模型的参数数据框字典 :param model_name: 模型名称 """ params_df = params_dict[model_name] # 绘图 if 'a1' in params_df.columns: # 处理潮汐波多分量 plt.figure(figsize=(12, 6)) for i, T in enumerate([6, 8, 12, 24], start=1): plt.plot(params_df[f'a{i}'], label=f'{T}h', linewidth=2) else: # 处理行星波单分量 plt.figure(figsize=(12, 6)) plt.plot(params_df['a'], label=f'{model_name}', color='orange', linewidth=2) plt.title(f'{year}年{wind_type}_{H // 1000}km_{model_name}_强度', fontsize=16) plt.xlabel('日期', fontsize=14) plt.ylabel('幅度 (m/s)', fontsize=14) plt.xticks(rotation=45) plt.legend() plt.tick_params(axis='both', labelsize=12) # 调整刻度字体大小 plt.tight_layout() # 绘制月振幅图 def plot_month_params(params_dict, model_name, wind_type, year, month, H): """ 绘制指定月份的参数图形。 :param params_dict: 存储所有模型的参数数据框字典 :param model_name: 模型名称 :param wind_type: 风速类型 :param year: 需要绘制的年份 :param month: 需要绘制的月份 :param output_dir: 输出目录 :param H: 水平尺度 """ # 获取该年份的数据框 params_df = params_dict[model_name] # 确保索引是 datetime 类型 if not pd.api.types.is_datetime64_any_dtype(params_df.index): # print("Index is not datetime. Converting to datetime...") params_df.index = pd.to_datetime(params_df.index) # 筛选出指定年份和月份的数据 params_df_month = params_df[(params_df.index.year == year) & (params_df.index.month == month)] # 检查筛选后的数据是否为空 if params_df_month.empty: print(f"No data available for {year}-{month}.") return # 绘图 plt.figure(figsize=(12, 6)) # 判断是潮汐波还是行星波 if model_name == '潮汐波': # 处理潮汐波多分量 for i, T in enumerate([6, 8, 12, 24], start=1): col_name = f'a{i}' if col_name in params_df_month.columns: plt.plot(params_df_month[col_name], label=f'{T}h', linewidth=2) else: print(f"Warning: Column '{col_name}' not found in the dataframe.") else: # 处理行星波单分量 if 'a' in params_df_month.columns: plt.plot(params_df_month['a'], label=f'{model_name}', color='orange', linewidth=2) else: print(f"Warning: Column 'a' not found in the dataframe.") # 设置标题和标签 plt.title(f'{year}年{month}月{wind_type}_{H // 1000}km_{model_name}_强度', fontsize=16) plt.xlabel('日期', fontsize=14) plt.ylabel('幅度 (m/s)', fontsize=14) plt.xticks(rotation=45) plt.legend() plt.tick_params(axis='both', labelsize=12) # 调整刻度字体大小 plt.tight_layout() # 绘制日拟合正弦波图 def plot_sine_wave_for_day(params_df, date, wind_type, H, model_name): """ 根据指定日期的参数,绘制标准的正弦曲线图。 :param params_df: 存储所有模型的参数数据框 :param date: 指定日期(例如 '2024-03-17') :param wind_type: 风速类型(例如 'uwind') :param H: 水平尺度 :param model_name: 模型名称(例如 '5日行星波', '10日行星波', '潮汐波') """ # 将字符串日期转换为 datetime 格式 try: date = datetime.strptime(date, '%Y-%m-%d') except ValueError: print(f"Error: Invalid date format '{date}', expected 'YYYY-MM-DD'.") return # 检查 params_df.index 是否为 datetime 类型,若不是,则将其转换 if not isinstance(params_df.index, pd.DatetimeIndex): print("Warning: params_df index is not of type datetime. Converting index to datetime...") params_df.index = pd.to_datetime(params_df.index) # 获取指定日期的参数 if date not in params_df.index: print(f"Warning: {date.strftime('%Y-%m-%d')} not found in the parameter dataframe.") return params = params_df.loc[date] # 检查参数是否为 NaN 或 Inf if params.isnull().any() or np.isinf(params).any(): print(f"Warning: Parameters for {date.strftime('%Y-%m-%d')} contain NaN or Inf values. Skipping the plot.") return # 判断是否是潮汐波模型 if 'a1' in params: # 如果是潮汐波模型 T1, T2, T3, T4 = 6, 8, 12, 24 # 潮汐波的周期 # 生成时间数据 t = np.linspace(0, 48, 1000) # 生成0到48小时的1000个点 # 从参数中提取值 v0 = params['v0'] a1 = params['a1'] b1 = params['b1'] a2 = params['a2'] b2 = params['b2'] a3 = params['a3'] b3 = params['b3'] a4 = params['a4'] b4 = params['b4'] # 计算正弦波的值 y1 = v0 * np.ones_like(t) # 常量项 y2 = a1 * np.sin((2 * np.pi / T1) * t + b1) y3 = a2 * np.sin((2 * np.pi / T2) * t + b2) y4 = a3 * np.sin((2 * np.pi / T3) * t + b3) y5 = a4 * np.sin((2 * np.pi / T4) * t + b4) # 绘制图形 plt.figure(figsize=(12, 8)) plt.plot(t, y1, label='常量', color='purple') plt.plot(t, y2, label='6h', color='blue') plt.plot(t, y3, label='8h', color='orange') plt.plot(t, y4, label='12h', color='green') plt.plot(t, y5, label='24h', color='red') # 添加图例和标签 plt.title(f'{date.strftime("%Y-%m-%d")} {wind_type}风速拟合潮汐波', fontsize=16) plt.xlabel('时间 (小时)', fontsize=14) plt.ylabel('幅度 (m/s)', fontsize=14) plt.legend() plt.grid(True) plt.xlim(0, 48) # 0到48小时 plt.ylim(min(y1.min(), y2.min(), y3.min(), y4.min(), y5.min()-1), max(y1.max(), y2.max(), y3.max(), y4.max(), y5.max())+1) plt.tight_layout() else: # 处理行星波模型 # 根据传入的模型名称选择周期 if model_name == '2日行星波': T = 48 # 2日行星波的周期为48小时 elif model_name == '5日行星波': T = 120 # 5日行星波的周期为120小时 elif model_name == '10日行星波': T = 240 # 10日行星波的周期为240小时 elif model_name == '16日行星波': T = 384 # 16日行星波的周期为384小时 else: print(f"Error: Unsupported planetary wave model '{model_name}'.") return # 生成时间数据 t = np.linspace(0, T*2, 1000) # 根据周期生成对应的时间范围 # 从参数中提取值 v0 = params['v0'] a = params['a'] b = params['b'] # 计算正弦波的值 y1 = v0 * np.ones_like(t) # 常量项 y2 = a * np.sin((2 * np.pi / T) * t + b) # 绘制图形 plt.figure(figsize=(12, 8)) plt.plot(t, y1, label='常量', color='purple') plt.plot(t, y2, label=f'{model_name} ({T}小时)', color='orange') # 添加图例和标签 plt.title(f'{date.strftime("%Y-%m-%d")} {wind_type}风速拟合 {model_name}', fontsize=16) plt.xlabel('时间 (小时)', fontsize=14) plt.ylabel('幅度 (m/s)', fontsize=14) plt.legend() plt.grid(True) plt.xlim(0, T*2) # 根据周期调整时间范围 plt.ylim(min(y1.min(), y2.min()-1), max(y1.max(), y2.max())+1) plt.tight_layout() def get_final_df(H, year, station): # 下边处理,绘图 # 参数设置,选择高度、时间、站点 # # station = '黑龙江漠河站' # 文件路径设置 file_dir = rf'{DATA_BASEPATH.radar}/{station}/{year}' # 数据文件路径 # output_dir = rf'./out\{station}\{year}' # 参数txt、图片输出路径 # os.makedirs(output_dir, exist_ok=True) # 确保输出路径存在 # 1------------------------提取整个文件夹的数据 # 获取所有 txt 文件 file_list = [f for f in os.listdir(file_dir) if f.endswith('.txt')] # 初始化空的 DataFrame,用于合并所有文件的数据 final_df = pd.DataFrame() # 批量处理每个 txt 文件 for file_name in file_list: file_path = os.path.join(file_dir, file_name) # 拼接文件路径 df = pd.read_csv(file_path, delim_whitespace=True) # 读取文件 date_part = os.path.splitext(file_name)[0][-8:] # 提取日期部分 # 处理缺失值,将 1000000 替换为 NaN df.loc[df['uwind'] == 1000000, 'uwind'] = np.nan df.loc[df['vwind'] == 1000000, 'vwind'] = np.nan # 筛选出 height 列为 90km 的行 filtered_df = df[df['height'] == H] # 重命名列,包含日期信息 filtered_df = filtered_df.rename(columns={ 'uwind': f'{date_part}_uwind', 'vwind': f'{date_part}_vwind' }) # 只保留需要的列 filtered_df = filtered_df[['height', 'time', f'{date_part}_uwind', f'{date_part}_vwind']] # 如果 final_df 为空,则直接赋值 if final_df.empty: final_df = filtered_df else: # 按 'height' 和 'time' 列对齐合并 final_df = pd.merge(final_df, filtered_df, on=['height', 'time'], how='outer') # 生成完整日期范围的列名,补全缺失的天,使得平年365,闰年366 all_dates = pd.date_range(f'{year}-01-01', f'{year}-12-31').strftime('%Y%m%d') expected_columns = [f'{date}_uwind' for date in all_dates] + [f'{date}_vwind' for date in all_dates] # 确保 final_df 包含所有日期的列,缺失列补为 NaN for col in expected_columns: if col not in final_df.columns: final_df[col] = np.nan # 按列名排序(确保日期顺序) final_df = final_df[['height', 'time'] + sorted(expected_columns)] return final_df # 此时,final_df已经是90km包含完整天的观测数据--------------------------------------------------------- # 2---------------------------整年纬向风/经向风拟合参数 # params_dict = process_wind_data_with_models(final_df,'uwind', output_dir, year, H,selected_models=['潮汐波','2日行星波']) # params_dict = process_wind_data_with_models(final_df, # 'uwind', output_dir, year, H, # selected_models=None) class final_render_v2(): def __init__(self, H, year, station, wind_type): self.H , self.year, self.station, self.wind_type = H, year, station, wind_type final_df = get_final_df(H, year, station) self.params_dict = process_wind_data_with_models(final_df, wind_type, year, H, selected_models=None) def render_day(self, date, model_name): plot_sine_wave_for_day(self.params_dict[model_name], date, self.wind_type, self.H, model_name) def render_month(self, month, model_name): plot_month_params(self.params_dict, model_name, self.wind_type, self.year, month, self.H) def render_year(self, model_name): plot_yearly_params(self.params_dict, model_name, self.year, self.wind_type, self.H) @staticmethod def get_all_models(): return list(MODELS.keys()) @staticmethod def get_all_pathes(): # result = glob.glob(f"{DATA_BASEPATH.radar}/**/*.txt", recursive=True) # normalize path result = [os.path.normpath(path).replace("\\", "/") for path in result] return result # todo:此时得到的参数,年份、高度、风的类型已经固定 # 3---------------------------绘图 # 使用示例:绘制全年图形 if __name__ == '__main__': # H = 94000 # 高度为 90km # year = 2022 import matplotlib.font_manager as fm fm.fontManager.addfont("./SimHei.ttf") # station = '武汉左岭镇站' renderer = final_render_v2(94000, 2022, '武汉左岭镇站', 'uwind') renderer.render_year('潮汐波') plt.show() renderer.render_month(3, '潮汐波') plt.show() renderer.render_day('2022-03-17', '潮汐波') plt.show()