I already have code using a thread pool, tkinter and matplotlib to process signals that are being written to a file by another process. The synchronization between the two processes is done by reading and writing the same file, using the following function, which busy-waits until a newline is written to the file.
def readline_char_by_char(file, fig):
    """Block until one complete newline-terminated line can be read from *file*.

    The file is being appended to by another process, so a read may return
    nothing (EOF so far) or only part of a line.  While waiting for more data
    this keeps the Matplotlib/Tk GUI responsive by flushing its event queue.

    Parameters
    ----------
    file : an open text-mode file object being written to by another process.
    fig  : the Matplotlib figure whose canvas events must stay serviced.

    Returns
    -------
    str : the next line, including its trailing newline character.
    """
    import time  # local import so this snippet stays self-contained

    line = ''
    while True:
        # readline() hands back everything currently available up to and
        # including a newline -- far cheaper than the original read(1) loop,
        # which paid one Python-level call per byte.
        chunk = file.readline()
        if not chunk:
            # No new data yet: service GUI events, then yield the CPU briefly
            # instead of busy-spinning at 100%.
            fig.canvas.flush_events()
            time.sleep(0.01)
            continue
        line += chunk
        if line.endswith('\n'):
            return line
We then compute the values from a chunk of the signal. The following is the core loop of my code.
# NOTE(review): indentation was lost when this snippet was pasted, and the
# `try:` opened below has no matching `except`/`finally` in the quoted code --
# the original program evidently contains more than is shown here.
# Main acquisition loop: read one "x y" sample per line from the shared file,
# accumulate 1000 samples into one pulse, then run the analysis chain on it.
while True:
try:
# Blocks (while servicing GUI events) until a full line is available.
line = readline_char_by_char(file,fig)
# Each line carries two whitespace-separated floats: x then y.
# (line.strip().split() is computed twice here; splitting once would do.)
signalx.append(float(line.strip().split()[0]))
signaly.append(float(line.strip().split()[1]))
# A pulse is exactly 1000 samples.  NOTE(review): signalx/signaly are only
# cleared on the CFD-failure branch below, so on the success path this
# condition can fire only once -- presumably the buffers should also be
# reset after a pulse is processed; confirm against the full program.
if len(signaly) == 1000:
num_signal += 1
pulse_label.config(text=f"Number of Pulses: {num_signal}")
# The heavy per-pulse work below only runs for every 1000th pulse.
if num_signal % 1000 == 0:
print(f"Updating: {num_signal}")
#update_first_plot(axs[0][0])
# Refresh the raw-signal plot in axs[0][0].
first_plot_line_removal(axs[0][0],lines)
lines[0].set_data(signalx, signaly)
# FFT-based filtering of the pulse; the result is also saved to `outfile`.
filtered_signal_y = filter_fft(signaly)
save_filtered_signal(outfile, filtered_signal_y)
# Presumably constant-fraction discrimination on the inverted signal,
# yielding the pulse start index `istart` and a timing intercept
# `fintercept` -- TODO confirm against find_true_cfd's definition.
success, istart, fintercept, y_sub_bipolar, y_baseline_restored, y_shifted, y_attenuated = find_true_cfd(np.multiply(-1, signaly), d)
lines[1].set_data(signalx, np.multiply(-1,y_shifted))
lines[2].set_data(signalx, y_sub_bipolar)
first_plot_vertical_line_drawing(axs[0][0], istart, l1, d, l3)
# Discard this pulse and start accumulating a new one if CFD failed.
if istart < 0 or not success:
signalx = []
signaly = []
continue
# Charge-comparison method: presumably integrates a delayed window
# (istart+l2 .. istart+l3) against the whole pulse -- verify semantics.
i_delayed, i_total, y_filtered_baselineCorrected = charge_comparison_method(signalx, filtered_signal_y, istart, delayed_start=istart+l2, end=istart+l3)
I_Delayed.append(i_delayed)
I_Total.append(i_total)
# if len(I_Delayed) % 1000 == 0:
# # print("Inside I_Delayed vs I_Total scatter")
# sc_return_scatter(I_Delayed, I_Total, axs[0][1])
TOF.append(fintercept)
# Persist the per-pulse results to their respective output files.
true_cfd_out_file.write(f"{fintercept}\n")
integration_file.write(f"{i_delayed} {i_total}\n")
# Linear energy calibration; PSD is the delayed/total charge ratio.
# NOTE(review): psd divides by i_total with no guard against zero.
energy = slope * i_total + intercept
psd = i_delayed / i_total
energy_file.write(f"{energy}\n")
psd_file.write(f"{psd}\n")
energy_psd_file.write(f"{energy} {psd}\n")
PSD.append(psd)
Energy.append(energy)
# if success and istart > 0:
# pulse_x.append(signalx.copy())
# pulse_y.append(np.roll(np.multiply(-1, y_baseline_restored.copy()), -istart))
# filtered_y_signals.append(np.roll(y_filtered_baselineCorrected.copy(), -istart))
# Refresh the summary plots every 1000 accepted pulses.
if len(PSD) % 1000 == 0:
# print("Inside PSD")
#app.after(0, update_plots, I_Delayed, I_Total, Energy, PSD, TOF, axs, fig)
update_plots(I_Delayed, I_Total, Energy, PSD, TOF, axs, fig)
I already have code using a thread pool, tkinter and matplotlib to process signals that are being written to a file by another process. The synchronization between the two processes is done by reading and writing the same file, using the following function, which busy-waits until a newline is written to the file.
def readline_char_by_char(file, fig):
    """Block until one complete newline-terminated line can be read from *file*.

    Another process is appending to the file, so a read may return nothing
    (EOF so far) or only part of a line.  While waiting for more data this
    keeps the Matplotlib/Tk GUI responsive by flushing its event queue.

    Parameters
    ----------
    file : an open text-mode file object being written to by another process.
    fig  : the Matplotlib figure whose canvas events must stay serviced.

    Returns
    -------
    str : the next line, including its trailing newline character.
    """
    import time  # local import so this snippet stays self-contained

    line = ''
    while True:
        # readline() returns everything currently available up to and
        # including a newline, replacing the original one-call-per-byte
        # read(1) loop.
        chunk = file.readline()
        if not chunk:
            # No new data yet: service GUI events, then yield the CPU briefly
            # instead of busy-spinning at 100%.
            fig.canvas.flush_events()
            time.sleep(0.01)
            continue
        line += chunk
        if line.endswith('\n'):
            return line
