We are using Google Cloud Pub/Sub with the Python `pubsub_v1.PublisherClient`. We have also enabled message ordering (ordering keys) on both the subscription and the Python publisher. Since enabling message ordering, we are seeing duplicate published messages quite often: anywhere from 30% to 50% of the messages.
Here is the code with some sample messages:
from google.cloud import pubsub_v1
import dataclasses
import json
import typing
class GCPubsubPublisher:
def __init__(self, publisher_client: pubsub_v1.PublisherClient):
self.publisher_client = publisher_client
def serialize_data(self, data: typing.Any) -> bytes:
if data is None:
raise ValueError("data cannot be None")
try:
if dataclasses.is_dataclass(data):
data = dataclasses.asdict(data)
if not isinstance(data, str):
data = json.dumps(data)
return data.encode("utf-8") # type: ignore[no-any-return]
except Exception as exc:
raise Exception(f"{data} cannot be serialized, Error: {exc}") from exc
def publish(
self,
topic: str,
data: typing.Any,
async_wait_for_result: bool,
ordering_key: str = "",
**kwargs: typing.Dict[str, typing.Any],
) -> str | None:
serialized_data = self.serialize_data(data)
future = self.publisher_client.publish(
topic=topic, data=serialized_data, ordering_key=ordering_key, **kwargs
)
# Calling future.result() waits asynchronously until the message has been published successfully
# and returns the generated message_id
return None if not async_wait_for_result else future.result()
@staticmethod
def build() -> "GCPubsubPublisher":
publisher_options = pubsub_v1.types.PublisherOptions(
enable_message_ordering=True, timeout=300
)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
return GCPubsubPublisher(publisher)
@staticmethod
def get_topic_path(project_name: str, topic_name: str) -> str:
return pubsub_v1.PublisherClient.topic_path(project_name, topic_name)
if __name__ == "__main__":
    # Demo: publish two form-submission events for the same application, each
    # under its own ordering key, waiting for every publish to complete.
    publisher = GCPubsubPublisher.build()

    # (ordering key, form path) for each sample submission, in publish order.
    submissions = (
        ("50030469_bill_of_sales", "gs://form_path/bill_of_sales.pdf"),
        ("50030469_csc", "gs://form_path/csc.pdf"),
    )

    for submission_key, submission_path in submissions:
        form_payload = {
            "application_id": "50030469",
            "form_path": submission_path,
            "timestamp": "2025-01-31T19:24:22+0000",
            "form_hash": "9191b4f4ea85d3a2be0f6c2f1cf513a5a40e301bc6306fdca49ee8c88144b696",
        }
        # With async_wait_for_result=True the call blocks and returns the
        # message ID; the demo does not use it.
        publisher.publish(
            topic="projects/project_name/topics/form_submission",
            data=form_payload,
            async_wait_for_result=True,
            ordering_key=submission_key,
        )
Here are the duplicate messages (see the log output); you can see that the message ID is exactly the same for both of them.
We are not sure whether this is related, but the message ID arrives as both `message_id` and `messageId`. We never explicitly pass a message ID, so we do not understand why there are two different fields for it.