
Python WebSocket with DhanHQ Market Feed: Issues with Dynamic Watchlist Updates and asyncio Event Loop - Stack Overflow


I am developing a real-time market feed system using DhanHQ’s WebSocket API, where I need to:

Subscribe to instruments dynamically (based on a watchlist stored in SQLite). Unsubscribe from removed instruments in real-time. Store market data in a SQLite database for further processing. Ensure WebSocket connection stability without excessive reconnections. However, I am facing multiple issues when integrating WebSocket with dynamic watchlist updates in Python.

System Setup API Provider: DhanHQ WebSocket API Python Version: 3.13 Database: SQLite (storing watchlist & market data) WebSocket Library: websockets (used internally by dhanhq.marketfeed) Asynchronous Handling: asyncio Current Implementation #WebSocket Connection (Using DhanHQ API)

import asyncio
from dhanhq import marketfeed

client_id = "MY_CLIENT_ID"
access_token = "MY_ACCESS_TOKEN"

# Instruments are dynamically updated from watchlist.db
instruments = [(marketfeed.NSE, "3787", marketfeed.Full), 
               (marketfeed.NSE, "2885", marketfeed.Full)]

def start_websocket_feed():
    """Starts WebSocket connection in a separate thread."""
    loop = asyncio.new_event_loop()

    data = marketfeed.DhanFeed(client_id, access_token, instruments, version="v2")
    loop.run_until_complete(data.connect())  # WebSocket connection
2️⃣ Dynamic Watchlist Updates (Using SQLite)
I store my watchlist in a SQLite database (watchlist.db), and I need to:

Check for watchlist updates every X seconds.
Unsubscribe removed instruments.
Subscribe to new instruments without restarting the WebSocket.
import sqlite3

def get_watchlist():
    """Fetch the latest watchlist from SQLite database."""
    conn = sqlite3.connect("watchlist.db")
    cursor = conn.cursor()
    cursor.execute("SELECT Security_ID, EXCH_ID FROM watchlist")  
    watchlist = cursor.fetchall()
    return [(marketfeed.NSE if ex == 'NSE' else marketfeed.BSE, str(sec), marketfeed.Full) for sec, ex in watchlist]

def update_watchlist():
    """Update WebSocket subscriptions based on watchlist changes."""
    global instruments
    new_watchlist = set(get_watchlist())
    current_watchlist = set(instruments)

    # Unsubscribe removed instruments
    to_unsubscribe = current_watchlist - new_watchlist
    if to_unsubscribe:




  1. 暂无评论