python - Using a dict for caching async function result does not use cached results - Stack Overflow

BASE_URL = "https://query2.finance.yahoo.com/"
DATA_URL_PART = "v8/finance/chart/{ticker}"

async def fetch_close_price(aiosession: aiohttp.ClientSession,
                            ticker: str, start: datetime, end: datetime, ticker_price_dict):
    params = {
        'period1': int(to_midnight(start, naive=False).timestamp()),
        'period2': int(to_midnight(end, naive=False).timestamp()),
        'interval': '1d',
        'includeAdjustedClose': 'false'  # Explicitly exclude adjusted close prices
    }

    print(f'Checking cache for ticker {ticker}')
    if ticker in ticker_price_dict:
        print(f'Found cache for ticker {ticker}')
        return ticker_price_dict[ticker]

    async with aiosession.get(DATA_URL_PART.format(ticker=ticker), params=params) as response:
        if response.status == 200:
            print(f"Downloading {ticker}", response.status)
            data = await response.json()
            ....
            ....
            df = pd.DataFrame({ticker: close_prices},
                              index=dates)
            ticker_price_dict[ticker] = df
            print(f"Downloaded and setting {ticker}", response.status)
            return df
        else:
            print(f"Failed to fetch data for {ticker}", response.status)
            return pd.DataFrame()

async def download_close_prices(aiosession: aiohttp.ClientSession,
                                tickers: list[str], start: datetime, end: datetime):
    all_ticker_close = pd.DataFrame()
    ticker_price_dict = {}
    tasks = [fetch_close_price(aiosession, ticker, start, end, ticker_price_dict) for ticker in tickers]
    results = await asyncio.gather(*tasks)

async def download_close_prices_all(tickers, start, end):
    connector = aiohttp.TCPConnector(limit=50)
    async with aiohttp.ClientSession(BASE_URL, connector=connector) as aiosession:
        return await download_close_prices(aiosession, tickers, start, end) 

if __name__ == "__main__":
    tickers = ["AAPL", "USDINR=X", "XCN18679-USD", "XCN18679-USD", "XCN18679-USD"]
    close_prices = asyncio.run(download_close_prices_all(tickers, start, end))

I have this asyncio function, which uses aiohttp to make requests and return DataFrames. Sometimes I have many repeated tickers to fetch, like the example given, where "XCN18679-USD" is requested three times. I tried passing in a simple dict to cache the results, but it never seems to find the ticker in the dict and always downloads again. I don't know what to do next. Maybe I should use a cache, but a download can sometimes fail, and I don't want to cache a failed result. Can anyone point me in the right direction about what I might be doing wrong? Thanks a lot to everyone in advance.

Asked Feb 5 at 8:48 by SAK. Edited Feb 5 at 8:56 by mkrieger1.
  • The dict gets populated with results only after each function has checked whether the symbol is in there. Wouldn't it make more sense to simply deduplicate the tickers anyway by using a set? – deceze, commented Feb 5 at 9:05
  • 1. Yes, that is the issue. I don't know where to place the dict; as you rightly pointed out, it gets populated afterwards. How can I make the result available when the same ticker is requested again later? 2. I need the entire DataFrame returned for every ticker, not just a deduplicated list, hence the dict. – SAK, commented Feb 5 at 9:29
  • Since all your functions are running in parallel (insofar as they do in asyncio), they all check "at the same time", all see that the symbol hasn't been fetched yet, and all start making requests. You need to deduplicate the list before calling fetch_close_price. With the current logic of fetch_close_price checking for duplicates, it's not going to work. – deceze, commented Feb 5 at 9:37
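
The deduplication suggested in these comments can still return one result per requested ticker, repeats included. The following is only a minimal sketch: `fetch_one` is a hypothetical stand-in for the real aiohttp download, and the `calls` list exists purely to show how many requests are made.

```python
import asyncio

calls: list[str] = []  # records each simulated request (illustration only)

async def fetch_one(ticker: str) -> str:
    """Hypothetical stand-in for the real download; returns a fake price."""
    calls.append(ticker)
    await asyncio.sleep(0.01)
    return f"{ticker} price"

async def fetch_all(tickers: list[str]) -> list[str]:
    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique = list(dict.fromkeys(tickers))
    fetched = await asyncio.gather(*(fetch_one(t) for t in unique))
    by_ticker = dict(zip(unique, fetched))
    # Expand back to the original list, so repeated tickers share one result.
    return [by_ticker[t] for t in tickers]

if __name__ == "__main__":
    tickers = ["AAPL", "XCN18679-USD", "XCN18679-USD"]
    print(asyncio.run(fetch_all(tickers)))
    print(f"requests made: {len(calls)}")
```

Each distinct ticker is requested once, and the returned list has the same length and order as the input.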

1 Answer

The simplest solution is certainly to follow @deceze's comment and ensure that your list of tickers has no duplicates. When that is not practical for whatever reason, the following technique can be used:

When fetch_close_price discovers that the ticker's price is not in the cache, it goes ahead and makes the request to fetch it. But first it creates a Future instance representing the completion of the request and stores that in the cache. When the price is finally downloaded, it replaces the future in the cache, and the future's result is set to the price so that any task awaiting the future receives it.

If, however, the cache does have an entry for the ticker, then it is either a Future instance or the actual price. In the former case, we simply await the future's completion. In the latter case we already have the price. Either way, no new request to download the price need be made.

The only complication is determining whether a cache entry, if present, is a future or the actual price. When I create a future with loop.create_future, the actual class is _asyncio.Future. Rather than assume any particular class for futures, I rely on knowing the class of a price and check for that (a str instance in my code below).

import asyncio

async def fetch_close_price(ticker: str, ticker_price_dict):
    print(f'Checking cache for ticker {ticker}')
    price = ticker_price_dict.get(ticker)
    if price is not None:
        # We cannot be sure of the class used for a future, so
        # we check to see if this is the class of a price.
        # My price is just a str instance
        if not isinstance(price, str):
            # This is a future
            future = price
            print(f'ticker {ticker} has already been requested.')
            price = await future
            print(f'Got price {price!r} for ticker {ticker} from the future.')
        else:
            print(f'Found price for ticker {ticker} in cache')
        return price

    # The ticker is not in the cache, so we have to make a request
    # Put a future in the cache to signal to other tasks interested in
    # the same ticker that the request has been made.
    future = asyncio.get_running_loop().create_future()
    ticker_price_dict[ticker] = future

    # Download ticker. Here we emulate doing that by sleeping a bit:
    print(f'Making request for ticker {ticker}')
    await asyncio.sleep(1)
    price = f'{ticker} price'
    print(f'Got price {price!r} for ticker {ticker} from the request')
    # Put the result in the cache:
    ticker_price_dict[ticker] = price
    # And tell others interested in this price that we have it:
    future.set_result(price)
    # Return the price to the caller that made the request:
    return price

async def download_close_prices(tickers: list[str]):
    ticker_price_dict = {}
    tasks = [fetch_close_price(ticker, ticker_price_dict) for ticker in tickers]
    results = await asyncio.gather(*tasks)
    return results

async def download_close_prices_all(tickers: list[str]):
    return await download_close_prices(tickers)

if __name__ == "__main__":
    tickers = ["AAPL", "USDINR=X", "XCN18679-USD", "XCN18679-USD", "XCN18679-USD"]
    close_prices = asyncio.run(download_close_prices_all(tickers))

Prints:

Checking cache for ticker AAPL
Making request for ticker AAPL
Checking cache for ticker USDINR=X
Making request for ticker USDINR=X
Checking cache for ticker XCN18679-USD
Making request for ticker XCN18679-USD
Checking cache for ticker XCN18679-USD
ticker XCN18679-USD has already been requested.
Checking cache for ticker XCN18679-USD
ticker XCN18679-USD has already been requested.
Got price 'AAPL price' for ticker AAPL from the request
Got price 'USDINR=X price' for ticker USDINR=X from the request
Got price 'XCN18679-USD price' for ticker XCN18679-USD from the request
Got price 'XCN18679-USD price' for ticker XCN18679-USD from the future.
Got price 'XCN18679-USD price' for ticker XCN18679-USD from the future.
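
The question also asks that failed downloads not be cached. In the code above, a request that raises before future.set_result is reached would leave the future pending, so any tasks awaiting it would wait forever. One possible variation, offered only as a sketch, keeps nothing but futures in the cache (so no type check is needed), removes the entry when the download fails, and passes the error on to the waiting tasks. The `download` function, the "BAD" ticker and the `calls` list are hypothetical and exist only for this demonstration; task cancellation is not handled here.

```python
import asyncio

calls: list[str] = []  # records each simulated request (illustration only)

async def download(ticker: str) -> str:
    """Hypothetical stand-in for the real aiohttp request."""
    calls.append(ticker)
    await asyncio.sleep(0.01)
    if ticker == "BAD":  # simulated failure
        raise RuntimeError(f"download failed for {ticker}")
    return f"{ticker} price"

async def fetch_close_price(ticker: str, cache: dict[str, asyncio.Future]) -> str:
    future = cache.get(ticker)
    if future is not None:
        # Already requested: awaiting a finished future returns immediately,
        # awaiting a pending one waits for the task that is downloading.
        return await future

    future = asyncio.get_running_loop().create_future()
    cache[ticker] = future
    try:
        price = await download(ticker)
    except Exception as exc:
        # Do not cache failures: drop the entry so a later call can retry,
        # then wake any waiting tasks with the same error.
        del cache[ticker]
        future.set_exception(exc)
        future.exception()  # mark as retrieved in case nobody is waiting
        raise
    future.set_result(price)
    return price

async def main(tickers: list[str]):
    cache: dict[str, asyncio.Future] = {}
    # return_exceptions=True keeps one failed ticker from aborting the rest.
    return await asyncio.gather(
        *(fetch_close_price(t, cache) for t in tickers),
        return_exceptions=True,
    )

if __name__ == "__main__":
    for result in asyncio.run(main(["AAPL", "BAD", "BAD", "AAPL"])):
        print(repr(result))
```

With this arrangement, repeated tickers share one request whether it succeeds or fails, and only successful results stay in the cache.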