I have a script, script1.py, that I run frequently to process videos one at a time. The output for each video is saved in its own folder:
script1.py - process videos
Output_Videos\namevideo1_Output
A second script, monitoring.py, monitors the following folders while it is running:
monitoring.py - monitoring
Output_Videos\namevideo1_Output\Images
Output_Videos\namevideo1_Output\Text
But when script1.py automatically moves on to the second video:
Output_Videos\namevideo1_Output - is complete
Output_Videos\namevideo2_Output
monitoring.py stops monitoring, because when it was first started it found only one folder to monitor. How can I run monitoring.py once and have it repeatedly check Output_Videos for new folders and monitor them as well?
For example:
Output_Videos\namevideo1_Output\Images
Output_Videos\namevideo1_Output\Text
Output_Videos\namevideo2_Output\Images
Output_Videos\namevideo2_Output\Text
Output_Videos\namevideo3_Output\Images
Output_Videos\namevideo3_Output\Text
...
monitoring.py
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from glob import glob


class EventHandler(FileSystemEventHandler):
    def __init__(self, text):
        self.text = text

    def on_created(self, event):
        action, path = (event.event_type, event.src_path)
        print(self.text.format(path))


def monitor_directories(data):
    observer = Observer()
    for directory, text in data:
        event_handler = EventHandler(text)
        observer.schedule(event_handler, directory, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    print("Crop sub area\n")
    data = [
        # always select a dir path (if using glob, be careful, it returns a list), then custom text with {} in it that will become the path
        # note: [0] keeps only the first folder that matches when the script starts
        [glob("Output_Videos/*/Images")[0], " Crop images [Images] : {} Done"],
        [glob("Output_Videos/*/Text")[0], " Crop images [Text] : {} Done"]
    ]
    monitor_directories(data)
asked Feb 8 at 1:58 by Oussama Gamer, edited Feb 8 at 2:07
2 Comments
- From what I understand, you need a script that watches for any new file created under a folder and applies some operation to it. Am I right? – Rakesh B Sirvi Commented Feb 8 at 2:12
- Yes. Folders with different names are saved in the same directory, "Output_Videos". With glob, folders with different names can be matched: "Output_Videos/*/Images" and "Output_Videos/*/Text". – Oussama Gamer Commented Feb 8 at 2:17
1 Answer
Try this solution. It uses the same watchdog library and watches the given folder for changes. It triggers when a file is created in the root folder or in any of its subfolders. I have added a 2-second delay after a file is created, because accessing a file the moment it appears can cause problems if data is still being written to it.
import time
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class StableFileHandler(FileSystemEventHandler):
    def __init__(self, delay=2):
        """
        :param delay: Time in seconds to wait for the file to stabilize (default: 2s)
        """
        self.delay = delay

    def on_created(self, event):
        if not event.is_directory:  # Ignore folder creation
            file_path = event.src_path
            print(f"New file detected: {file_path}. Waiting for completion...")
            while True:
                try:
                    initial_size = os.path.getsize(file_path)
                    time.sleep(self.delay)
                    final_size = os.path.getsize(file_path)
                    if initial_size == final_size:  # File size stabilized
                        print(f"File fully written: {file_path}")
                        # add your processing part here
                        break
                except FileNotFoundError:
                    # Handle cases where file might be removed before stabilization
                    print(f"File {file_path} was removed before completion.")
                    break


def monitor_directory(path_to_watch):
    """
    Monitors the given directory for file creation events.
    """
    event_handler = StableFileHandler()
    observer = Observer()
    observer.schedule(event_handler, path_to_watch, recursive=True)
    observer.start()
    try:
        print(f"Monitoring directory: {path_to_watch}")
        while True:
            time.sleep(1)  # Keep script running
    except KeyboardInterrupt:
        observer.stop()
        print("Stopping monitoring...")
    observer.join()


if __name__ == "__main__":
    directory_to_watch = "./folder"  # Change this to your target folder and ensure the folder exists
    monitor_directory(directory_to_watch)
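
To tie this back to the folder layout in the question, here is a minimal sketch, assuming a single recursive watch on Output_Videos is acceptable and that only files placed directly inside an Images or Text folder matter. The names MESSAGES, message_for, OutputHandler and monitor are made up for this illustration; the message templates are copied from the question. It leaves out the size-stabilization wait shown above to stay short.

```python
import time
from pathlib import PurePath

# Hypothetical mapping: subfolder name -> message template from the question.
MESSAGES = {
    "Images": " Crop images [Images] : {} Done",
    "Text": " Crop images [Text] : {} Done",
}


def message_for(path):
    """Return the message for a file directly inside an Images or Text
    folder, or None for any other location."""
    parent = PurePath(path).parent.name
    template = MESSAGES.get(parent)
    return template.format(path) if template else None


def monitor(root="Output_Videos"):
    # Imported here so message_for() can be tried without watchdog installed.
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    class OutputHandler(FileSystemEventHandler):
        def on_created(self, event):
            if event.is_directory:
                return  # ignore folder creation, only report files
            text = message_for(event.src_path)
            if text:
                print(text)

    observer = Observer()
    # One recursive watch on the parent folder, so there is no need to
    # glob for the per-video folders when the script starts.
    observer.schedule(OutputHandler(), root, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)  # keep the script running
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# To start watching, call: monitor("Output_Videos")
```

Because the watch is placed on Output_Videos itself rather than on the folders that glob finds at startup, folders such as namevideo2_Output that appear later should be covered without restarting the script. The Output_Videos folder must already exist when monitor() is called.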