最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python - How do I fix asyncio.run() not working if called from a running event loop? - Stack Overflow

programmeradmin3浏览0评论

I'm trying to create a python script which streams a whole bunch of data live from the alpaca trade api. At first I got an error about not being able to call asyncio.run() from a running event loop, so I used asyncio.new_event_loop() and asyncio.set_event_loop(loop) to no avail (got basically the same error). For reference here is my code:

import os
import pandas as pd
import numpy as np
import asyncio
import json
from collections import deque
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.rest import REST
from dotenv import load_dotenv

load_dotenv()
ALPACA_API_KEY = os.getenv("ALPACA_API_KEY")
ALPACA_SECRET_KEY = os.getenv("ALPACA_SECRET_KEY")
BASE_URL = "/v2"  

SYMBOL = "AAPL"
DATA_QUEUE = deque(maxlen=1000)  

rest_api = REST(ALPACA_API_KEY, ALPACA_SECRET_KEY, BASE_URL)

PROCESSED_DATA_PATH = "data/processed/"
os.makedirs(PROCESSED_DATA_PATH, exist_ok=True)

COLUMNS = [
    "timestamp", "close", "high", "low", "trade_count", "open", "volume", "vwap",
    "ema_9", "ema_21", "rsi", "macd", "macd_signal", "bollinger_h", "bollinger_l", "order_flow"
]

def preprocess_live_data(df):
    
    nan_threshold = 0.2 * len(df.columns)
    df.dropna(thresh=len(df.columns) - nan_threshold, inplace=True)
    
    df.fillna(method='ffill', inplace=True)  
    df.fillna(method='bfill', inplace=True)  
    df.fillna(df.mean(), inplace=True) 

    return df

def save_live_data(df):
    file_path = os.path.join(PROCESSED_DATA_PATH, f"{SYMBOL}_live_data.csv")
    df.to_csv(file_path, mode='a', header=not os.path.exists(file_path), index=False)

async def handle_trade_update(trade):
    global DATA_QUEUE

    try:
        data = {
            "timestamp": str(trade.timestamp),  
            "close": trade.price,
            "high": trade.price,  
            "low": trade.price,
            "trade_count": trade.size,
            "open": trade.price,
            "volume": trade.size,
            "vwap": trade.price,
            "ema_9": np.nan,
            "ema_21": np.nan,
            "rsi": np.nan,
            "macd": np.nan,
            "macd_signal": np.nan,
            "bollinger_h": np.nan,
            "bollinger_l": np.nan,
            "order_flow": trade.size * np.sign(trade.price)  
        }

        df = pd.DataFrame([data], columns=COLUMNS)
        df = preprocess_live_data(df)

        if not df.empty:
            DATA_QUEUE.append(df)
            save_live_data(df)
            print("Live data processed and saved:", df)
    
    except Exception as e:
        print(f"Error processing trade update: {e}")

async def start_stream():
    stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')

    stream.subscribe_trades(handle_trade_update, SYMBOL)
    
    await stream.run()

