import glob
import logging
import os
import re
import time

import pandas as pd

from CONSTANT import DATA_BASEPATH
from modules.balloon.extract_wave import extract_wave, is_terrain_wave
from modules.balloon.read_data import read_data

# Column order of every row built by get_ballon_full_df_by_year: the source
# file path, the is_terrain_wave() result, then the values returned by
# extract_wave() in the order it returns them.
filter_columns = [
    "file_name",
    "c",
    "a",
    "b",
    "omega_upper",
    "w_f",
    "ver_wave_len",
    "hori_wave_len",
    "c_x",
    "c_y",
    "c_z",
    "Ek",
    "E_p",
    "MFu",
    "MFv",
    "u1",
    "v1",
    "T1",
    "zhou_qi",
]

# Passed as the 2nd and 3rd arguments to extract_wave() / is_terrain_wave().
# NOTE(review): presumably station latitude (degrees) and gravitational
# acceleration (m/s^2) -- confirm against those helpers.
lat = 52.21
g = 9.76

# Lookup tables; not used by any function in this file.
# NOTE(review): comboMode[i] and comboDate[i] look like they are parallel to
# comboType[i] -- confirm against the consumer of these tables.
combos = {}
comboType = [
    "探空气球",
    "流星雷达",
    "Saber",
    "TIDI",
    "COSMIC",
]
comboMode = [
    ["重力波单次", "重力波统计"],
    ["重力波月统计", "潮汐波单次", "潮汐波月统计"],
    ["行星波月统计", "重力波单次", "重力波月统计"],
    ["行星波月统计"],
    ["行星波月统计"],
]
comboDate = [
    [["年", "时间"], ["起始年", "终止年"]],
    [["年", "月"], ["年", "日期"], ["年", "月"]],
    [["起始月", "-"], ["月", "日"], ["月", "-"]],
    [["起始月", "-"]],
    [["起始月", "-"]],
]


def get_dataframe_between_year(all_year_data, start_year, end_year, station):
    """Return the rows whose file name carries a year in the given range.

    The year is the four digits following ``"{station}-"`` in the
    ``file_name`` column (first occurrence).

    Args:
        all_year_data: DataFrame with a string ``file_name`` column.
        start_year: First year to keep (inclusive).
        end_year: Last year to keep (inclusive).
        station: Station identifier that prefixes the year in file names.

    Returns:
        The subset of ``all_year_data`` whose extracted year lies in
        ``[start_year, end_year]``. Rows whose file name does not contain
        ``"{station}-YYYY"`` are dropped.
    """
    # Escape the station so regex metacharacters in it are matched literally,
    # and extract once instead of once per comparison.
    year_pattern = rf"{re.escape(str(station))}-(\d{{4}})"
    extracted = all_year_data["file_name"].str.extract(year_pattern)[0]
    # errors="coerce" turns non-matching rows into NaN, which between()
    # treats as out of range rather than raising like astype(int) would.
    years = pd.to_numeric(extracted, errors="coerce")
    return all_year_data[years.between(start_year, end_year)]


def get_ballon_files():
    """Return the paths of all ``.nc`` files under ``DATA_BASEPATH.balloon``.

    The search is recursive. ``glob.glob`` yields an empty list when the base
    directory does not exist, so no exception handling is required.
    """
    return glob.glob(f"{DATA_BASEPATH.balloon}/**/*.nc", recursive=True)


def get_ballon_path_by_year(start_year, end_year, station=None):
    """Return balloon file paths for ``station`` within a year range.

    A path is kept when it contains ``"{station}-{year}"`` for any year in
    ``[start_year, end_year]`` (inclusive, plain substring match).
    """
    wanted_tags = [
        f"{station}-{year}" for year in range(start_year, end_year + 1)
    ]
    return [
        path
        for path in get_ballon_files()
        if any(tag in path for tag in wanted_tags)
    ]


def _read_covering_cache(cache_base_dir, station, start_year, end_year):
    """Return rows from a cached wider year range covering the request, or None.

    Looks in ``cache_base_dir`` for files named
    ``{station}-bYYYY-eYYYY.parquet`` whose year span contains
    ``[start_year, end_year]`` and, if one is found, returns its rows
    restricted to the requested years.
    """
    if not os.path.exists(cache_base_dir):
        return None

    # Capture the years with groups instead of splitting on "-", so station
    # names that themselves contain "-" are parsed correctly.
    cache_name = re.compile(
        rf"{re.escape(str(station))}-b(\d{{4}})-e(\d{{4}})\.parquet"
    )
    for file in os.listdir(cache_base_dir):
        matched = cache_name.fullmatch(file)
        if matched is None:
            continue
        b_year = int(matched.group(1))
        e_year = int(matched.group(2))
        logging.debug(
            f"Found cache file {file}, b_year={b_year}, e_year={e_year}")
        if b_year <= start_year and e_year >= end_year:
            wider_df = pd.read_parquet(os.path.join(cache_base_dir, file))
            return get_dataframe_between_year(
                wider_df, start_year, end_year, station)
    return None


