
Python WebSocket with DhanHQ Market Feed: Issues with Dynamic Watchlist Updates and asyncio Event Loop - Stack Overflow


I am developing a real-time market feed system using DhanHQ’s WebSocket API, where I need to:

Subscribe to instruments dynamically (based on a watchlist stored in SQLite).
Unsubscribe from removed instruments in real time.
Store market data in a SQLite database for further processing.
Ensure WebSocket connection stability without excessive reconnections.

However, I am facing multiple issues when integrating the WebSocket with dynamic watchlist updates in Python.

System Setup

API Provider: DhanHQ WebSocket API
Python Version: 3.13
Database: SQLite (storing watchlist & market data)
WebSocket Library: websockets (used internally by dhanhq.marketfeed)
Asynchronous Handling: asyncio

Current Implementation

1️⃣ WebSocket Connection (Using DhanHQ API)


import asyncio
from dhanhq import marketfeed

client_id = "MY_CLIENT_ID"
access_token = "MY_ACCESS_TOKEN"

# Instruments are dynamically updated from watchlist.db
instruments = [(marketfeed.NSE, "3787", marketfeed.Full), 
               (marketfeed.NSE, "2885", marketfeed.Full)]

def start_websocket_feed():
    """Starts WebSocket connection in a separate thread."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    data = marketfeed.DhanFeed(client_id, access_token, instruments, version="v2")
    loop.run_until_complete(data.connect())  # WebSocket connection
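
The docstring above mentions a separate thread, although the snippet itself never starts one. As a minimal sketch of that general pattern, the following runs one asyncio event loop in a background thread and hands coroutines to it from the calling thread with `asyncio.run_coroutine_threadsafe`. `LoopThread` and `fake_feed` are hypothetical names used only for illustration; `fake_feed` stands in for the real feed coroutine, and nothing here calls the DhanHQ library.

```python
import asyncio
import threading


class LoopThread:
    """Owns one asyncio event loop running in a background thread (illustrative helper)."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)

    def start(self):
        self._thread.start()

    def submit(self, coro):
        """Schedule a coroutine on the loop from any thread.

        Returns a concurrent.futures.Future, so the caller can wait on it
        without touching the loop directly.
        """
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self):
        # loop.stop() must be requested from inside the loop's own thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def fake_feed(ticks, out):
    """Stand-in for a feed coroutine (assumption): emits a few ticks, then returns."""
    for i in range(ticks):
        await asyncio.sleep(0.01)
        out.append(i)
    return len(out)


def demo():
    runner = LoopThread()
    runner.start()
    received = []
    future = runner.submit(fake_feed(3, received))
    count = future.result(timeout=5)  # blocks only the calling thread
    runner.stop()
    return count, received


if __name__ == "__main__":
    print(demo())
```

Keeping a single long-lived loop and submitting work to it means later subscription changes can be scheduled on the same loop that owns the connection, instead of creating a second loop or calling into the connection from another thread.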
2️⃣ Dynamic Watchlist Updates (Using SQLite)
I store my watchlist in a SQLite database (watchlist.db), and I need to:

Check for watchlist updates every X seconds.
Unsubscribe removed instruments.
Subscribe to new instruments without restarting the WebSocket.
import sqlite3

def get_watchlist():
    """Fetch the latest watchlist from SQLite database."""
    conn = sqlite3.connect("watchlist.db")
    cursor = conn.cursor()
    cursor.execute("SELECT Security_ID, EXCH_ID FROM watchlist")  
    watchlist = cursor.fetchall()
    conn.close()
    return [(marketfeed.NSE if ex == 'NSE' else marketfeed.BSE, str(sec), marketfeed.Full) for sec, ex in watchlist]

def update_watchlist():
    """Update WebSocket subscriptions based on watchlist changes."""
    global instruments
    new_watchlist = set(get_watchlist())
    current_watchlist = set(instruments)

    # Unsubscribe removed instruments
    to_unsubscribe = current_watchlist - new_watchlist
    if to_unsubscribe:
        print(f"
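
The set-difference step that `update_watchlist` begins can be sketched on its own, without the feed library. The example below assumes the same `watchlist` table and column names as the query above, uses an in-memory database so it runs anywhere, and keeps plain `(exchange, security_id)` tuples rather than `marketfeed` constants. `fetch_watchlist` and `diff_watchlist` are hypothetical helper names; handing the two resulting lists to the feed object is deliberately left out.

```python
import sqlite3


def fetch_watchlist(conn):
    """Return the watchlist as a set of (exchange, security_id) tuples."""
    rows = conn.execute("SELECT Security_ID, EXCH_ID FROM watchlist").fetchall()
    return {(exch, str(sec)) for sec, exch in rows}


def diff_watchlist(current, latest):
    """Return (to_subscribe, to_unsubscribe) as sorted lists."""
    to_subscribe = sorted(latest - current)
    to_unsubscribe = sorted(current - latest)
    return to_subscribe, to_unsubscribe


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE watchlist (Security_ID TEXT, EXCH_ID TEXT)")
    conn.executemany(
        "INSERT INTO watchlist VALUES (?, ?)",
        [("3787", "NSE"), ("2885", "NSE")],
    )
    current = fetch_watchlist(conn)

    # Simulate an edit to the watchlist: one instrument removed, one added.
    conn.execute("DELETE FROM watchlist WHERE Security_ID = '2885'")
    conn.execute("INSERT INTO watchlist VALUES ('1333', 'NSE')")
    latest = fetch_watchlist(conn)
    conn.close()

    return diff_watchlist(current, latest)


if __name__ == "__main__":
    print(demo())
```

After a successful update, the caller would replace its stored set with `latest`, so the next poll diffs against what is actually subscribed rather than against the initial list.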
