I am developing a real-time market feed system using DhanHQ’s WebSocket API, where I need to:
Subscribe to instruments dynamically (based on a watchlist stored in SQLite).
Unsubscribe from removed instruments in real-time.
Store market data in a SQLite database for further processing.
Ensure WebSocket connection stability without excessive reconnections.

However, I am facing multiple issues when integrating WebSocket with dynamic watchlist updates in Python.
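For the "without excessive reconnections" requirement, one common approach is capped exponential backoff with jitter between reconnect attempts. The sketch below is only an illustration under assumptions: connect_once is a hypothetical coroutine function standing in for whatever opens the feed, and the exception types worth retrying depend on the library actually in use.

```python
import asyncio
import random


def backoff_delays(base=1.0, factor=2.0, cap=60.0):
    """Yield an endless sequence of delays: base, base*factor, ... capped at cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


async def run_with_reconnect(connect_once, max_attempts=5, base=1.0, cap=60.0,
                             sleep=asyncio.sleep):
    """Call connect_once() until it returns normally or attempts run out.

    connect_once is a hypothetical coroutine function: it is assumed to return
    when the session ends cleanly and to raise OSError (or a subclass such as
    ConnectionError) when the connection fails.
    """
    delays = backoff_delays(base=base, cap=cap)
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect_once()
        except OSError:
            if attempt == max_attempts:
                raise  # give up after the last allowed attempt
            # Jitter spreads retries out so clients do not reconnect in lockstep.
            await sleep(next(delays) * random.uniform(0.5, 1.0))
```

The sleep parameter exists only so the waiting can be replaced in tests; in normal use the default asyncio.sleep applies.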
System Setup
API Provider: DhanHQ WebSocket API
Python Version: 3.13
Database: SQLite (storing watchlist & market data)
WebSocket Library: websockets (used internally by dhanhq.marketfeed)
Asynchronous Handling: asyncio

Current Implementation
1️⃣ WebSocket Connection (Using DhanHQ API)
import asyncio
from dhanhq import marketfeed

client_id = "MY_CLIENT_ID"
access_token = "MY_ACCESS_TOKEN"

# Instruments are dynamically updated from watchlist.db
instruments = [(marketfeed.NSE, "3787", marketfeed.Full),
               (marketfeed.NSE, "2885", marketfeed.Full)]

def start_websocket_feed():
    """Starts WebSocket connection in a separate thread."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    data = marketfeed.DhanFeed(client_id, access_token, instruments, version="v2")
    loop.run_until_complete(data.connect())  # WebSocket connection
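Note that the function above creates a new event loop but does not itself start a thread, so the caller would have to do that. As a minimal sketch of one way to run an event loop in a background thread and hand coroutines to it from the main thread, using only the standard library: fake_feed is a hypothetical stand-in for the real feed coroutine, and whether the DhanHQ feed can be driven this way is an assumption, not something verified here.

```python
import asyncio
import threading


def start_loop_in_thread():
    """Start a fresh asyncio event loop in a daemon thread; return (loop, thread)."""
    loop = asyncio.new_event_loop()

    def _run():
        # Bind the loop to this thread, then run it until loop.stop() is called.
        asyncio.set_event_loop(loop)
        loop.run_forever()

    thread = threading.Thread(target=_run, name="feed-loop", daemon=True)
    thread.start()
    return loop, thread


async def fake_feed():
    """Hypothetical stand-in for the real feed coroutine."""
    await asyncio.sleep(0.01)
    return "connected"


if __name__ == "__main__":
    loop, thread = start_loop_in_thread()
    # Schedule the coroutine on the background loop from the main thread.
    future = asyncio.run_coroutine_threadsafe(fake_feed(), loop)
    print(future.result(timeout=5))
    # Stop the loop from outside its thread, then wait for the thread to exit.
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
```

Keeping a reference to the loop is what makes later calls from other threads possible, since asyncio.run_coroutine_threadsafe needs the target loop as an argument.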
2️⃣ Dynamic Watchlist Updates (Using SQLite)
I store my watchlist in a SQLite database (watchlist.db), and I need to:
Check for watchlist updates every X seconds.
Unsubscribe removed instruments.
Subscribe to new instruments without restarting the WebSocket.
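The three steps above can be sketched as a polling coroutine that diffs the stored watchlist against the last known one. This is a sketch under assumptions: fetch, on_subscribe and on_unsubscribe are hypothetical callables supplied by the caller, and how the two callbacks map onto the DhanHQ feed object is deliberately left out.

```python
import asyncio


def diff_watchlists(current, latest):
    """Return (to_subscribe, to_unsubscribe) as sorted lists."""
    current, latest = set(current), set(latest)
    return sorted(latest - current), sorted(current - latest)


async def poll_watchlist(fetch, on_subscribe, on_unsubscribe,
                         interval=5.0, initial=(), max_polls=None):
    """Poll fetch() every `interval` seconds and report watchlist changes.

    fetch is a blocking function returning an iterable of instrument tuples;
    on_subscribe / on_unsubscribe are coroutine functions taking a list.
    All three are hypothetical hooks supplied by the caller.
    """
    current = set(initial)
    polls = 0
    while max_polls is None or polls < max_polls:
        # Run the blocking database read in a worker thread so the event loop stays responsive.
        latest = set(await asyncio.to_thread(fetch))
        to_sub, to_unsub = diff_watchlists(current, latest)
        if to_unsub:
            await on_unsubscribe(to_unsub)
        if to_sub:
            await on_subscribe(to_sub)
        current = latest
        polls += 1
        await asyncio.sleep(interval)
    return current
```

Because the loop only acts on the difference between two snapshots, an unchanged watchlist triggers no subscribe or unsubscribe calls at all.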
import sqlite3

def get_watchlist():
    """Fetch the latest watchlist from SQLite database."""
    conn = sqlite3.connect("watchlist.db")
    cursor = conn.cursor()
    cursor.execute("SELECT Security_ID, EXCH_ID FROM watchlist")
    watchlist = cursor.fetchall()
    conn.close()
    return [(marketfeed.NSE if ex == 'NSE' else marketfeed.BSE, str(sec), marketfeed.Full) for sec, ex in watchlist]

def update_watchlist():
    """Update WebSocket subscriptions based on watchlist changes."""
    global instruments
    new_watchlist = set(get_watchlist())
    current_watchlist = set(instruments)
    # Unsubscribe removed instruments
    to_unsubscribe = current_watchlist - new_watchlist
    if to_unsubscribe:
        print(f"