
Creating a stoppable background task in Python - Stack Overflow


I want to run some function in the background, but be able to shut it down at will. I tried a bunch of solutions, the most obvious of which is asyncio tasks:

import asyncio
from typing import Any, Generator

# Data source
DataSource = Generator[int, Any, None]
def create_data_source() -> DataSource:
    ...

# Data processing - should be able to run in the background until being shut down
async def data_processor(source: DataSource) -> None:
    try:
        for item in source:
            print("Working")

    except Exception as e:
        print("Exception", e)
        raise

    finally:
        print("Cancelled")

async def main():
    # Run the function
    data_source = create_data_source()
    processing_task = asyncio.create_task(data_processor(data_source))

    # Do other stuff
    await asyncio.sleep(3)

    # Shut the function down
    processing_task.cancel()

asyncio.run(main())

The problem is - once the task starts, the "Working" loop never terminates. I tried making the loop async, I tried sending it a shutdown signal using asyncio.Event, and I even tried rewriting the function using concurrent.futures, but nothing works. What am I missing?

Note:

  1. I searched through previous Q&As, but haven't found a solution.
  2. This is running on Python 3.11 for dependency reasons, but if newer versions have the solution then I might be able to update.

asked Feb 2 at 14:39 by Docom
  • An asyncio Task can only be canceled at an await expression. Your data_processor function doesn't contain any await expression, so asyncio cannot cancel it once it starts. Threads are not a solution since they cannot be canceled. A Python Process might work for you, since it has a terminate function (a sketch follows these comments). docs.python./3/library/… – Paul Cornelius, Feb 3 at 4:39
  • In general: an await asyncio.sleep(0) (or several of them, depending on the code) could be inserted into the main loop. The question is how long it takes to reach such a point on average, or in the worst case. – VPfB, Feb 3 at 9:39
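A minimal sketch of the Process-based approach mentioned in the first comment, assuming the worker needs no cleanup on shutdown (terminate() kills the process without running finally blocks or exit handlers):

import time
from multiprocessing import Process

# Blocking worker with no await points; it runs in its own process.
def data_processor() -> None:
    while True:
        print("Working")
        time.sleep(0.5)  # simulate work

if __name__ == "__main__":
    proc = Process(target=data_processor)
    proc.start()

    time.sleep(3)  # do other stuff

    proc.terminate()  # kill the worker process; the loop is not cancelled, just killed
    proc.join()
    print("Worker terminated")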

2 Answers

Your data_processor coroutine, which never issues an await statement, is a poor candidate for an async function. Consider running it as a sync function with run_in_executor, using either a thread pool (the default) or a process pool created with concurrent.futures.ProcessPoolExecutor, depending on how CPU-bound it is. For example:

Using a Multithreading Pool

import asyncio
import time
from threading import Event

# Data processing - should be able to run in the background until being shut down
def data_processor(stop_event) -> None:
    start_time = time.time()
    try:
        while True:
            if stop_event.is_set():
                break
            print("Working")
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print(f"Started at {start_time}, ended at {time.time()}")

async def main():
    stop_event = Event()
    loop = asyncio.get_running_loop()
    # None selects the default thread pool executor; we don't need a larger pool
    awaitable = loop.run_in_executor(None, data_processor, stop_event)

    # Do other stuff
    await asyncio.sleep(3)

    # Shut the function down
    stop_event.set()
    await awaitable  # Wait for task to complete

if __name__ == '__main__':
    asyncio.run(main())

Prints:

Working
Working
...
Working
Started at 1738589602.7931993, ended at 1738589605.79333
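Since the question targets Python 3.11, asyncio.to_thread (available since Python 3.9) is an equivalent shorthand for submitting a blocking callable to the default thread pool; a minimal self-contained sketch of the same Event-based pattern:

import asyncio
import time
from threading import Event

# Same blocking worker pattern: loop until the Event is set.
def data_processor(stop_event: Event) -> None:
    while not stop_event.is_set():
        print("Working")
        time.sleep(0.5)  # simulate work

async def main():
    stop_event = Event()
    # asyncio.to_thread runs the blocking callable in the default thread pool
    # and returns an awaitable that we can wrap in a task.
    task = asyncio.create_task(asyncio.to_thread(data_processor, stop_event))

    # Do other stuff
    await asyncio.sleep(3)

    # Shut the function down
    stop_event.set()
    await task  # wait for the worker to finish

if __name__ == '__main__':
    asyncio.run(main())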

Using a Multiprocessing Pool

Here we cannot directly pass a multiprocessing.Event instance to the worker function and must instead use a pool initializer to initialize each pool process with a global reference to the event:

import asyncio
import time
from multiprocessing import Event
from concurrent.futures import ProcessPoolExecutor

def init_pool(_stop_event: Event) -> None:
    global stop_event

    stop_event = _stop_event

# Data processing - should be able to run in the background until being shut down
def data_processor() -> None:
    start_time = time.time()
    try:
        while True:
            if stop_event.is_set():
                break
            print("Working")
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print(f"Started at {start_time}, ended at {time.time()}")

async def main():
    stop_event = Event()
    # We don't need a larger pool:
    executor = ProcessPoolExecutor(1, initializer=init_pool, initargs=(stop_event,))
    loop = asyncio.get_running_loop()
    awaitable = loop.run_in_executor(executor, data_processor)

    # Do other stuff
    await asyncio.sleep(3)

    # Shut the function down
    stop_event.set()
    await awaitable  # Wait for task to complete

if __name__ == '__main__':
    asyncio.run(main())

Easy, But Perhaps Not the Best Solution

Last and least, you could just insert calls to await asyncio.sleep(0) to give other async tasks a chance to run:

import asyncio
import time

# Data processing - should be able to run in the background until being shut down
async def data_processor() -> None:
    start_time = time.time()
    try:
        while True:
            print("Working")
            await asyncio.sleep(0)
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print(f"Started at {start_time}, ended at {time.time()}")

async def main():
    processing_task = asyncio.create_task(data_processor())

    # Do other stuff
    await asyncio.sleep(3)

    # Shut the function down
    processing_task.cancel()

asyncio.run(main())

Perhaps something like this will work:

import threading
import time


class MyTask:
    def __init__(self):
        self.thread = None
        self.stop_event = threading.Event()

    def _wrapper(self, func, *args, **kwargs):
        """Wrapper function to run the task and check for stop signal."""
        try:
            func(self.stop_event, *args, **kwargs)
        except Exception as e:
            print(f"Task encountered an error: {e}")

    def start_task(self, func, *args, **kwargs):
        """Starts the function in a separate thread."""
        if self.thread and self.thread.is_alive():
            print("Task is already running.")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._wrapper, args=(func, *args), kwargs=kwargs)
        self.thread.start()

    def stop_task(self):
        """Stops the running task."""
        if self.thread and self.thread.is_alive():
            self.stop_event.set()  # Signal the function to stop
            print("Stopping the task...")
        else:
            print("No active task to stop.")


def example_task(stop_event):
    """Example function that stops when the stop_event is set."""
    for i in range(10):
        if stop_event.is_set():
            print("Task stopping early.")
            return
        print(f"Running... {i}")
        time.sleep(1)
    print("Task completed.")


if __name__ == "__main__":
    task = MyTask()
    task.start_task(example_task)
    time.sleep(3)
    task.stop_task()

Using the MyTask class, you can run a background task on a separate thread. The general idea is that you write the background work as a function (like example_task) that accepts stop_event as an argument; it can accept other arguments as well, of course. Inside that function you must implement the logic that checks whether stop_event is set and returns if it is.
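For example, a hypothetical worker taking extra arguments (counting_task, count, and delay are illustrative names, not part of the original answer) could be started like this, reusing the MyTask class defined above:

import time

def counting_task(stop_event, count, delay=1.0):
    """Hypothetical worker: counts up to `count`, checking the stop flag each step."""
    for i in range(count):
        if stop_event.is_set():
            print("Counting task stopping early.")
            return
        print(f"Counting... {i}")
        time.sleep(delay)
    print("Counting task completed.")

task = MyTask()
task.start_task(counting_task, 10, delay=0.5)  # extra positional/keyword args pass through start_task
time.sleep(2)
task.stop_task()
task.thread.join()  # optionally wait for the worker to exit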
