I'm using gremlin-python (v3.7.3) in a multi-threaded Python application, sharing a single gremlin_python.driver.client.Client instance across threads. The client is configured with pool_size = 8 by default.
Occasionally, a connection within the client's internal pool (self._pool) gets closed by the server or an intermediary (e.g., idle timeout, TooManyRequests from CosmosDB - Gremlin). When client.submit() picks up this dead connection, it fails.
My current solution is to catch the exception, verify the connection is likely dead (e.g., using a quick g.inject(1) check), acquire a lock, close() the old client instance, and create an entirely new Client instance to replace it.
My question:
Is recreating the entire Client the intended way to handle individual closed connections within its pool? Does gremlin-python offer a built-in way to detect, discard, and replace only the failed connection in the pool automatically, allowing other healthy connections to remain active? If not, what's the recommended best practice for robustly handling this?
I've reviewed the documentation and source (client.py) but don't see an explicit mechanism for automatic replacement of bad connections in the pool.
Note that _refresh_connection below recreates the entire client (and therefore the whole connection pool), which is inefficient.
@telemetry
def _is_connection_open(self) -> bool:
    """Probe the Gremlin client with ``g.inject(1)`` and report whether it answered.

    The probe is sent through ``self.client.submit``.
    NOTE(review): with a pooled client this presumably exercises whichever
    connection the client picks for the request, not necessarily the one
    that failed -- confirm against the driver.

    Returns:
        True if the server answered ``[1]``. False if the answer was anything
        else, or if submitting / reading the result raised.
    """
    logger.debug("Checking if connection is open...")
    try:
        # Use g.inject(1) for a reliable, lightweight connection check.
        result = self.client.submit("g.inject(1)").all().result()
    except Exception:
        logger.exception("Connection check failed. Connection is likely closed or the server is unreachable.")
        return False

    if result == [1]:
        return True  # Connection is open and responsive.

    # The server answered, just not with what we expected: report it as a
    # warning instead of raising and immediately catching our own exception,
    # which would log a misleading "connection closed" traceback.
    logger.warning("Unexpected response to g.inject(1): %s. Connection may be unstable.", result)
    return False
@telemetry
def _refresh_connection(self):
    """Close the current Gremlin client (best effort) and replace it with a new one.

    Runs entirely under ``self._LOCK``. ``self.client.close`` is executed on a
    daemon thread and waited on for at most 3 seconds; whether or not it
    finishes, ``self.client`` is then replaced with the result of
    ``get_connection(self._config)``. Errors raised while setting up or
    waiting for the close are logged and ignored; errors from
    ``get_connection`` propagate to the caller.
    """
    with self._LOCK:
        logger.warning("Refreshing Gremlin connection...")
        try:
            # Bound the time spent in close(): run it on a daemon thread so a
            # stuck close cannot block the refresh or interpreter shutdown.
            close_thread = threading.Thread(target=self.client.close, daemon=True)
            close_thread.start()
            close_thread.join(timeout=3)
            if close_thread.is_alive():
                # No way to reliably kill the thread, so just log a warning
                # and carry on with the replacement.
                logger.warning("Connection close timed out. Attempting to re-establish.")
            else:
                # Only reported when the close thread actually finished.
                # Exceptions raised inside close() stay in that thread and
                # are not visible here.
                logger.info("Connection closed successfully within timeout.")
        except Exception as close_e:
            logger.exception(f"Error closing connection before refresh: {close_e}. Trying to re-enstablish it anyways...")
        self.client = get_connection(self._config)  # Reestablish the connection
        logger.warning("Connection to Gremlin refreshed.")
@telemetry
def _submit_query(self, q: str):
    """Submit a Gremlin query, retrying once on a fresh client if the connection is dead.

    Args:
        q: Gremlin query string handed to ``self.client.submit``.

    Returns:
        Whatever ``self.client.submit`` returns.

    Raises:
        Exception: The original submit error is re-raised when the connection
            check reports the connection as open. Errors from the refresh or
            from the retried submit propagate unchanged.
    """
    try:
        return self.client.submit(q)
    except Exception:
        logger.error("Query failed, trying to recreate the connection...", exc_info=True)
        # Only a dead connection warrants a refresh-and-retry; otherwise the
        # original error is surfaced to the caller.
        if self._is_connection_open():
            raise
        logger.warning("Connection was not open. Refreshing Gremlin connection...")
        self._refresh_connection()
        return self.client.submit(q)