We then compute the values from a chunk of the signal. The following is the core loop of my code.
# NOTE(review): indentation was lost when this snippet was pasted, and the
# `try:` opened below has no matching `except`/`finally` in the quoted code --
# the original program evidently contains more than is shown here.
# Main acquisition loop: read one "x y" sample per line from the shared file,
# accumulate 1000 samples into one pulse, then run the analysis chain on it.
while True:
try:
# Blocks (while servicing GUI events) until a full line is available.
line = readline_char_by_char(file,fig)
# Each line carries two whitespace-separated floats: x then y.
# (line.strip().split() is computed twice here; splitting once would do.)
signalx.append(float(line.strip().split()[0]))
signaly.append(float(line.strip().split()[1]))
# A pulse is exactly 1000 samples.  NOTE(review): signalx/signaly are only
# cleared on the CFD-failure branch below, so on the success path this
# condition can fire only once -- presumably the buffers should also be
# reset after a pulse is processed; confirm against the full program.
if len(signaly) == 1000:
num_signal += 1
pulse_label.config(text=f"Number of Pulses: {num_signal}")
# The heavy per-pulse work below only runs for every 1000th pulse.
if num_signal % 1000 == 0:
print(f"Updating: {num_signal}")
#update_first_plot(axs[0][0])
# Refresh the raw-signal plot in axs[0][0].
first_plot_line_removal(axs[0][0],lines)
lines[0].set_data(signalx, signaly)
# FFT-based filtering of the pulse; the result is also saved to `outfile`.
filtered_signal_y = filter_fft(signaly)
save_filtered_signal(outfile, filtered_signal_y)
# Presumably constant-fraction discrimination on the inverted signal,
# yielding the pulse start index `istart` and a timing intercept
# `fintercept` -- TODO confirm against find_true_cfd's definition.
success, istart, fintercept, y_sub_bipolar, y_baseline_restored, y_shifted, y_attenuated = find_true_cfd(np.multiply(-1, signaly), d)
lines[1].set_data(signalx, np.multiply(-1,y_shifted))
lines[2].set_data(signalx, y_sub_bipolar)
first_plot_vertical_line_drawing(axs[0][0], istart, l1, d, l3)
# Discard this pulse and start accumulating a new one if CFD failed.
if istart < 0 or not success:
signalx = []
signaly = []
continue
# Charge-comparison method: presumably integrates a delayed window
# (istart+l2 .. istart+l3) against the whole pulse -- verify semantics.
i_delayed, i_total, y_filtered_baselineCorrected = charge_comparison_method(signalx, filtered_signal_y, istart, delayed_start=istart+l2, end=istart+l3)
I_Delayed.append(i_delayed)
I_Total.append(i_total)
# if len(I_Delayed) % 1000 == 0:
# # print("Inside I_Delayed vs I_Total scatter")
# sc_return_scatter(I_Delayed, I_Total, axs[0][1])
TOF.append(fintercept)
# Persist the per-pulse results to their respective output files.
true_cfd_out_file.write(f"{fintercept}\n")
integration_file.write(f"{i_delayed} {i_total}\n")
# Linear energy calibration; PSD is the delayed/total charge ratio.
# NOTE(review): psd divides by i_total with no guard against zero.
energy = slope * i_total + intercept
psd = i_delayed / i_total
energy_file.write(f"{energy}\n")
psd_file.write(f"{psd}\n")
energy_psd_file.write(f"{energy} {psd}\n")
PSD.append(psd)
Energy.append(energy)
# if success and istart > 0:
# pulse_x.append(signalx.copy())
# pulse_y.append(np.roll(np.multiply(-1, y_baseline_restored.copy()), -istart))
# filtered_y_signals.append(np.roll(y_filtered_baselineCorrected.copy(), -istart))
# Refresh the summary plots every 1000 accepted pulses.
if len(PSD) % 1000 == 0:
# print("Inside PSD")
#app.after(0, update_plots, I_Delayed, I_Total, Energy, PSD, TOF, axs, fig)
update_plots(I_Delayed, I_Total, Energy, PSD, TOF, axs, fig)
Share
Improve this question
edited Jan 30 at 14:23
bigreddot
34.6k5 gold badges73 silver badges127 bronze badges
asked Jan 30 at 12:37
Ayan BanerjeeAyan Banerjee
1612 silver badges12 bronze badges
2
- 1 It sounds like you have an interesting use case, but I'm not quite sure what the question is. – mdurant Commented Jan 30 at 14:18
- I have implemented a file-reading mechanism that reads a text file line by line; after 1000 lines my event loop starts two algorithms to compute two parameters and updates a 2D scatter plot with those two values. I want to scale the file reading and the Matplotlib updating using Dask. – Ayan Banerjee Commented Jan 30 at 19:08
1 Answer
Your workflow is quite complex, so I have some thoughts for you rather than an answer.
I am not sure whether it's the data reading, the calculation, or the graph updating which are slow.
Reading bytes should never consume your CPU time; this is pure latency. Polling is fine (and streamz's own file reader does this), but there are also aiofiles and similar, or the built-in [loop methods](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.add_reader) which you can use.
Pure-Python code does not parallelise well, and I suspect you are working with very little data here, so function-call overheads will dominate.
streamz's integration with dask is via the futures interface of distributed, which adds considerable overhead, not least for moving data to and from the cluster. Before trying to integrate with your GUI, I would play with distributed alone, to see if you can achieve better performance.