I recently started learning asyncio in Python and ran into a problem while practicing Queues. My task was to write code that gets random pics from a site, parses the img links, and downloads those images asynchronously using Queues. Unfortunately, my program just freezes when trying to read the HTML of a response, and I have no idea what to do about it. Here is my code:
import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
from bs4 import BeautifulSoup


async def make_request(url, session):
    async with session.get(url) as response:
        if response.ok:
            return response
        else:
            print(f'{url} returned: {response.status}')


async def get_image_page(queue, session):
    url = "https://c.xkcd.com/random/comic/"
    response = await make_request(url, session)
    await queue.put(str(response.url))


def parse_link(html):
    soup = BeautifulSoup(html, 'lxml')
    u = soup.select_one('div#comic>img').get('src')
    image_url = f'https:{u}'
    return image_url


async def get_image_url(pages_queue, image_urls_queue, session):
    while True:
        url = await pages_queue.get()
        response = await make_request(url, session)
        html = await response.text()
        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor() as pool:
            image_url = await loop.run_in_executor(
                pool, parse_link, html
            )
        await image_urls_queue.put(image_url)
        pages_queue.task_done()


async def download_image(url):
    pass


async def main():
    async with aiohttp.ClientSession() as session:
        pages_queue = asyncio.Queue()
        image_urls_queue = asyncio.Queue()

        page_getters = []
        for _ in range(4):
            task = asyncio.create_task(
                get_image_page(pages_queue, session)
            )
            page_getters.append(task)

        url_gettes = []
        for _ in range(4):
            task = asyncio.create_task(
                get_image_url(pages_queue, image_urls_queue, session)
            )
            url_gettes.append(task)

        await asyncio.gather(*page_getters)
        await pages_queue.join()

        for task in page_getters:
            task.cancel()

        print(image_urls_queue)


if __name__ == '__main__':
    asyncio.run(main())
I tried to debug the get_image_url() coroutine and figured out that the problem is in line 36:

html = await response.text()

Can someone point out what I did wrong and how I can fix it?
2 Answers
I believe I found the problem.
When you return from make_request(), the response is closed; if you look at ClientResponse.read(), you can see that it raises 'Connection closed'.
What you should do is return the HTML from make_request() rather than the response object.
But that would break the get_image_page() function, so instead you can add a parameter that decides whether to return the HTML or the response object:
async def make_request(url, session, get_html=False):  # get_html=False means we want the response object, True means we want the html
    async with session.get(url) as response:
        if response.ok:
            if get_html:
                return await response.text()  # Return html
            return response  # Return response object
        else:
            print(f'{url} returned: {response.status}')


async def get_image_page(queue, session):
    url = "https://c.xkcd.com/random/comic/"
    response = await make_request(url, session)
    await queue.put(str(response.url))


def parse_link(html):
    soup = BeautifulSoup(html, 'lxml')
    u = soup.select_one('div#comic>img').get('src')
    image_url = f'https:{u}'
    return image_url


async def get_image_url(pages_queue, image_urls_queue, session):
    while True:
        url = await pages_queue.get()
        html = await make_request(url, session, True)  # Get html instead of the response object
        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor() as pool:
            image_url = await loop.run_in_executor(
                pool, parse_link, html
            )
        await image_urls_queue.put(image_url)
        pages_queue.task_done()
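If you want to see the failure in isolation, here is a minimal sketch of the problem (a repro I put together, not code from the question): the response object is returned out of the async with block, and only afterwards do we try to read the body. Depending on the aiohttp version this either raises aiohttp.ClientConnectionError ('Connection closed') or appears to hang, which matches the freeze described in the question.

import asyncio

import aiohttp


async def make_request(url, session):
    async with session.get(url) as response:
        # The connection is released as soon as this block exits,
        # so the caller receives a response whose body can no longer be read.
        return response


async def main():
    async with aiohttp.ClientSession() as session:
        response = await make_request("https://c.xkcd.com/random/comic/", session)
        try:
            # The body was never read while the connection was open:
            html = await response.text()
            print(len(html))
        except aiohttp.ClientConnectionError as exc:
            print(f'Reading after the block exited failed: {exc}')


asyncio.run(main())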
@NewToLife64 has already identified the reason why you are unable to retrieve the text, i.e. after the block async with session.get(url) as response: exits, the connection is closed and you can no longer do await response.text(). But there are several other issues with your code that should be pointed out.
First, in the code I will present I have slightly simplified things. Now get_image_page just puts the same URL "https://c.xkcd.com/random/comic/" in the pages queue instead of first fetching this URL and putting the actual URL it resolves to in the pages queue. It seems to me that you get the same random results you would otherwise get, and it avoids retrieving each page twice. So here are the issues:
- Your function make_request tests for a possible error, in which case it prints an error message and implicitly returns None. Yet the callers of this function, e.g. get_image_page, do not test for None possibly being returned and just go ahead and access response.url. This will raise an exception whenever response is None.
- In testing I discovered instances where a page was successfully returned but the statement u = soup.select_one('div#comic>img').get('src') in function parse_link raises an exception because the expression soup.select_one('div#comic>img') evaluates to None.
- You wait for all page getter tasks to complete with await asyncio.gather(*page_getters), but you shortly follow this with a loop to cancel all of these tasks. But why? The tasks have already terminated.
- You have print(image_urls_queue), but that is not going to reveal what is actually in the queue, which is the whole point of doing these downloads.
- In get_image_url you are creating a multiprocessing pool for each downloaded page to be processed. Creating such a pool is a fairly expensive operation. If you are going to do this (I wouldn't), at least create a pool of size 1, since you are only submitting a single task to the pool during the pool's lifetime. Better would be to create the pool once in main and pass it to get_image_url. In that case you do not want a pool of size 1, since the same pool will have multiple tasks submitted to it.
The following code incorporates what I believe are necessary changes. Please also read the comments:
import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
from bs4 import BeautifulSoup


async def get_page_text(url, session):
    async with session.get(url) as response:
        if response.ok:
            return await response.text()
        else:
            print(f'{url} returned: {response.status}')
            # There is an implicit return None here:
            # return None


# Rename queue to pages_queue so we know which queue this is:
async def get_image_page(pages_queue, session):
    # We have simplified the code, so in this implementation
    # the session argument is not used.
    url = "https://c.xkcd.com/random/comic/"
    await pages_queue.put(url)


def parse_link(html):
    soup = BeautifulSoup(html, 'lxml')
    img = soup.select_one('div#comic>img')
    return f"https:{img.get('src')}" if img else None


async def get_image_url(pages_queue, image_urls_queue, session, pool):
    while True:
        url = await pages_queue.get()
        html = await get_page_text(url, session)
        # get_page_text can return None, so test for it:
        if html:
            loop = asyncio.get_running_loop()
            image_url = await loop.run_in_executor(
                pool, parse_link, html
            )
            await image_urls_queue.put(image_url)
        pages_queue.task_done()


async def main():
    async with aiohttp.ClientSession() as session:
        # Create the pool only once and pass it as an argument:
        with ProcessPoolExecutor() as pool:
            pages_queue = asyncio.Queue()
            image_urls_queue = asyncio.Queue()

            # Use a list comprehension:
            page_getters = [
                asyncio.create_task(
                    get_image_page(pages_queue, session)
                )
                for _ in range(4)
            ]
            # Use a list comprehension (and a slightly renamed variable):
            url_getters = [
                asyncio.create_task(
                    get_image_url(pages_queue, image_urls_queue, session, pool)
                )
                for _ in range(4)
            ]

            # Wait for all URLs to be placed in the pages queue:
            await asyncio.gather(*page_getters)
            # Now we can be sure that when we join the queue, all pages have
            # been retrieved:
            await pages_queue.join()
            # No reason to cancel the page getter tasks, which have already completed:
            """
            for task in page_getters:
                task.cancel()
            """
            # Retrieve all entries:
            try:
                while True:
                    item = image_urls_queue.get_nowait()
                    print(item)
            except asyncio.QueueEmpty:
                pass


if __name__ == '__main__':
    asyncio.run(main())
Prints:
https://imgs.xkcd.com/comics/familiar.jpg
https://imgs.xkcd.com/comics/prometheus.png
https://imgs.xkcd.com/comics/vaccine_research.png
https://imgs.xkcd.com/comics/evidence_of_alien_life.png
Notes
I understand that you wanted practice with queues. I just want to mention that you don't really need to use queues. Likewise, the code in parse_link is too trivial to be worth using multiprocessing for, because of the additional overhead in creating a pool and submitting tasks to it.
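For what it's worth, here is a rough sketch of that simpler approach, i.e. no queues and no process pool. It reuses the xkcd random-comic URL and the same parsing logic as above, and is meant only as an illustration of the idea, not a definitive rewrite:

import asyncio

import aiohttp
from bs4 import BeautifulSoup

RANDOM_COMIC_URL = "https://c.xkcd.com/random/comic/"


def parse_link(html):
    # Same parsing as above: find the comic <img> and build an absolute URL.
    img = BeautifulSoup(html, 'lxml').select_one('div#comic>img')
    return f"https:{img.get('src')}" if img else None


async def fetch_image_url(session):
    async with session.get(RANDOM_COMIC_URL) as response:
        if not response.ok:
            print(f'{RANDOM_COMIC_URL} returned: {response.status}')
            return None
        # Read the body while the response is still open:
        html = await response.text()
    return parse_link(html)


async def main():
    async with aiohttp.ClientSession() as session:
        image_urls = await asyncio.gather(
            *(fetch_image_url(session) for _ in range(4))
        )
    for url in image_urls:
        if url:
            print(url)


if __name__ == '__main__':
    asyncio.run(main())

Each call to fetch_image_url reads the body while the response is still open, so the 'Connection closed' problem cannot occur.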