I'm trying to create a python script which streams a whole bunch of data live from the alpaca trade api. At first I got an error about not being able to call asyncio.run() from a running event loop, so I used asyncio.new_event_loop() and asyncio.set_event_loop(loop) to no avail (got basically the same error). For reference here is my code:
import os
import pandas as pd
import numpy as np
import asyncio
import json
from collections import deque
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.rest import REST
from dotenv import load_dotenv
load_dotenv()
ALPACA_API_KEY = os.getenv("ALPACA_API_KEY")
ALPACA_SECRET_KEY = os.getenv("ALPACA_SECRET_KEY")
BASE_URL = "/v2"
SYMBOL = "AAPL"
DATA_QUEUE = deque(maxlen=1000)
rest_api = REST(ALPACA_API_KEY, ALPACA_SECRET_KEY, BASE_URL)
PROCESSED_DATA_PATH = "data/processed/"
os.makedirs(PROCESSED_DATA_PATH, exist_ok=True)
COLUMNS = [
"timestamp", "close", "high", "low", "trade_count", "open", "volume", "vwap",
"ema_9", "ema_21", "rsi", "macd", "macd_signal", "bollinger_h", "bollinger_l", "order_flow"
]
def preprocess_live_data(df):
nan_threshold = 0.2 * len(df.columns)
df.dropna(thresh=len(df.columns) - nan_threshold, inplace=True)
df.fillna(method='ffill', inplace=True)
df.fillna(method='bfill', inplace=True)
df.fillna(df.mean(), inplace=True)
return df
def save_live_data(df):
file_path = os.path.join(PROCESSED_DATA_PATH, f"{SYMBOL}_live_data.csv")
df.to_csv(file_path, mode='a', header=not os.path.exists(file_path), index=False)
async def handle_trade_update(trade):
global DATA_QUEUE
try:
data = {
"timestamp": str(trade.timestamp),
"close": trade.price,
"high": trade.price,
"low": trade.price,
"trade_count": trade.size,
"open": trade.price,
"volume": trade.size,
"vwap": trade.price,
"ema_9": np.nan,
"ema_21": np.nan,
"rsi": np.nan,
"macd": np.nan,
"macd_signal": np.nan,
"bollinger_h": np.nan,
"bollinger_l": np.nan,
"order_flow": trade.size * np.sign(trade.price)
}
df = pd.DataFrame([data], columns=COLUMNS)
df = preprocess_live_data(df)
if not df.empty:
DATA_QUEUE.append(df)
save_live_data(df)
print("Live data processed and saved:", df)
except Exception as e:
print(f"Error processing trade update: {e}")
async def start_stream():
stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')
stream.subscribe_trades(handle_trade_update, SYMBOL)
await stream.run()
if __name__ == "__main__":
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(start_stream())
except KeyboardInterrupt:
print("Stream interrupted by user.")
except Exception as e:
print(f"Error: {e}")
finally:
loop.close()
And here is the error im getting:
Error: asyncio.run() cannot be called from a running event loop
<sys>:0: RuntimeWarning: coroutine 'Stream._run_forever' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Also I'm aware that the error occurs because alpaca api uses asyncio already but my problem is I don't know the fix to it.
I tried running the script provided above and instead of working as intended I got this error:
Error: asyncio.run() cannot be called from a running event loop
<sys>:0: RuntimeWarning: coroutine 'Stream._run_forever' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
I'm trying to create a python script which streams a whole bunch of data live from the alpaca trade api. At first I got an error about not being able to call asyncio.run() from a running event loop, so I used asyncio.new_event_loop() and asyncio.set_event_loop(loop) to no avail (got basically the same error). For reference here is my code:
import os
import pandas as pd
import numpy as np
import asyncio
import json
from collections import deque
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.rest import REST
from dotenv import load_dotenv
load_dotenv()
ALPACA_API_KEY = os.getenv("ALPACA_API_KEY")
ALPACA_SECRET_KEY = os.getenv("ALPACA_SECRET_KEY")
BASE_URL = "https://paper-api.alpaca.markets/v2"
SYMBOL = "AAPL"
DATA_QUEUE = deque(maxlen=1000)
rest_api = REST(ALPACA_API_KEY, ALPACA_SECRET_KEY, BASE_URL)
PROCESSED_DATA_PATH = "data/processed/"
os.makedirs(PROCESSED_DATA_PATH, exist_ok=True)
COLUMNS = [
"timestamp", "close", "high", "low", "trade_count", "open", "volume", "vwap",
"ema_9", "ema_21", "rsi", "macd", "macd_signal", "bollinger_h", "bollinger_l", "order_flow"
]
def preprocess_live_data(df):
nan_threshold = 0.2 * len(df.columns)
df.dropna(thresh=len(df.columns) - nan_threshold, inplace=True)
df.fillna(method='ffill', inplace=True)
df.fillna(method='bfill', inplace=True)
df.fillna(df.mean(), inplace=True)
return df
def save_live_data(df):
file_path = os.path.join(PROCESSED_DATA_PATH, f"{SYMBOL}_live_data.csv")
df.to_csv(file_path, mode='a', header=not os.path.exists(file_path), index=False)
async def handle_trade_update(trade):
global DATA_QUEUE
try:
data = {
"timestamp": str(trade.timestamp),
"close": trade.price,
"high": trade.price,
"low": trade.price,
"trade_count": trade.size,
"open": trade.price,
"volume": trade.size,
"vwap": trade.price,
"ema_9": np.nan,
"ema_21": np.nan,
"rsi": np.nan,
"macd": np.nan,
"macd_signal": np.nan,
"bollinger_h": np.nan,
"bollinger_l": np.nan,
"order_flow": trade.size * np.sign(trade.price)
}
df = pd.DataFrame([data], columns=COLUMNS)
df = preprocess_live_data(df)
if not df.empty:
DATA_QUEUE.append(df)
save_live_data(df)
print("Live data processed and saved:", df)
except Exception as e:
print(f"Error processing trade update: {e}")
async def start_stream():
stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')
stream.subscribe_trades(handle_trade_update, SYMBOL)
await stream.run()
if __name__ == "__main__":
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(start_stream())
except KeyboardInterrupt:
print("Stream interrupted by user.")
except Exception as e:
print(f"Error: {e}")
finally:
loop.close()
And here is the error im getting:
Error: asyncio.run() cannot be called from a running event loop
<sys>:0: RuntimeWarning: coroutine 'Stream._run_forever' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Also I'm aware that the error occurs because alpaca api uses asyncio already but my problem is I don't know the fix to it.
I tried running the script provided above and instead of working as intended I got this error:
Error: asyncio.run() cannot be called from a running event loop
<sys>:0: RuntimeWarning: coroutine 'Stream._run_forever' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Share Improve this question asked Mar 14 at 6:53 nass holenass hole 12 bronze badges 2 |1 Answer
Reset to default 0Based on the source code for Stream class, stream.run() is a call for regular function, not a coroutine, and internally it calls asyncio.run(self._run_forever())
async def _run_forever(self):
await asyncio.gather(self._trading_ws._run_forever(),
self._data_ws._run_forever(),
self._crypto_ws._run_forever(),
self._news_ws._run_forever())
def run(self):
try:
asyncio.run(self._run_forever())
except KeyboardInterrupt:
print('keyboard interrupt, bye')
pass
That could be the reason for "asyncio.run() cannot be called from a running event loop" error. Two possible solutions:
I. Don't use asyncio by yourself and let the Stream class do asyncronous stuff
def start_stream(): # regular function, no awaits inside
stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')
stream.subscribe_trades(handle_trade_update, SYMBOL)
stream.run() # regular call
if __name__ == "__main__":
try:
start_stream() # no messing with asyncio
except KeyboardInterrupt:
print("Stream interrupted by user.")
except Exception as e:
print(f"Error: {e}")
finally:
pass
# maybe no cleanup needed?
II. Wait for awaitable object of Stream
async def start_stream():
stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')
stream.subscribe_trades(handle_trade_update, SYMBOL)
await stream._run_forever() # that's what Stream.run runs internally
if __name__ == "__main__":
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(start_stream())
except KeyboardInterrupt:
print("Stream interrupted by user.")
except Exception as e:
print(f"Error: {e}")
finally:
loop.close()
The first one is probably the intended by API providers, but the second lets you use asycio for other tasks.
Stream.run_forever
in your code nor a call toasyncio.run
, it would be good to know from wherein the code you posted these calls arose. Printing a stack trace and the actual code from which the stack trace was generated would be helpful. – Booboo Commented Mar 14 at 19:25