
python - Dynamically update h5py VDS from data written to in parallel -- multiple writer multiple reader case - Stack Overflow


I am trying to simulate the creation of multiple h5 files in parallel while reading out the results live via a dynamically updated HDF5 VDS in SWMR mode. Each h5 file processes different chunks of the same dataset, and the data is written to each file in parallel. The event I want to watch for is the file size increasing, which tells me to update the VDS. I tried to implement something similar to what is shown here, but it doesn't appear that the size of my virtual dataset is increasing, and the plot doesn't update.

The major issue seems to be that the virtual sources are not updating the virtual dataset. I can see the watchdog triggering update_source when the individual part files change size. My next guess is that all files must remain open for the entire data-writing phase, and that the VDS manager must create the part files and keep them open in order for the virtual sources to update (a rough sketch of that idea is below).
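A rough, untested sketch of the pre-creation part of that guess (the precreate_part_files helper is just illustrative, not something in my code yet):

import h5py

def precreate_part_files(num_files):
    # Pre-create each part file with an empty, resizable "data" dataset so
    # every virtual source already exists when the VDS layout is built.
    for i in range(num_files):
        with h5py.File(f"output_{i}.h5", "w", libver="latest") as f:
            f.create_dataset("data", shape=(0,), maxshape=(None,), dtype="float64")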

*Note: in the end I will track the data through events so that I can properly reconstruct it. For now I just want to see the VDS update properly from the changing part files.

Here is one version of the code I have tried:

import numpy as np
import h5py
import multiprocessing as mp
import time
import matplotlib.pyplot as plt
import math
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class VDSManager:
    def __init__(self, num_files, vds_filename="vds.h5", directory="."):
        self.num_files = num_files
        self.vds_filename = vds_filename
        self.directory = directory
        self.UNLIM = h5py.h5s.UNLIMITED
        self.vds_file = h5py.File(vds_filename, "w", libver="latest")
        self.vds_file.swmr_mode = True
        self.create_layout()
        self.create_vds()
        self.observer = Observer()
        self.handler = self.VDSHandler(self)
        self.observer.schedule(self.handler, self.directory, recursive=False)
        self.observer.start()
        print("VDS Manager started and VDS created.")

    def create_layout(self):
        fname = "output_1.h5"
        if os.path.exists(fname):
            with h5py.File(fname, "r", swmr=True) as f:
                size = f["data"].shape[0]
        else:
            size = 0
        self.virtual_layout = h5py.VirtualLayout((self.num_files, size),
                                                 maxshape=(self.num_files, None),
                                                 dtype='float64')

    def create_vds(self):
        for i in range(self.num_files):
            fname = f"output_{i}.h5"
            if os.path.exists(fname):
                with h5py.File(fname, "r", swmr=True) as f:
                    size = f["data"].shape[0]
            else:
                size = 0
            print(size)
            vsource = h5py.VirtualSource(fname, "data", shape=(size,), maxshape=(None,))
            self.virtual_layout[i, :self.UNLIM] = vsource[:self.UNLIM]
        self.vds_file.create_virtual_dataset("vdata", self.virtual_layout, fillvalue=np.nan)

    def update_source(self, file_index):
        vs = self.vds_file['vdata'].virtual_sources()
        vs[file_index].refresh()
        vs.flush()
        print(self.vds_file['vdata'][:].shape)

    def create_source(self, file_index):
        print("File created but I don't think I actually need to do anything!")

    class VDSHandler(FileSystemEventHandler):
        def __init__(self, manager):
            super().__init__()
            self.manager = manager

        def on_modified(self, event):
            if event.is_directory:
                return
            filename = os.path.basename(event.src_path)
            if filename.startswith("output_") and filename.endswith(".h5"):
                try:
                    idx = int(filename.split("_")[1].split(".")[0])
                    print(f"Detected modification in {filename}")
                    self.manager.update_source(idx)
                except Exception as e:
                    print("Error processing modified event:", e)

        def on_created(self, event):
            if event.is_directory:
                return
            filename = os.path.basename(event.src_path)
            if filename.startswith("output_") and filename.endswith(".h5"):
                try:
                    idx = int(filename.split("_")[1].split(".")[0])
                    print(f"Detected creation of {filename}")
                    self.manager.create_source(idx)
                except Exception as e:
                    print("Error processing created event:", e)

    def close(self):
        self.observer.stop()
        self.observer.join()
        self.vds_file.close()
        print("VDS Manager closed.")


def worker(file_index, task_queue):
    """
    Each worker writes its processed data to its own file.
    """
    filename = f"output_{file_index}.h5"
    print(f"Worker {file_index}: Opening {filename} for writing.")
    chunk = 0
    with h5py.File(filename, "w", libver="latest") as f:
        # create a dataset with unlimited growth axis
        dset = f.create_dataset("data", (0,), maxshape=(None,), dtype="float64")
        f.swmr_mode = True
        while True:
            print(f"Working on chunk {chunk} for file {file_index}")
            task = task_queue.get()
            if task is None:
                print(f"Worker {file_index}: Terminating.")
                break
            processed_chunk = np.sin(task)
            n = processed_chunk.shape[0]
            old_size = dset.shape[0]
            new_size = old_size + n
            dset.resize((new_size,))
            dset[old_size:new_size] = processed_chunk
            print(f"dataset size for dataset {file_index}: {dset.shape}")
            try:
                f.flush() # Flush updates so SWMR readers see them.
            except Exception as e:
                print(f"Worker {file_index}: flush failed: {e}")
            file_size = os.path.getsize(filename)
            print(f"Worker {file_index}: File size is {file_size} bytes.")
            print(f"Worker {file_index}: Wrote chunk of {n} elements.")
            time.sleep(0.1)
            chunk += 1


def live_plot_vds(vds, poll_interval=0.1):
    plt.ion()
    fig, ax = plt.subplots()
    line, = ax.plot([], [], "b-", lw=2)
    ax.set_xlabel("Index")
    ax.set_ylabel("sin(value)")
    ax.set_title("Live VDS Data")
    while True:
        time.sleep(poll_interval)
        try:
            ds = vds.vds_file['vdata']
            ds.id.refresh()
            data = ds[:]
            print(data.shape)
        except Exception as e:
            print("Error reading VDS:", e)
            data = np.array([])
        if data.size > 0:
            flat_data = data.flatten()
            x = np.arange(flat_data.size)
            line.set_data(x, flat_data)
            ax.set_xlim(0, flat_data.size)
            ax.set_ylim(-1.1, 1.1)
            fig.canvas.draw()
            fig.canvas.flush_events()
        if not plt.fignum_exists(fig.number):
            print("Plot window closed. Exiting live plot.")
            break
    plt.ioff()
    plt.show()


def main():
    N = 100_000         # Total number of data points.
    chunk_size = 1000   # Size of each chunk.
    NUM_FILES = 4       # Number of worker processes / output files.
    data = np.linspace(0, 10 * np.pi, N)

    # Delete existing VDS and output files before starting.
    files_to_delete = ["vds.h5"] + [f"output_{i}.h5" for i in range(NUM_FILES)]
    for fname in files_to_delete:
        if os.path.exists(fname):
            os.remove(fname)
            print(f"Deleted existing file: {fname}")

    # Launch worker processes.
    queues = [mp.Queue() for _ in range(NUM_FILES)]
    processes = []
    for i in range(NUM_FILES):
        p = mp.Process(target=worker, args=(i, queues[i]))
        p.start()
        processes.append(p)

    # Dispatch data chunks in round-robin order.
    num_chunks = math.ceil(N / chunk_size)
    for i in range(num_chunks):
        chunk = data[i * chunk_size: (i + 1) * chunk_size]
        file_idx = i % NUM_FILES
        queues[file_idx].put(chunk)
        print(f"Main: Sent chunk {i} (size {chunk.shape[0]}) to worker {file_idx}.")

    # Create the VDSManager (which opens the VDS file and starts watchdog).
    vds_manager = VDSManager(NUM_FILES, vds_filename="vds.h5", directory=".")

    # Start live plotting.
    live_plot_vds(vds=vds_manager, poll_interval=0.1)

    # Signal termination to each worker.
    for q in queues:
        q.put(None)

    # Wait for workers to finish.
    for p in processes:
        p.join()

    vds_manager.close()
    print("All workers finished.")


if __name__ == "__main__":
    main()

Asked Mar 11 at 18:58 by Amanda.py
  • I have only created VDS files a few times. I'm curious -- since the VDS file is a "view" of the source files (via the VirtualLayout and VirtualSource definitions), does it grow when the size of the source files increases? – kcw78 Commented Mar 12 at 14:16

1 Answer


I found an old post that refreshed my memory. When you use h5py.h5s.UNLIMITED to map the VirtualSource values into the virtual dataset, the values in the VDS file automatically update when you add values to the source file. See this post for an example:

Creating HDF5 virtual dataset for dynamic data using h5py

In the linked solution, the source data and the virtual dataset are all in the same H5 file. However, that is not required; in fact, the primary VDS use case is to assemble data from separate files into a single file. I have modified that solution to show how to do it. See the code below.

import numpy as np
import h5py

num_files = 3
num_rows = 10
UNLIMITED = h5py.h5s.UNLIMITED

# create source data files
arr = np.ones((num_rows,))
for f_cnt in range(num_files):
    with h5py.File(f'source_{f_cnt}.h5', 'w') as h5sf:
        ds = h5sf.create_dataset('test_data', data=f_cnt*arr, maxshape=(None,))

with h5py.File('vds_file.h5', 'w') as h5vdsf:
    # Create virtual layout
    vds_layout = h5py.VirtualLayout(shape=(num_files,num_rows),
                                    maxshape=(num_files,None), dtype='float')
    for f_cnt in range(num_files):
        # Define virtual sources and map to layout
        with h5py.File(f'source_{f_cnt}.h5','r') as h5sf:
            ds = h5sf['test_data']
            v_source = h5py.VirtualSource(ds)
            vds_layout[f_cnt,:UNLIMITED] = v_source[:UNLIMITED]
    # Add virtual layout to virtual dataset
    h5vdsf.create_virtual_dataset('v_data', vds_layout)
    for f_cnt in range(num_files):
        print(f'for {f_cnt}:')
        print(h5vdsf['v_data'][f_cnt])

# add more data to source data files
arr = np.ones((num_rows,))
for f_cnt in range(num_files):
    with h5py.File(f'source_{f_cnt}.h5', 'a') as h5sf:
        h5sf['test_data'].resize((2*num_rows,))
        h5sf['test_data'][num_rows:2*num_rows] = 10. + f_cnt*arr

with h5py.File('vds_file.h5','r') as h5vdsf:
    for f_cnt in range(num_files):
        print(f'for {f_cnt}:')
        print(h5vdsf['v_data'][f_cnt])
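To tie this back to the question's live-reader setup: because the mappings use the UNLIMITED selection, the grown source data is visible whenever the VDS file is (re)opened, as the last block above shows. Below is a minimal polling sketch along those lines; it assumes the file and dataset names from the code above, the poll_vds helper itself is hypothetical, and I have not verified how this interacts with refreshing a single long-lived SWMR handle:

import time
import h5py

def poll_vds(vds_filename='vds_file.h5', interval=0.5, polls=10):
    # Reopen the VDS file on each poll so the reported extent of the
    # virtual dataset reflects the current size of the source datasets.
    for _ in range(polls):
        with h5py.File(vds_filename, 'r') as h5vdsf:
            print('current VDS shape:', h5vdsf['v_data'].shape)
        time.sleep(interval)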
