I am trying to load a NetCDF file and split it into several Parquet files for further processing, where each Parquet file holds the data for one unique (lat, lon) pair over the full time period.
I started by reading the data; the unique (lat, lon) pairs are already precomputed:
import os, gc, itertools, psutil
import numpy as np, pandas as pd, xarray as xr
import pyarrow as pa, pyarrow.parquet as pq
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm

# Load unique lat/lon pairs
unique_data = np.load("/data/swadhin/processed_imager/filtered/meteosat/interp/unique_lat_lon.npz")
unique_lat = unique_data["latitude"]
unique_lon = unique_data["longitude"]
# Round lat/lon for consistency with the Parquet file names
lat_lon_pairs = [(round(lat, 6), round(lon, 6)) for lat, lon in zip(unique_lat, unique_lon)]
The NetCDF and Parquet directories:
# Paths
nc_dir = "/data/swadhin/processed_imager/filtered/meteosat/interp/atm_variables/temporal/t_test"
parquet_dir = "/data/swadhin/processed_imager/filtered/meteosat/interp/atm_variables/temporal/parquet"
# Load all existing Parquet filenames into a set once
existing_files = set(os.listdir(parquet_dir))
Then I read the .nc file and process it in parallel, with each worker task handling one (lat, lon) pair:
def get_optimal_workers():
    """Determine an optimal number of workers based on CPU and memory availability."""
    cpu_count = psutil.cpu_count(logical=False)  # Physical cores
    available_mem = psutil.virtual_memory().available / (1024 ** 3)  # GB
    max_workers = min(cpu_count, int(available_mem / 10))  # Adjust per workload
    return max(30, max_workers)  # Ensure at least 30 workers
def save_to_parquet(df, file_path):
    """Handles appending to a Parquet file correctly."""
    table = pa.Table.from_pandas(df)
    if os.path.exists(file_path):
        # Read existing data and append
        existing_table = pq.read_table(file_path)
        combined_table = pa.concat_tables([existing_table, table])
        pq.write_table(combined_table, file_path, compression="snappy")
    else:
        pq.write_table(table, file_path, compression="snappy")
def process_lat_lon(lat, lon, ds):
    """Extracts data for a single (lat, lon) pair and appends to Parquet."""
    # Find the exact index in the 2D latitude/longitude grid
    indices = np.where((ds.latitude == lat) & (ds.longitude == lon))
    if len(indices[0]) == 0:
        return  # Skip if not found (shouldn't happen)
    lat_idx, lon_idx = indices[0][0], indices[1][0]

    # Extract time, levels, and temperature values
    time_vals = ds.time.values
    temp_vals = ds.t[:, :, lat_idx, lon_idx].values  # Shape: (time, levels)

    # Skip if all values are NaN
    if np.isnan(temp_vals).all():
        return

    # Prepare DataFrame
    df = pd.DataFrame({
        "latitude": lat,
        "longitude": lon,
        "time": time_vals,
        "variable": temp_vals.tolist()  # Store level values as a list
    })

    parquet_filename = f"{lat:.6f}_{lon:.6f}.parquet"
    if parquet_filename in existing_files:
        print(f"File {parquet_filename} exists. Skipping...")
    else:
        print(f"File {parquet_filename} does not exist. Creating new file...")
        save_to_parquet(df, os.path.join(parquet_dir, parquet_filename))
def process_lat_lon_wrapper(lat_lon, nc_file):
    """Each worker process opens the dataset separately."""
    file_path = os.path.join(nc_dir, nc_file)
    ds = xr.open_dataset(file_path).chunk({"time": -1})  # Load dataset, then rechunk
    lat, lon = lat_lon
    return process_lat_lon(lat, lon, ds)  # Pass dataset explicitly
def process_nc_file(nc_file):
    print(f"Processing {nc_file}...")

    # Parallel processing using ProcessPoolExecutor over the precomputed
    # (already rounded) lat_lon_pairs
    with ProcessPoolExecutor(max_workers=max(40, get_optimal_workers())) as executor:
        list(tqdm(executor.map(process_lat_lon_wrapper, lat_lon_pairs, itertools.repeat(nc_file)),
                  total=len(lat_lon_pairs),
                  desc=f"Processing {nc_file}"))

    print(f"✅ {nc_file} processed.")
    gc.collect()  # Datasets are opened and closed inside the workers

# Process each NetCDF file
for nc_file in os.listdir(nc_dir):
    if nc_file.endswith(".nc"):
        process_nc_file(nc_file)
But this code runs very slowly, taking almost 7-8 hours for a single NetCDF file. Is there any way to make it faster, or am I approaching it inefficiently?
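I was also wondering whether something like the following vectorized selection would be the right direction instead of scanning the grid once per pair. This is a rough, untested sketch: the dimension names "y" and "x", and the helper names (grid_index, flat_idx, points), are my own guesses rather than anything from the working code, and it assumes every precomputed pair matches the grid exactly:
import numpy as np
import xarray as xr

# Rough sketch (untested): open the dataset once and extract all points
# with a single vectorized isel() call, rather than one grid scan per pair.
# Dimension names "y" and "x" are assumed, not taken from the real file.
ds = xr.open_dataset(file_path)

lat2d = ds.latitude.values
lon2d = ds.longitude.values

# Map each rounded (lat, lon) to its flat grid index in one pass
grid_index = {
    (round(float(a), 6), round(float(b), 6)): i
    for i, (a, b) in enumerate(zip(lat2d.ravel(), lon2d.ravel()))
}
flat_idx = np.array([grid_index[p] for p in lat_lon_pairs])
y_idx, x_idx = np.unravel_index(flat_idx, lat2d.shape)

# Vectorized pointwise selection: result has dims (time, level, points)
points = ds.t.isel(
    y=xr.DataArray(y_idx, dims="points"),
    x=xr.DataArray(x_idx, dims="points"),
)
From there I could presumably split points.values along the "points" axis and write one Parquet file per point, so the dataset is only opened once instead of once per (lat, lon) pair, but I am not sure whether this is the better approach.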