Is there a way to isolate pipelines between jobs or threads?
My Redis wrapper has 2 methods:
def queue_redis_message_on_pipeline(message, pipeline):
    pipe = pipeline or get_websocket_redis_client().pipeline()
    ...
    pipe.publish(message)

def execute_redis_pipeline(pipeline):
    pipe = pipeline or get_websocket_redis_client().pipeline()
    pipe.execute()
It's all the same pipeline singleton. Async workers share that one pipeline, so any worker may either (A) flush the pipeline prematurely, executing commands another worker is still queuing, or (B) keep compounding commands onto it until the connection's buffer grows into an out-of-memory error. Any suggestions?
asked Feb 14 at 22:36 by baumannalexj

1 Answer
Create a separate pipeline for each thread or worker using threading.local():
import threading

thread_local = threading.local()

def get_thread_local_pipeline():
    if not hasattr(thread_local, 'pipeline'):
        thread_local.pipeline = get_websocket_redis_client().pipeline()
    return thread_local.pipeline

def queue_redis_message_on_pipeline(message):
    pipe = get_thread_local_pipeline()
    #...
    pipe.publish(message)

def execute_redis_pipeline():
    pipe = get_thread_local_pipeline()
    pipe.execute()
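
Since the question mentions async workers: threading.local() only isolates OS threads, and all asyncio tasks on one event loop share a single thread, so they would still share one pipeline. A minimal sketch of the same idea using contextvars, which asyncio copies into each Task, is below. The `factory` parameter is a hypothetical stand-in for `get_websocket_redis_client().pipeline` from the question, so the sketch runs without a live Redis server:

```python
import asyncio
import contextvars

# One ContextVar holds the current task's pipeline; each asyncio Task runs
# in a copy of the context, so a set() inside one task is invisible to others.
_pipeline_var = contextvars.ContextVar("redis_pipeline", default=None)

def get_task_local_pipeline(factory):
    """Return the current task's pipeline, creating one on first use."""
    pipe = _pipeline_var.get()
    if pipe is None:
        pipe = factory()          # in real code: get_websocket_redis_client().pipeline
        _pipeline_var.set(pipe)   # visible only within this task's context
    return pipe

async def _worker(factory, out):
    # Repeated calls within the same task return the same pipeline object.
    assert get_task_local_pipeline(factory) is get_task_local_pipeline(factory)
    out.append(get_task_local_pipeline(factory))

async def demo():
    out = []
    # `list` is a dummy factory standing in for a real pipeline constructor.
    await asyncio.gather(_worker(list, out), _worker(list, out))
    return out
```

Running `asyncio.run(demo())` gives each of the two tasks its own object, so neither worker can flush or grow the other's pipeline.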