Python asyncio not able to run the tasks - Stack Overflow


I am trying to test Python asyncio and aiohttp. The idea is to fetch data from an API in parallel and store each response as an .html file on a local drive. Below is my code.

import asyncio
import aiohttp
import time
import os

url_i = "<some_urls>-"
file_path = "<local_drive>\\asynciotest"

async def download_pep(pep_number: int) -> bytes:
  url = url_i + f"{pep_number}/"
  print(f"Begin downloading {url}")
  async with aiohttp.ClientSession() as session:
    async with session.get(url) as resp:
      content = await resp.read()
      print(f"Finished downloading {url}")
      return content
    
async def write_to_file(pep_number: int, content: bytes) -> None:
  with open(os.path.join(file_path,f"{pep_number}"+'-async.html'), "wb") as pep_file:
    print(f"{pep_number}_Begin writing ")
    pep_file.write(content)
    print(f"Finished writing")

async def web_scrape_task(pep_number: int) -> None:
  content = await download_pep(pep_number)
  await write_to_file(pep_number, content)

async def main() -> None:
  tasks = []
  for i in range(8010, 8016):
    tasks.append(web_scrape_task(i))
  await asyncio.wait(tasks)

if __name__ == "__main__": 
  s = time.perf_counter()
  asyncio.run(main())     
  elapsed = time.perf_counter() - s
  print(f"Execution time: {elapsed:0.2f} seconds.")

The above code throws the following error:

TypeError: Passing coroutines is forbidden, use tasks explicitly.
sys:1: RuntimeWarning: coroutine 'web_scrape_task' was never awaited

I am a complete novice with asyncio, so I don't have any clue. I have followed the documentation but still haven't found an answer.

Am I missing something here?

Edit

I am trying to call the APIs sequentially within each concurrent / parallel call. For this I am using asyncio.Semaphore() and restricting the concurrency to 2. I got the clue from here and from the comments below.

I have revised the code as shown below:

async def web_scrape_task(pep_number: int) -> None:
  for i in range(8010, 8016):
    content = await download_pep(i)
    await write_to_file(pep_number, content)
    
##To limit concurrent call 2##
sem = asyncio.Semaphore(2)

async def main() -> None:
  tasks = []
  for i in range(8010, 8016):
    async with sem:
        tasks.append(asyncio.create_task(web_scrape_task(i)))
  await asyncio.gather(*tasks)

if __name__ == "__main__": 
  s = time.perf_counter()
  asyncio.run(main())
  #await main()
  elapsed = time.perf_counter() - s
  print(f"Execution time: {elapsed:0.2f} seconds.")

Now the question is whether this is the correct approach.


1 Answer


The error occurs because you're passing raw coroutines to asyncio.wait() instead of scheduling them as tasks. All you have to do is wrap your web_scrape_task() calls inside main() with asyncio.create_task(), like so:

async def main() -> None:
    tasks = []
    for i in range(8010, 8016):
        tasks.append(asyncio.create_task(web_scrape_task(i)))
    await asyncio.wait(tasks)

That way each coroutine is wrapped in an asyncio Task, which schedules it on the event loop so it is awaited correctly.
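
To see the difference in isolation, here is a small self-contained sketch (job() is just a made-up stand-in for the download coroutine): asyncio.wait() requires explicit Task objects, while asyncio.gather() also accepts bare coroutines and wraps them for you.

import asyncio

async def job(n: int) -> int:
    await asyncio.sleep(0.1)  # placeholder for real I/O such as an HTTP request
    return n * 2

async def main() -> None:
    # asyncio.wait() only accepts Task/Future objects, so wrap each coroutine first
    tasks = [asyncio.create_task(job(i)) for i in range(3)]
    done, _pending = await asyncio.wait(tasks)
    print(sorted(t.result() for t in done))  # completion order is not guaranteed

    # asyncio.gather(), by contrast, accepts bare coroutines and wraps them itself
    results = await asyncio.gather(*(job(i) for i in range(3)))
    print(results)

asyncio.run(main())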

Hope this helps :)

EDIT: Full Code

w/ concurrent method calls & a single aiohttp ClientSession

import asyncio
import aiohttp
import time
import os

url_i = "<some_urls>-"
file_path = "<local_drive>\\asynciotest"

async def download_pep(session: aiohttp.ClientSession, pep_number: int) -> bytes:
    url = url_i + f"{pep_number}/"
    print(f"Begin downloading {url}")
    async with session.get(url) as resp:
        content = await resp.read()
    print(f"Finished downloading {url}")
    return content

async def write_to_file(pep_number: int, content: bytes) -> None:
    with open(os.path.join(file_path, f"{pep_number}-async.html"), "wb") as pep_file:
        print(f"{pep_number}_Begin writing ")
        pep_file.write(content)
        print(f"Finished writing")

async def web_scrape_task(session: aiohttp.ClientSession, pep_number: int) -> None:
    content = await download_pep(session, pep_number)
    await write_to_file(pep_number, content)

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        tasks = [web_scrape_task(session, i) for i in range(8010, 8016)]
        await asyncio.gather(*tasks)

if __name__ == "__main__": 
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Execution time: {elapsed:0.2f} seconds.")

EDIT 2: Semaphore Handling

Small modification for limiting the concurrency (is that a word?) to two.

async def web_scrape_task(session: aiohttp.ClientSession, pep_number: int, semaphore: asyncio.Semaphore) -> None: # take in the semaphore as a parameter
    async with semaphore:  # synchronise with the semaphore here
        content = await download_pep(session, pep_number)
        await write_to_file(pep_number, content)

async def main() -> None:
    semaphore = asyncio.Semaphore(2) # initialise a semaphore
    async with aiohttp.ClientSession() as session:
        tasks = [web_scrape_task(session, i, semaphore) for i in range(8010, 8016)] # pass the semaphore to each task
        await asyncio.gather(*tasks)
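
If you want to verify that the semaphore really caps the number of in-flight downloads at two, here is a minimal stand-alone sketch (worker() and the asyncio.sleep() call are just stand-ins for the real download and file write):

import asyncio

sem = asyncio.Semaphore(2)
running = 0  # how many workers currently hold the semaphore

async def worker(n: int) -> None:
    global running
    async with sem:
        running += 1
        print(f"worker {n} started ({running} running)")
        assert running <= 2  # never more than two at once
        await asyncio.sleep(0.2)  # stand-in for download + write
        running -= 1
        print(f"worker {n} finished")

async def main() -> None:
    await asyncio.gather(*(worker(i) for i in range(6)))

asyncio.run(main())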