I'm using gremlin-python (v3.7.3) in a multi-threaded Python application, sharing a single gremlin_python.driver.client.Client instance across threads. The client is configured with pool_size = 8 by default.
Occasionally, a connection within the client's internal pool (self._pool) gets closed by the server or an intermediary (e.g., idle timeout, TooManyRequests from CosmosDB - Gremlin). When client.submit() picks up this dead connection, it fails.
My current solution is to catch the exception, verify the connection is likely dead (e.g., using a quick g.inject(1) check), acquire a lock, close() the old client instance, and create an entirely new Client instance to replace it.
My question:
Is recreating the entire Client the intended way to handle individual closed connections within its pool? Does gremlin-python offer a built-in way to detect, discard, and replace only the failed connection in the pool automatically, allowing other healthy connections to remain active? If not, what's the recommended best practice for robustly handling this?
I've reviewed the documentation and source (client.py) but don't see an explicit mechanism for automatic replacement of bad connections in the pool.
Note that _refresh_connection below recreates the entire client (and therefore the whole connection pool), which is inefficient.
@telemetry
def _is_connection_open(self) -> bool:
    """Probe the Gremlin client with ``g.inject(1)`` and report whether it answered.

    The probe is sent through ``self.client.submit``.
    NOTE(review): with a pooled client this presumably exercises whichever
    connection the client picks for the request, not necessarily the one
    that failed -- confirm against the driver.

    Returns:
        True if the server answered ``[1]``. False if the answer was anything
        else, or if submitting / reading the result raised.
    """
    logger.debug("Checking if connection is open...")
    try:
        # Use g.inject(1) for a reliable, lightweight connection check.
        result = self.client.submit("g.inject(1)").all().result()
    except Exception:
        logger.exception("Connection check failed. Connection is likely closed or the server is unreachable.")
        return False

    if result == [1]:
        return True  # Connection is open and responsive.

    # The server answered, just not with what we expected: report it as a
    # warning instead of raising and immediately catching our own exception,
    # which would log a misleading "connection closed" traceback.
    logger.warning("Unexpected response to g.inject(1): %s. Connection may be unstable.", result)
    return False
@telemetry
def _refresh_connection(self):
    """Close the current Gremlin client (best effort) and replace it with a new one.

    Runs entirely under ``self._LOCK``. ``self.client.close`` is executed on a
    daemon thread and waited on for at most 3 seconds; whether or not it
    finishes, ``self.client`` is then replaced with the result of
    ``get_connection(self._config)``. Errors raised while setting up or
    waiting for the close are logged and ignored; errors from
    ``get_connection`` propagate to the caller.
    """
    with self._LOCK:
        logger.warning("Refreshing Gremlin connection...")
        try:
            # Bound the time spent in close(): run it on a daemon thread so a
            # stuck close cannot block the refresh or interpreter shutdown.
            close_thread = threading.Thread(target=self.client.close, daemon=True)
            close_thread.start()
            close_thread.join(timeout=3)
            if close_thread.is_alive():
                # No way to reliably kill the thread, so just log a warning
                # and carry on with the replacement.
                logger.warning("Connection close timed out. Attempting to re-establish.")
            else:
                # Only reported when the close thread actually finished.
                # Exceptions raised inside close() stay in that thread and
                # are not visible here.
                logger.info("Connection closed successfully within timeout.")
        except Exception as close_e:
            logger.exception(f"Error closing connection before refresh: {close_e}. Trying to re-enstablish it anyways...")
        self.client = get_connection(self._config)  # Reestablish the connection
        logger.warning("Connection to Gremlin refreshed.")
@telemetry
def _submit_query(self, q: str):
    """Submit a Gremlin query, retrying once on a fresh client if the connection is dead.

    Args:
        q: Gremlin query string handed to ``self.client.submit``.

    Returns:
        Whatever ``self.client.submit`` returns.

    Raises:
        Exception: The original submit error is re-raised when the connection
            check reports the connection as open. Errors from the refresh or
            from the retried submit propagate unchanged.
    """
    try:
        return self.client.submit(q)
    except Exception:
        logger.error("Query failed, trying to recreate the connection...", exc_info=True)
        # Only a dead connection warrants a refresh-and-retry; otherwise the
        # original error is surfaced to the caller.
        if self._is_connection_open():
            raise
        logger.warning("Connection was not open. Refreshing Gremlin connection...")
        self._refresh_connection()
        return self.client.submit(q)
