# zephyr-backend/modules/cosmic/planetw_daily_process.py
#
# NOTE: web-viewer residue (line/size counts, "Raw Blame History", the
# ambiguous-Unicode warning) removed from this header; only the original
# file path is preserved above.
# 高度和target_latitude可以自己选择纬度可以选+-30+-60高度可以每10km
import logging
import netCDF4 as nc
import pandas as pd
import numpy as np
import os
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
from CONSTANT import DATA_BASEPATH
def _process_per_folder(base_folder_path, i, target_h):
# 根据i的值调整文件夹名称
if i < 10:
folder_name = f"atmPrf_repro2021_2008_00{i}" # 一位数前面加两个0
elif i < 100:
folder_name = f"atmPrf_repro2021_2008_0{i}" # 两位数前面加一个0
else:
folder_name = f"atmPrf_repro2021_2008_{i}" # 三位数不加0
# 构建当前文件夹的路径
folder_path = os.path.join(base_folder_path, folder_name)
# 检查文件夹是否存在
if not os.path.exists(folder_path):
print(f"文件夹 {folder_path} 不存在。")
return
# 遍历文件夹中的文件
result_folder_level = []
folder_level_cache_path = f"{folder_path}/planet_{target_h}.parquet"
if os.path.exists(folder_level_cache_path):
cache_content = pd.read_parquet(folder_level_cache_path)
# logging.info(f"读取缓存文件 {folder_level_cache_path}")
# dfs.append(cache_content)
return [cache_content]
logging.info(f"处理文件夹 {folder_path}")
for file_name in os.listdir(folder_path):
if not file_name.endswith('.0390_nc'):
continue
finfo = os.path.join(folder_path, file_name)
try:
dataset = nc.Dataset(finfo, 'r')
# 提取变量数据
temp = dataset.variables['Temp'][:]
altitude = dataset.variables['MSL_alt'][:]
lat = dataset.variables['Lat'][:]
lon = dataset.variables['Lon'][:]
# 读取month, hour, minute
month = dataset.month
hour = dataset.hour
minute = dataset.minute
# 将i的值作为day的值
day = i
# 将day, hour, minute拼接成一个字符串
datetime_str = f"{day:03d}{hour:02d}{minute:02d}"
# 确保所有变量都是一维数组
temp = np.squeeze(temp)
altitude = np.squeeze(altitude)
lat = np.squeeze(lat)
lon = np.squeeze(lon)
# 检查所有数组的长度是否相同
assert len(temp) == len(altitude) == len(lat) == len(
lon), "Arrays must be the same length"
# 创建DataFrame并将datetime_str作为第一列添加
df = pd.DataFrame({
'Datetime_str': datetime_str,
'Longitude': lon,
'Latitude': lat,
'Altitude': altitude,
'Temperature': temp
})
dataset.close()
# 仅筛选高度
df_filtered = df[(df['Altitude'] >= target_h - 0.5)
& (df['Altitude'] <= target_h + 0.5)]
result_folder_level.append(df_filtered)
except Exception as e:
print(f"处理文件 {finfo} 时出错: {e}")
# save to parquet
# merge them to a big dataframe, save to parquet
result_folder_level = pd.concat(
result_folder_level, axis=0, ignore_index=True)
logging.info(f"保存缓存文件 {folder_level_cache_path}")
logging.info(f"result_folder_length: {result_folder_level.__len__()}")
result_folder_level.to_parquet(folder_level_cache_path)
return [result_folder_level]
def cosmic_planet_daily_process(
        year=2008,
        target_latitude=30,
        target_h=40):
    """Build a daily planetary-wave time series from COSMIC atmPrf data.

    For each day of ``year``, loads the cached per-folder profiles at the
    target altitude, picks per timestamp the profile whose latitude is
    closest to ``target_latitude`` (rejecting matches more than 2 degrees
    away), and returns longitude (radians), fractional day-of-year time,
    and temperature. The result is cached as a parquet file under the
    year's data directory.

    Parameters
    ----------
    year : int, optional
        Data year (directory name under ``DATA_BASEPATH.cosmic``).
    target_latitude : float, optional
        Latitude of interest in degrees; +-30 / +-60 are typical choices.
    target_h : float, optional
        Altitude of interest in km (multiples of 10 are typical).

    Returns
    -------
    pandas.DataFrame
        Columns ``Longitude_Radians``, ``Time``, ``Temperature``.

    Raises
    ------
    FileNotFoundError
        If no daily folder under the year directory yields any data.
    """
    base_folder_path = f"{DATA_BASEPATH.cosmic}/{year}"
    cache_path = f"{base_folder_path}/planet_{target_h}_{target_latitude}.parquet"
    if os.path.exists(cache_path):
        return pd.read_parquet(cache_path)

    dfs = []
    # BUG FIX: the original looped range(1, 365) and silently dropped day
    # 365 (and day 366 — 2008 is a leap year). Missing folders are skipped
    # inside the helper with a message, so iterating to 366 is always safe.
    # The helper also re-checks folder existence, so the duplicated
    # folder-name construction that used to live here is gone.
    for day in range(1, 367):
        day_frames = _process_per_folder(base_folder_path, day, target_h)
        if day_frames is not None:
            dfs.extend(day_frames)

    if not dfs:
        # pd.concat([]) would raise an opaque ValueError; fail clearly.
        raise FileNotFoundError(
            f"no COSMIC data found under {base_folder_path}")

    final_df = pd.concat(dfs, axis=0, ignore_index=True)
    # Distance of every sample from the target latitude.
    final_df['Latitude_diff'] = np.abs(final_df['Latitude'] - target_latitude)

    def find_closest_row(group):
        # Row whose latitude is closest to the target within one timestamp.
        return group.loc[group['Latitude_diff'].idxmin()]

    logging.info(f"final_df: {final_df.head()}")
    # One representative profile per timestamp string.
    closest_per_day = final_df.groupby('Datetime_str', as_index=False).apply(
        find_closest_row, include_groups=False)
    closest_per_day = closest_per_day.drop(columns=['Latitude_diff'])
    # Discard timestamps whose best match is still > 2 degrees away.
    closest_per_day = closest_per_day[
        (closest_per_day['Latitude'] >= (target_latitude - 2))
        & (closest_per_day['Latitude'] <= (target_latitude + 2))]
    # Longitude in radians for downstream spectral analysis.
    closest_per_day['Longitude_rad'] = np.radians(closest_per_day['Longitude'])
    print(closest_per_day)

    def convert_to_days(datetime_str):
        # "DDDHHMM" -> fractional day-of-year; None (logged) on bad input.
        try:
            dd = int(datetime_str[:3])   # day-of-year (DDD)
            hh = int(datetime_str[3:5])  # hour (HH)
            mm = int(datetime_str[5:])   # minute (MM)
            return (dd * 1440 + hh * 60 + mm) / 1440
        except Exception:
            logging.error(f"转换失败: {datetime_str}")
            return None

    try:
        closest_per_day['Time'] = closest_per_day['Datetime_str'].apply(
            convert_to_days)
    except Exception as e:
        print(e)

    # Perturbation about the mean background temperature.
    background_temperature = closest_per_day['Temperature'].mean()
    closest_per_day['Temperature_perturbation'] = (
        closest_per_day['Temperature'] - background_temperature)

    # .copy() avoids pandas' SettingWithCopy hazard when renaming below
    # (the original assigned .columns on a slice view).
    selected_columns = closest_per_day[
        ['Longitude_rad', 'Time', 'Temperature']].copy()
    selected_columns.columns = ['Longitude_Radians', 'Time', 'Temperature']
    selected_columns.to_parquet(cache_path)
    return selected_columns
# output_file_path = 'cosmic.txt' # 你可以修改这个路径来指定文件的保存位置
# selected_columns.to_csv(output_file_path, sep='\t', index=False)
# # 输出成功提示
# print("输出成功,文件已保存为 'cosmic.txt'")