if __name__ == "__main__":
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(start_stream())
    except KeyboardInterrupt:
        print("Stream interrupted by user.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        loop.close()

And here is the error im getting:
Error: asyncio.run() cannot be called from a running event loop

<sys>:0: RuntimeWarning: coroutine 'Stream._run_forever' was never awaited

RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Also I'm aware that the error occurs because alpaca api uses asyncio already but my problem is I don't know the fix to it.

I tried running the script provided above and instead of working as intended I got this error:
Error: asyncio.run() cannot be called from a running event loop

<sys>:0: RuntimeWarning: coroutine 'Stream._run_forever' was never awaited

RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I'm trying to create a python script which streams a whole bunch of data live from the alpaca trade api. At first I got an error about not being able to call asyncio.run() from a running event loop, so I used asyncio.new_event_loop() and asyncio.set_event_loop(loop) to no avail (got basically the same error). For reference here is my code:

import os
import pandas as pd
import numpy as np
import asyncio
import json
from collections import deque
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.rest import REST
from dotenv import load_dotenv

load_dotenv()
ALPACA_API_KEY = os.getenv("ALPACA_API_KEY")
ALPACA_SECRET_KEY = os.getenv("ALPACA_SECRET_KEY")
BASE_URL = "https://paper-api.alpaca.markets/v2"  

SYMBOL = "AAPL"
DATA_QUEUE = deque(maxlen=1000)  

rest_api = REST(ALPACA_API_KEY, ALPACA_SECRET_KEY, BASE_URL)

PROCESSED_DATA_PATH = "data/processed/"
os.makedirs(PROCESSED_DATA_PATH, exist_ok=True)

COLUMNS = [
    "timestamp", "close", "high", "low", "trade_count", "open", "volume", "vwap",
    "ema_9", "ema_21", "rsi", "macd", "macd_signal", "bollinger_h", "bollinger_l", "order_flow"
]

def preprocess_live_data(df):
    
    nan_threshold = 0.2 * len(df.columns)
    df.dropna(thresh=len(df.columns) - nan_threshold, inplace=True)
    
    df.fillna(method='ffill', inplace=True)  
    df.fillna(method='bfill', inplace=True)  
    df.fillna(df.mean(), inplace=True) 

    return df

def save_live_data(df):
    file_path = os.path.join(PROCESSED_DATA_PATH, f"{SYMBOL}_live_data.csv")
    df.to_csv(file_path, mode='a', header=not os.path.exists(file_path), index=False)

async def handle_trade_update(trade):
    global DATA_QUEUE

    try:
        data = {
            "timestamp": str(trade.timestamp),  
            "close": trade.price,
            "high": trade.price,  
            "low": trade.price,
            "trade_count": trade.size,
            "open": trade.price,
            "volume": trade.size,
            "vwap": trade.price,
            "ema_9": np.nan,
            "ema_21": np.nan,
            "rsi": np.nan,
            "macd": np.nan,
            "macd_signal": np.nan,
            "bollinger_h": np.nan,
            "bollinger_l": np.nan,
            "order_flow": trade.size * np.sign(trade.price)  
        }

        df = pd.DataFrame([data], columns=COLUMNS)
        df = preprocess_live_data(df)

        if not df.empty:
            DATA_QUEUE.append(df)
            save_live_data(df)
            print("Live data processed and saved:", df)
    
    except Exception as e:
        print(f"Error processing trade update: {e}")

async def start_stream():
    stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')

    stream.subscribe_trades(handle_trade_update, SYMBOL)
    
    await stream.run()

if __name__ == "__main__":
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(start_stream())
    except KeyboardInterrupt:
        print("Stream interrupted by user.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        loop.close()

And here is the error im getting:
Error: asyncio.run() cannot be called from a running event loop

<sys>:0: RuntimeWarning: coroutine 'Stream._run_forever' was never awaited

RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Also I'm aware that the error occurs because alpaca api uses asyncio already but my problem is I don't know the fix to it.

I tried running the script provided above and instead of working as intended I got this error:
Error: asyncio.run() cannot be called from a running event loop

<sys>:0: RuntimeWarning: coroutine 'Stream._run_forever' was never awaited

RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Share Improve this question asked Mar 14 at 6:53 nass holenass hole 12 bronze badges 2
  • 1 This error message means you cannot have two event loops (in one thread) at the same time. If this error is occuring, there must be some problem in the program logic or in the way it uses external async libraries. Trying to start the second event loop differently won't help. – VPfB Commented Mar 14 at 10:07
  • Since I don't see a call to Stream.run_forever in your code nor a call to asyncio.run, it would be good to know from wherein the code you posted these calls arose. Printing a stack trace and the actual code from which the stack trace was generated would be helpful. – Booboo Commented Mar 14 at 19:25
Add a comment  | 

1 Answer 1

Reset to default 0

Based on the source code for Stream class, stream.run() is a call for regular function, not a coroutine, and internally it calls asyncio.run(self._run_forever())

    async def _run_forever(self):
        await asyncio.gather(self._trading_ws._run_forever(),
                             self._data_ws._run_forever(),
                             self._crypto_ws._run_forever(),
                             self._news_ws._run_forever())

    def run(self):
        try:
            asyncio.run(self._run_forever())
        except KeyboardInterrupt:
            print('keyboard interrupt, bye')
            pass

That could be the reason for "asyncio.run() cannot be called from a running event loop" error. Two possible solutions:

I. Don't use asyncio by yourself and let the Stream class do asyncronous stuff

def start_stream(): # regular function, no awaits inside 
    stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')

    stream.subscribe_trades(handle_trade_update, SYMBOL)
    
    stream.run() # regular call

if __name__ == "__main__":
    try:
        start_stream() # no messing with asyncio
    except KeyboardInterrupt:
        print("Stream interrupted by user.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        pass
        # maybe no cleanup needed?

II. Wait for awaitable object of Stream

async def start_stream():
    stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')

    stream.subscribe_trades(handle_trade_update, SYMBOL)
    
    await stream._run_forever() # that's what Stream.run runs internally

if __name__ == "__main__":
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(start_stream())
    except KeyboardInterrupt:
        print("Stream interrupted by user.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        loop.close()

The first one is probably the intended by API providers, but the second lets you use asycio for other tasks.

发布评论

评论列表(0)

  1. 暂无评论