I am trying to learn Python asyncio and aiohttp. The idea is to fetch data from an API in parallel and store each .html file on a local drive. Below is my code.
import asyncio
import aiohttp
import time
import os

url_i = "<some_urls>-"
file_path = "<local_drive>\\asynciotest"

async def download_pep(pep_number: int) -> bytes:
    url = url_i + f"{pep_number}/"
    print(f"Begin downloading {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            content = await resp.read()
            print(f"Finished downloading {url}")
            return content

async def write_to_file(pep_number: int, content: bytes) -> None:
    with open(os.path.join(file_path, f"{pep_number}-async.html"), "wb") as pep_file:
        print(f"{pep_number}_Begin writing ")
        pep_file.write(content)
        print("Finished writing")

async def web_scrape_task(pep_number: int) -> None:
    content = await download_pep(pep_number)
    await write_to_file(pep_number, content)

async def main() -> None:
    tasks = []
    for i in range(8010, 8016):
        tasks.append(web_scrape_task(i))
    await asyncio.wait(tasks)

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Execution time: {elapsed:0.2f} seconds.")
The above code throws an error:
TypeError: Passing coroutines is forbidden, use tasks explicitly.
sys:1: RuntimeWarning: coroutine 'web_scrape_task' was never awaited
I am a complete novice with asyncio, so I have no clue what is wrong. I have followed the documentation but got nowhere. What am I missing here?
Edit
I am trying to limit how many of the concurrent/parallel API calls run at the same time. For this I am using asyncio.Semaphore() and restricting the concurrency to 2. I got the clue from here and from the comments below.
I have revised the code as shown below:
async def web_scrape_task(pep_number: int) -> None:
    for i in range(8010, 8016):
        content = await download_pep(i)
        await write_to_file(pep_number, content)

## To limit concurrent calls to 2 ##
sem = asyncio.Semaphore(2)

async def main() -> None:
    tasks = []
    for i in range(8010, 8016):
        async with sem:
            tasks.append(asyncio.create_task(web_scrape_task(i)))
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    # await main()
    elapsed = time.perf_counter() - s
    print(f"Execution time: {elapsed:0.2f} seconds.")
Now the question is: is this the correct approach?
1 Answer
The error occurs because you're passing raw coroutines to asyncio.wait() instead of scheduling them as tasks. All you have to do is wrap each web_scrape_task() call inside main() with asyncio.create_task(), like so:
async def main() -> None:
    tasks = []
    for i in range(8010, 8016):
        tasks.append(asyncio.create_task(web_scrape_task(i)))
    await asyncio.wait(tasks)
That way each coroutine is wrapped in an asyncio Task, which the event loop schedules immediately and asyncio.wait() can then await correctly.
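As an aside, on Python 3.11+ asyncio.wait() rejects bare coroutines outright, which is exactly the TypeError you saw; the newer asyncio.TaskGroup gives you the same create-and-await pattern in one construct. A minimal sketch (assuming Python 3.11+):

async def main() -> None:
    # TaskGroup (Python 3.11+) creates the tasks and awaits them all
    # when the async with block exits.
    async with asyncio.TaskGroup() as tg:
        for i in range(8010, 8016):
            tg.create_task(web_scrape_task(i))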
Hope this helps :)
EDIT: Full code with concurrent calls and a single aiohttp ClientSession
import asyncio
import aiohttp
import time
import os

url_i = "<some_urls>-"
file_path = "<local_drive>\\asynciotest"

async def download_pep(session: aiohttp.ClientSession, pep_number: int) -> bytes:
    url = url_i + f"{pep_number}/"
    print(f"Begin downloading {url}")
    async with session.get(url) as resp:
        content = await resp.read()
        print(f"Finished downloading {url}")
        return content

async def write_to_file(pep_number: int, content: bytes) -> None:
    with open(os.path.join(file_path, f"{pep_number}-async.html"), "wb") as pep_file:
        print(f"{pep_number}_Begin writing ")
        pep_file.write(content)
        print("Finished writing")

async def web_scrape_task(session: aiohttp.ClientSession, pep_number: int) -> None:
    content = await download_pep(session, pep_number)
    await write_to_file(pep_number, content)

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        tasks = [web_scrape_task(session, i) for i in range(8010, 8016)]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Execution time: {elapsed:0.2f} seconds.")
EDIT 2: Semaphore Handling
A small modification to limit the concurrency to two.
async def web_scrape_task(session: aiohttp.ClientSession, pep_number: int, semaphore: asyncio.Semaphore) -> None:  # take the semaphore as a parameter
    async with semaphore:  # synchronise on the semaphore here
        content = await download_pep(session, pep_number)
        await write_to_file(pep_number, content)

async def main() -> None:
    semaphore = asyncio.Semaphore(2)  # initialise a semaphore with a limit of 2
    async with aiohttp.ClientSession() as session:
        tasks = [web_scrape_task(session, i, semaphore) for i in range(8010, 8016)]  # pass the semaphore to each task
        await asyncio.gather(*tasks)
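Note the placement: the semaphore is acquired inside web_scrape_task(), not around asyncio.create_task() in main() as in your revised code. create_task() returns immediately, so guarding it with the semaphore would throttle only task creation, not the work itself; the download has to run under async with semaphore for at most two requests to be in flight at once.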