def _build_wave_row(file):
    """Return ``[file, terrain_flag, *wave]`` for one file, or None to skip it.

    A file is skipped when ``extract_wave`` fails or returns nothing, or when
    ``is_terrain_wave`` raises; failures are logged and never propagated.
    """
    file_start_time = time.time()
    data = read_data(file)
    read_time = time.time()
    logging.debug(
        f"Read data in {read_time - file_start_time:.2f} seconds")

    try:
        wave = extract_wave(data, lat, g)
    except Exception as e:
        # Best effort: one unreadable sounding must not abort the whole run.
        logging.error(f"Error extracting wave from {file}: {e}")
        wave = []
        extract_time = time.time()
    else:
        extract_time = time.time()
        logging.debug(
            f"Extracted wave in {extract_time - read_time:.2f} seconds")

    if len(wave) == 0:
        logging.debug(f"No wave data in {file}, skipping")
        return None

    try:
        c = is_terrain_wave(data, lat, g)
    except Exception as e:
        logging.error(f"Error determining terrain wave from {file}: {e}")
        return None
    terrain_time = time.time()
    logging.debug(
        f"Determined terrain wave in {terrain_time - extract_time:.2f} seconds")

    # Same layout as filter_columns: file name, terrain flag, wave values.
    return [file, c, *wave]


def get_ballon_full_df_by_year(start_year, end_year, station=None, ignore_cache=False):
    """Build (or load from cache) the wave table for a station and year range.

    Lookup order:
      1. the exact cache file ``cache/{station}-b{start}-e{end}.parquet``
         (skipped when ``ignore_cache`` is true);
      2. any cache file for the same station whose year span covers the
         request (consulted regardless of ``ignore_cache``);
      3. processing every matching ``.nc`` file, after which the result is
         written to the exact cache file.

    Args:
        start_year: First year to include (inclusive).
        end_year: Last year to include (inclusive).
        station: Station identifier; required despite the ``None`` default.
        ignore_cache: When true, do not read the exact-match cache file.

    Returns:
        DataFrame with the columns listed in ``filter_columns``, one row per
        file for which a wave was extracted. May be empty.

    Raises:
        ValueError: If ``station`` is None, or no balloon file matches the
            station and year range.
    """
    # Set up logging (a no-op when the root logger is already configured).
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s: %(message)s')

    if station is None:
        raise ValueError("Station is required")

    cache_file = f"{DATA_BASEPATH.balloon}/cache/{station}-b{start_year}-e{end_year}.parquet"
    cache_base_dir = f"{DATA_BASEPATH.balloon}/cache/"

    if os.path.exists(cache_file) and not ignore_cache:
        logging.debug(f"Reading cache from {cache_file}")
        return pd.read_parquet(cache_file)

    # Reuse a cache written for a wider range of years of the same station.
    covered = _read_covering_cache(
        cache_base_dir, station, start_year, end_year)
    if covered is not None:
        return covered

    start_time = time.time()
    logging.debug(
        f"Starting get_ballon_full_df_by_year with start_year={start_year}, end_year={end_year}")

    # Timing the path retrieval
    t0 = time.time()
    paths = get_ballon_path_by_year(start_year, end_year, station)
    if len(paths) == 0:
        raise ValueError(
            f"No balloon data found for {station} between {start_year} and {end_year}")
    t1 = time.time()
    logging.debug(f"Retrieved {len(paths)} paths in {t1 - t0:.2f} seconds")

    # TODO: add a per-file cache so only new files need to be reprocessed.

    # Collect plain rows and build the frame once; concatenating inside the
    # loop copies the whole frame on every iteration.
    rows = []
    for idx, file in enumerate(paths, 1):
        file_start_time = time.time()
        logging.debug(f"Processing file {idx}/{len(paths)}: {file}")
        row = _build_wave_row(file)
        if row is None:
            continue
        rows.append(row)
        logging.debug(
            f"Total time for {file}: {time.time() - file_start_time:.2f} seconds")

    # Passing columns explicitly keeps the schema even when no file produced
    # a wave, so the column access below cannot raise KeyError.
    year_df = pd.DataFrame(rows, columns=filter_columns)

    total_time = time.time() - start_time
    logging.debug(
        f"Completed get_ballon_full_df_by_year in {total_time:.2f} seconds")

    # A single-space string marks a missing horizontal wavelength.
    year_df['hori_wave_len'] = year_df['hori_wave_len'].apply(
        lambda x: float(x) if x != ' ' else None)

    # Save to cache, creating the cache folder if needed.
    os.makedirs(os.path.dirname(cache_file), exist_ok=True)
    year_df.to_parquet(cache_file)

    return year_df


def get_has_wave_data_by_year(start_year, end_year, station=None):
    """Return the rows of the wave table whose ``b`` column equals 1.

    Args:
        start_year: First year to include (inclusive).
        end_year: Last year to include (inclusive).
        station: Station identifier, forwarded to
            ``get_ballon_full_df_by_year``; required despite the default.

    Raises:
        ValueError: If ``station`` is None or no balloon data is found.
    """
    df = get_ballon_full_df_by_year(start_year, end_year, station)
    return df[df["b"] == 1]