I am trying to simulate the creation of multiple h5 files in parallel while reading out the results live via a dynamically updated HDF5 virtual dataset (VDS) in SWMR mode. Each h5 file processes a different chunk of the same dataset, and the data is written to all of them in parallel. The event I watch for is a file-size increase, which tells me when to update the VDS. I tried to implement something similar to what is shown here, but the size of my virtual dataset does not appear to increase and the plot doesn't update.
The major issue seems to be that the virtual sources are not updating the virtual dataset. I can see the watchdog triggering update_source when the individual part files change size. My next guess is that all files must remain open for the entire write, and that the VDS manager must create the part files and keep them open in order for the virtual sources to update.
*Note: in the end I will track the data through events so that I can properly reconstruct it. For now I just want to see the VDS update properly from the changing part files.
Here is one version of the code I have tried:
import numpy as np
import h5py
import multiprocessing as mp
import time
import matplotlib.pyplot as plt
import math
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class VDSManager:
    def __init__(self, num_files, vds_filename="vds.h5", directory="."):
        self.num_files = num_files
        self.vds_filename = vds_filename
        self.directory = directory
        self.UNLIM = h5py.h5s.UNLIMITED
        self.vds_file = h5py.File(vds_filename, "w", libver="latest")
        self.vds_file.swmr_mode = True
        self.create_layout()
        self.create_vds()
        self.observer = Observer()
        self.handler = self.VDSHandler(self)
        self.observer.schedule(self.handler, self.directory, recursive=False)
        self.observer.start()
        print("VDS Manager started and VDS created.")

    def create_layout(self):
        fname = "output_1.h5"
        if os.path.exists(fname):
            with h5py.File(fname, "r", swmr=True) as f:
                size = f["data"].shape[0]
        else:
            size = 0
        self.virtual_layout = h5py.VirtualLayout((self.num_files, size),
                                                 maxshape=(self.num_files, None),
                                                 dtype='float64')

    def create_vds(self):
        for i in range(self.num_files):
            fname = f"output_{i}.h5"
            if os.path.exists(fname):
                with h5py.File(fname, "r", swmr=True) as f:
                    size = f["data"].shape[0]
            else:
                size = 0
            print(size)
            vsource = h5py.VirtualSource(fname, "data", shape=(size,), maxshape=(None,))
            self.virtual_layout[i, :self.UNLIM] = vsource[:self.UNLIM]
        self.vds_file.create_virtual_dataset("vdata", self.virtual_layout, fillvalue=np.nan)

    def update_source(self, file_index):
        vs = self.vds_file['vdata'].virtual_sources()
        vs[file_index].refresh()
        vs.flush()
        print(self.vds_file['vdata'][:].shape)

    def create_source(self, file_index):
        print("File created but I don't think I actually need to do anything!")

    class VDSHandler(FileSystemEventHandler):
        def __init__(self, manager):
            super().__init__()
            self.manager = manager

        def on_modified(self, event):
            if event.is_directory:
                return
            filename = os.path.basename(event.src_path)
            if filename.startswith("output_") and filename.endswith(".h5"):
                try:
                    idx = int(filename.split("_")[1].split(".")[0])
                    print(f"Detected modification in {filename}")
                    self.manager.update_source(idx)
                except Exception as e:
                    print("Error processing modified event:", e)

        def on_created(self, event):
            if event.is_directory:
                return
            filename = os.path.basename(event.src_path)
            if filename.startswith("output_") and filename.endswith(".h5"):
                try:
                    idx = int(filename.split("_")[1].split(".")[0])
                    print(f"Detected creation of {filename}")
                    self.manager.create_source(idx)
                except Exception as e:
                    print("Error processing created event:", e)

    def close(self):
        self.observer.stop()
        self.observer.join()
        self.vds_file.close()
        print("VDS Manager closed.")


def worker(file_index, task_queue):
    """
    Each worker writes its processed data to its own file.
    """
    filename = f"output_{file_index}.h5"
    print(f"Worker {file_index}: Opening {filename} for writing.")
    chunk = 0
    with h5py.File(filename, "w", libver="latest") as f:
        # create a dataset with unlimited growth axis
        dset = f.create_dataset("data", (0,), maxshape=(None,), dtype="float64")
        f.swmr_mode = True
        while True:
            print(f"Working on chunk {chunk} for file {file_index}")
            task = task_queue.get()
            if task is None:
                print(f"Worker {file_index}: Terminating.")
                break
            processed_chunk = np.sin(task)
            n = processed_chunk.shape[0]
            old_size = dset.shape[0]
            new_size = old_size + n
            dset.resize((new_size,))
            dset[old_size:new_size] = processed_chunk
            print(f"dataset size for dataset {file_index}: {dset.shape}")
            try:
                f.flush()  # Flush updates so SWMR readers see them.
            except Exception as e:
                print(f"Worker {file_index}: flush failed: {e}")
            file_size = os.path.getsize(filename)
            print(f"Worker {file_index}: File size is {file_size} bytes.")
            print(f"Worker {file_index}: Wrote chunk of {n} elements.")
            time.sleep(0.1)
            chunk += 1


def live_plot_vds(vds, poll_interval=0.1):
    plt.ion()
    fig, ax = plt.subplots()
    line, = ax.plot([], [], "b-", lw=2)
    ax.set_xlabel("Index")
    ax.set_ylabel("sin(value)")
    ax.set_title("Live VDS Data")
    while True:
        time.sleep(poll_interval)
        try:
            ds = vds.vds_file['vdata']
            ds.id.refresh()
            data = ds[:]
            print(data.shape)
        except Exception as e:
            print("Error reading VDS:", e)
            data = np.array([])
        if data.size > 0:
            flat_data = data.flatten()
            x = np.arange(flat_data.size)
            line.set_data(x, flat_data)
            ax.set_xlim(0, flat_data.size)
            ax.set_ylim(-1.1, 1.1)
            fig.canvas.draw()
            fig.canvas.flush_events()
        if not plt.fignum_exists(fig.number):
            print("Plot window closed. Exiting live plot.")
            break
    plt.ioff()
    plt.show()


def main():
    N = 100_000        # Total number of data points.
    chunk_size = 1000  # Size of each chunk.
    NUM_FILES = 4      # Number of worker processes / output files.
    data = np.linspace(0, 10 * np.pi, N)
    # Delete existing VDS and output files before starting.
    files_to_delete = ["vds.h5"] + [f"output_{i}.h5" for i in range(NUM_FILES)]
    for fname in files_to_delete:
        if os.path.exists(fname):
            os.remove(fname)
            print(f"Deleted existing file: {fname}")
    # Launch worker processes.
    queues = [mp.Queue() for _ in range(NUM_FILES)]
    processes = []
    for i in range(NUM_FILES):
        p = mp.Process(target=worker, args=(i, queues[i]))
        p.start()
        processes.append(p)
    # Dispatch data chunks in round-robin order.
    num_chunks = math.ceil(N / chunk_size)
    for i in range(num_chunks):
        chunk = data[i * chunk_size: (i + 1) * chunk_size]
        file_idx = i % NUM_FILES
        queues[file_idx].put(chunk)
        print(f"Main: Sent chunk {i} (size {chunk.shape[0]}) to worker {file_idx}.")
    # Create the VDSManager (which opens the VDS file and starts watchdog).
    vds_manager = VDSManager(NUM_FILES, vds_filename="vds.h5", directory=".")
    # Start live plotting.
    live_plot_vds(vds=vds_manager, poll_interval=0.1)
    # Signal termination to each worker.
    for q in queues:
        q.put(None)
    # Wait for workers to finish.
    for p in processes:
        p.join()
    vds_manager.close()
    print("All workers finished.")


if __name__ == "__main__":
    main()
1 Answer

I found an old post that refreshed my memory. When you use h5py.h5s.UNLIMITED to map the VirtualSource values to the virtual dataset file, the values in the VDS file auto-magically update when you add values to the source file. See this post for an example: Creating HDF5 virtual dataset for dynamic data using h5py
In the solution above, the source data and the virtual dataset are all in the same H5 file. However, that is not required. In fact, the primary VDS use case is to assemble data from separate files into a single file. I have modified that solution to show how to do it. See the code below.
import h5py
import numpy as np

num_files = 3
num_rows = 10
UNLIMITED = h5py.h5s.UNLIMITED

# create source data files
arr = np.ones((num_rows,))
for f_cnt in range(num_files):
    with h5py.File(f'source_{f_cnt}.h5', 'w') as h5sf:
        ds = h5sf.create_dataset('test_data', data=f_cnt*arr, maxshape=(None,))

with h5py.File('vds_file.h5', 'w') as h5vdsf:
    # Create virtual layout
    vds_layout = h5py.VirtualLayout(shape=(num_files, num_rows),
                                    maxshape=(num_files, None), dtype='float')
    for f_cnt in range(num_files):
        # Define virtual sources and map to layout
        with h5py.File(f'source_{f_cnt}.h5', 'r') as h5sf:
            ds = h5sf['test_data']
            v_source = h5py.VirtualSource(ds)
            vds_layout[f_cnt, :UNLIMITED] = v_source[:UNLIMITED]
    # Add virtual layout to virtual dataset
    h5vdsf.create_virtual_dataset('v_data', vds_layout)
    for f_cnt in range(num_files):
        print(f'for {f_cnt}:')
        print(h5vdsf['v_data'][f_cnt])

# add more data to source data files
arr = np.ones((num_rows,))
for f_cnt in range(num_files):
    with h5py.File(f'source_{f_cnt}.h5', 'a') as h5sf:
        h5sf['test_data'].resize((2*num_rows,))
        h5sf['test_data'][num_rows:2*num_rows] = 10. + f_cnt*arr

# re-open the VDS file and read again -- the new values appear automatically
with h5py.File('vds_file.h5', 'r') as h5vdsf:
    for f_cnt in range(num_files):
        print(f'for {f_cnt}:')
        print(h5vdsf['v_data'][f_cnt])
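Since the goal in the question is to watch the data grow live, the same example can simply be polled while the writers keep appending. Here is a minimal sketch, assuming the vds_file.h5 / v_data names from the code above; it re-opens the VDS file on each poll so every read maps the current extent of the source datasets, which is the same behavior the re-open at the end of the example demonstrates.

import time
import h5py

# Minimal polling sketch (file and dataset names assumed from the example
# above). Re-opening on each poll means every read reflects the current
# extent of the source datasets; regions that are mapped but not yet
# written read back as the layout's fillvalue.
for _ in range(10):  # poll a fixed number of times instead of forever
    with h5py.File('vds_file.h5', 'r') as h5vdsf:
        data = h5vdsf['v_data'][:]
        print('current VDS shape:', data.shape)
    time.sleep(0.5)

Applied to the script in the question, this suggests the watchdog/update_source machinery is unnecessary: map each source once with :UNLIMITED on both sides, and the plotting loop only needs to re-read the virtual dataset to see the growing sources.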