Share
Improve this question
edited Apr 2 at 12:04
marcoleino
asked Apr 1 at 9:32
marcoleinomarcoleino
112 bronze badges
New contributor
marcoleino is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
1
- Could you share the code that you tried? – Balaji Commented Apr 1 at 10:33
1 Answer
Reset to default 0
gremlin-python does not expose a mechanism to monitor or reset individual connections within its internal connection pool, or to discard only one dead connection from self._pool. When a pooled WebSocket connection dies, it remains in the pool until it is picked for use, and then it causes a failure. This is the situation you are facing now.
Try the code below. It defines a thread-safe wrapper around the gremlin_python client for the Azure Cosmos DB Gremlin API that detects dead WebSocket connections in the client's internal connection pool and recovers from them, recreating the client only when a health check fails. It checks connection health with g.inject(1) and is compatible with Windows asyncio event-loop handling. It retries with exponential backoff on connection failures and automatically reconnects when connections are closed.
# Configure the root logger so that records at DEBUG level and above are emitted.
logging.basicConfig(level=logging.DEBUG)
class ThreadSafeGremlinClient:
    """Wrapper around a gremlin-python ``Client`` that recreates it when it stops responding.

    A single underlying client is kept in ``self.client``. When a query fails
    with a connection-level error, the wrapper probes the client with
    ``g.inject(1)`` and, if the probe fails, closes the client and builds a new
    one before retrying. The check-and-replace step is serialized with a
    re-entrant lock so concurrent callers do not recreate the client at the
    same time.
    """

    def __init__(self, gremlin_endpoint, username, password, pool_size=8, max_workers=4):
        """Store the connection settings and create the first client.

        Args:
            gremlin_endpoint: URL passed as the first argument to ``client.Client``.
            username: Passed to ``client.Client`` as ``username``.
            password: Passed to ``client.Client`` as ``password``.
            pool_size: Passed to ``client.Client`` as ``pool_size``.
            max_workers: Passed to ``client.Client`` as ``max_workers``.
        """
        self.gremlin_endpoint = gremlin_endpoint
        self.username = username
        self.password = password
        self.pool_size = pool_size
        self.max_workers = max_workers
        # Must be re-entrant: run_query_with_retry holds this lock while it
        # calls _refresh_connection, which acquires it again. A plain Lock
        # would deadlock on the first reconnect.
        self._lock = threading.RLock()
        self._create_client()

    def _create_client(self):
        """Build a new ``client.Client`` from the stored settings and store it in ``self.client``."""
        logging.info("Creating new Gremlin client...")
        self.client = client.Client(
            self.gremlin_endpoint, 'g',
            username=self.username,
            password=self.password,
            pool_size=self.pool_size,
            max_workers=self.max_workers,
            message_serializer=serializer.GraphSONSerializersV2d0(),
        )

    def _is_connection_open(self):
        """Return True if ``g.inject(1)`` submitted through ``self.client`` yields ``[1]``.

        Any exception raised by the probe is logged as a warning and reported
        as False.
        """
        try:
            results = self.client.submit("g.inject(1)").all().result()
            return results == [1]
        except Exception as e:
            logging.warning("Connection check failed: %s", e)
            return False

    def _refresh_connection(self):
        """Close the current client (errors are logged and ignored) and create a new one.

        Holds ``self._lock`` for the whole operation.
        """
        with self._lock:
            logging.warning("Refreshing Gremlin connection...")
            try:
                self.client.close()
            except Exception as e:
                logging.warning("Error closing client: %s", e)
            self._create_client()

    def run_query_with_retry(self, query, retries=3):
        """Run ``query`` and return its results, reconnecting and retrying on connection errors.

        Args:
            query: Gremlin query string passed to ``self.client.submit``.
            retries: Total number of attempts (must be at least 1).

        Returns:
            The value of ``self.client.submit(query).all().result()``.

        Raises:
            ValueError: If ``retries`` is less than 1.
            WebSocketClosedError, OSError: Re-raised once the last attempt has
                failed with a connection error.
            Exception: Any other error is logged and re-raised immediately,
                without retrying.
        """
        # NOTE(review): WebSocketClosedError must be imported by the
        # surrounding file -- confirm it matches the transport in use.
        if retries < 1:
            # Previously the loop body never ran and None was returned as if
            # the query had succeeded with no result.
            raise ValueError("retries must be at least 1")

        for attempt in range(retries):
            try:
                return self.client.submit(query).all().result()
            except (WebSocketClosedError, OSError) as e:
                logging.error("Query failed due to connection issue: %s", e)
                if attempt >= retries - 1:
                    raise
                # Exponential backoff: 1s, 2s, 4s, ...
                time.sleep(2 ** attempt)
                # Check and refresh atomically so that a thread arriving after
                # another thread already reconnected does not reconnect again.
                with self._lock:
                    if not self._is_connection_open():
                        self._refresh_connection()
            except Exception as e:
                logging.error("Unexpected query failure: %s", e)
                raise
if __name__ == "__main__":
    # Example run: connect to the Cosmos DB Gremlin endpoint and count vertices.
    # On Windows, switch asyncio to the selector event loop policy before any
    # client is created.
    if sys.platform.startswith("win"):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    # Placeholders (<db>, <col>, <primaryKey>) must be filled in before running.
    gremlin_client = ThreadSafeGremlinClient(
        gremlin_endpoint="wss://azcgremapidb.gremlin.cosmos.azure:443/",
        username="/dbs/<db>/colls/<col>",
        password="<primaryKey>",
    )

    query = "g.V().count()"
    try:
        result = gremlin_client.run_query_with_retry(query)
    except Exception as e:
        # Reached once run_query_with_retry has given up and re-raised.
        print("Final failure:", e)
    else:
        print("Query result:", result)
Output:
INFO:root:Creating new Gremlin client...
INFO:gremlinpython:Creating Client with url 'wss://azcgremapidb.gremlin.cosmos.azure:443/'
DEBUG:gremlinpython:message 'g.V().count()'
DEBUG:gremlinpython:processor='', op='eval', args='{'gremlin': 'g.V().count()', 'aliases': {'g': 'g'}}'
DEBUG:asyncio:Using selector: SelectSelector
Query result: [0]