
Asynchronously running a function in the background while sending results in Python


I have a Python script which I would like to (a) continuously listen for requests from an API, and (b) run an expensive algorithm continuously in the background. The results from the algorithm are saved locally to a file (results.json), and if the API requests the result, I would like to load the current results from disk and send them to the endpoint.

I've provided a simplified sketch of the code logic below. However, I'm not sure how to structure it so that algo.run runs constantly in the background while Main.send periodically makes the API calls. Since both are blocking operations, should I be using async def for the methods in Algorithm as well? In addition, how would I handle the edge case where Main.send tries to read the file from disk while algo.run is trying to save it at the same time?

Any suggestions would be greatly appreciated!

import json
import random
import time

import requests

ENDPOINT = 'http://some_api/endpoint'  # Some external API

class Main:
    def __init__(self):
        algo = Algorithm()
    
        # self.send() ## These would block
        # algo.run()
    
    async def send(self):

        # Continuously listen on an endpoint
        while True:

            response = requests.get(ENDPOINT).json()
            if response['should_send']:
    
                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)

                # Send results to API
                requests.post(ENDPOINT, outputs)
    
            time.sleep(60)

class Algorithm:
    def __init__(self):
        pass
    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result' : random.random()}
            time.sleep(60)

            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)


1 Answer

If your CPU-bound algorithm doesn't pause for I/O (or even if it does), simply run it in another thread.

Asyncio has some functionality to call code in other threads (asyncio.to_thread), which can be great, and even easier to use than concurrent.futures.
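As a standalone illustration (independent of your code), here is a minimal sketch of asyncio.to_thread, which is available since Python 3.9:

import asyncio
import time

def blocking_work():
    # stands in for any expensive, synchronous computation
    time.sleep(2)
    return 42

async def demo():
    # to_thread ships the blocking call off to a worker thread,
    # so the event loop stays responsive while it runs:
    result = await asyncio.to_thread(blocking_work)
    print(result)

asyncio.run(demo())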

In your case, this might work:

import json
import random
import time
import asyncio
import httpx  # (you will use this to replace requests)

ENDPOINT = 'http://some_api/endpoint'  # Some external API

# let's keep things simple: 
# in Python things that don't need to be
# a class, don't need to be a class! 

async def main():
    
    # this now being async, it can orchestrate the 
    # lifetime of Algorithm classes:
    
    algo = Algorithm()
    
    # create the "send" task, which then will 
    # run as the async-loop in the main thread
    # is idle!
    send_task = asyncio.create_task(send())
    
    # sets a task that will run "algo" in another thread: 
    algo_task = asyncio.create_task(asyncio.to_thread(algo.run))
    
    # pass the control to the loop, staying idle and allowing
    # both tasks to run:
    await asyncio.gather(send_task, algo_task)
                                    
                                    

async def send():

    # Continuously listen on an endpoint
    async with httpx.AsyncClient() as client:
        while True:

            # "requests" is not really an asynchronous lib
            # use httpx instead:
            # 
            response = (await client.get(ENDPOINT)).json()
            if response['should_send']:

                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)

                # Send results to API, using the async client
                await client.post(ENDPOINT, json=outputs)
            # pauses 60 seconds while allowing other
            # async tasks to run in the same thread:
            await asyncio.sleep(60)

class Algorithm:
    def __init__(self):
        pass
    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result' : random.random()}
            # this is running in other thread, no problem using synchronous sleep:
            time.sleep(60)

            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)

if __name__ == "__main__":
    asyncio.run(main())

Actually, here, asyncio would not even be needed; I just ended up using it because you mentioned it in the question and your send method was marked as asynchronous. The real deal is to have the non-cooperative "algo" code in a different thread.
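If you do drop asyncio, a minimal thread-only sketch could look like this (send_loop is a hypothetical, plain blocking rewrite of send using requests and time.sleep):

import threading

def run_without_asyncio():
    algo = Algorithm()
    # daemon=True so the worker thread dies together with the main thread:
    worker = threading.Thread(target=algo.run, daemon=True)
    worker.start()
    send_loop()  # hypothetical: requests.get/post in a while-True loop + time.sleep(60)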

But the asyncio example can be the base for you to interleave other I/O-bound tasks in the same project.
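For example, one more I/O-bound task could run alongside the others; the health-check URL and the 30-second interval below are made up for illustration:

async def heartbeat():
    async with httpx.AsyncClient() as client:
        while True:
            await client.get('http://some_api/health')  # hypothetical endpoint
            await asyncio.sleep(30)

# in main(), alongside the other tasks:
#     hb_task = asyncio.create_task(heartbeat())
#     await asyncio.gather(send_task, algo_task, hb_task)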

As for your other question, the file being read while it is simultaneously written is of little concern: since you are issuing separate open calls, the OS will do the right thing for you (ok, Windows might give you an OSError if you try to open the file while the other open is in progress, but macOS, Linux, and all other conceivable OSes will just work). Say a write hits just after the file has been opened for reading: if the open operation has already resolved, the program will read normally from the previous version of the file, even while the filesystem shows the new version as it is being written.

If writing takes time, however, there is the risk of sending a partial file (or a 0-length file) in the converse case: when saving has started and is ongoing, and the file is opened for reading. If writing is fast, and sending the file can always wait for its completion, all you need is a lock (in this case, a threading.Lock):

import json
import random
import time
import asyncio
import httpx  # (you will use this to replace requests)
import threading

ENDPOINT = 'http://some_api/endpoint'  # Some external API

# let's keep things simple: 
# in Python things that don't need to be
# a class, don't need to be a class! 
# if "send" has some state it would like
async def main():
    
    # this now being async, it can orchestrate the 
    # lifetime of Algorithm classes:
    
    lock = threading.Lock()
    
    algo = Algorithm(lock)
    
    # create the "send" task, which then will 
    # run as the async-loop in the main thread
    # is idle!
    send_task = asyncio.create_task(send(lock))
    
    # sets a task that will run "algo" in another thread: 
    algo_task = asyncio.create_task(asyncio.to_thread(algo.run))
    
    # pass the control to the loop, staying idle and allowing
    # both tasks to run:
    await asyncio.gather(send_task, algo_task)
                                    
                                    

async def send(lock):

    # Continuously listen on an endpoint
    async with httpx.AsyncClient() as client:
        while True:

            # "requests" is not really an asynchronous lib
            # use httpx instead:
            # 
            response = (await client.get(ENDPOINT)).json()
            if response['should_send']:

                # Load the result from disk
                with lock, open('results.json', 'r') as file:
                    outputs = json.load(file)

                # Send results to API, using the async client
                await client.post(ENDPOINT, json=outputs)
            # pauses 60 seconds while allowing other
            # async tasks to run in the same thread:
            await asyncio.sleep(60)

class Algorithm:
    def __init__(self, lock):
        self.lock = lock
    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result' : random.random()}
            # this is running in other thread, no problem using synchronous sleep:
            time.sleep(60)

            # Save result to disk
            with self.lock, open('results.json', 'w') as file:
                json.dump(outputs, file)

if __name__ == "__main__":
    asyncio.run(main())

Otherwise, if, say, writing the file takes several seconds and you'd rather not wait, the pattern is to write to a different file name and then, once writing is done, rename the new file over the old one in the writer task, using the lock mechanism:

import os
...

class Algorithm:

    ...
    
    def run(self):
        while True:
            ...
            with open('new_results.json', 'w') as file:
                json.dump(outputs, file)
                
            with self.lock:
                os.rename("new_results.json", "results.json")
