javascript - Why does Google PubSub deliver messages with the same orderingKey in parallel before calling ack()? - Stack Overflow

I am working with Google Cloud Pub/Sub, and I am trying to ensure that messages with the same orderingKey are processed in order, as stated in the official documentation. However, I notice that messages with the same orderingKey are delivered in parallel, even though I haven’t yet called message.ack() for the previous message.

Here are the details of my configuration and implementation:

  1. Subscription: Created with --enable-message-ordering to respect message order.
  2. Message Publishing: Messages are published with the exact same orderingKey.
  3. Subscriber: I am using a Node.js subscriber with explicit message handling and asynchronous processing. I ensure that message.ack() is only called after the message has been fully processed.

Code Example:

Publishing:

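const { PubSub } = require("@google-cloud/pubsub");
const pubSubClient = new PubSub();

// Enable ordering on the publisher side and reuse one topic object,
// so that all messages for a key go through the same ordered publish queue.
const topic = pubSubClient.topic("YOUR_TOPIC_NAME", { messageOrdering: true });

const publishMessage = async (orderingKey, data) => {
  const dataBuffer = Buffer.from(JSON.stringify(data));
  // Resolves with the server-assigned message ID.
  return topic.publishMessage({
    data: dataBuffer,
    orderingKey: orderingKey,
  });
};

publishMessage("my-key", { orderId: "123" }).catch(console.error);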

Subscriber:

const subscription = pubSubClient.subscription("YOUR_SUBSCRIPTION_NAME");

subscription.on("message", async (message) => {
  try {
    // The ordering key is a property of the message itself, not one of its attributes.
    console.log(`Message received: ${message.id}, orderingKey: ${message.orderingKey}`);
    // message.data is a Buffer; decode it before parsing.
    const data = JSON.parse(message.data.toString());

    // Simulate long processing
    await new Promise((resolve) => setTimeout(resolve, 5000));
    console.log(`Message processed: ${data.orderId}`);
  } catch (err) {
    console.error(`Error: ${err}`);
    return;
  } finally {
    // Note: finally runs on success and on failure, so the message is acked even if processing threw.
    console.log(`Calling message.ack() for: ${message.id}`);
    message.ack();
  }
});

Problem:

Despite this configuration:

Messages with the same orderingKey are delivered and executed in parallel. This happens even though I haven’t yet called message.ack().

What I Want:

I want to ensure that Google Pub/Sub processes messages with the same orderingKey sequentially, meaning:

The next message is only delivered after the previous one has been marked as ack().

Questions:

  1. Why does Pub/Sub deliver messages with the same orderingKey in parallel in my case?
  2. How can I configure my subscriber or system to ensure that messages are strictly sequential with orderingKey?

Additional Context:

I am using Node.js. The subscription was created with --enable-message-ordering. I want to avoid adding local logic (like a queue) to manage this manually.

asked Feb 4 at 13:06 by rafik; edited Feb 4 at 13:50 by Doug Stevenson

1 Answer

The guarantee that Pub/Sub ordering provides is not that the next message for a key will be delivered only after the previous one is acknowledged, but that the next message for a key will be delivered after the previous one has been sent to the user callback and any synchronous work has completed. There are some more details about this behavior for Node specifically in GitHub:

This is unfortunately a "works as intended", because the guarantee in the library is that you'll receive all of the messages in order within one subscriber RPC callback (meaning, the code will not return to the event loop until it's called the user callback for each message in order). But Node makes that guarantee somewhat less useful because of the asynchrony.

In similar cases, we've been recommending using a queue on the user side, just pushing all of the messages for that key into the queue and then executing their callbacks one at a time.
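
The user-side queue described above could look roughly like the sketch below. It is an illustration under assumptions, not code from the Pub/Sub client library: `KeyedSerialQueue`, `demo`, and the `handle(message)` function mentioned in the comments are hypothetical names. The idea is to chain each key's tasks on a promise so that a task starts only after the previous task for the same key has settled, while different keys still run concurrently.

```javascript
// Hypothetical helper (not part of @google-cloud/pubsub): runs async tasks
// one at a time per ordering key; different keys proceed independently.
class KeyedSerialQueue {
  constructor() {
    // Maps an ordering key to a promise that settles when its last queued task is done.
    this.tails = new Map();
  }

  // Queue `task` (a function returning a promise) behind earlier tasks for `key`.
  enqueue(key, task) {
    const previous = this.tails.get(key) ?? Promise.resolve();
    const result = previous.then(() => task());
    // The stored tail never rejects, so one failed task does not block the key.
    const tail = result.catch(() => {});
    this.tails.set(key, tail);
    // Drop the entry once the key is idle so the map does not grow without bound.
    tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}

// Wiring sketch, assuming the `subscription` from the question and a
// hypothetical async `handle(message)` function:
//
// const queue = new KeyedSerialQueue();
// subscription.on("message", (message) => {
//   queue
//     .enqueue(message.orderingKey, async () => {
//       await handle(message);
//       message.ack(); // ack only after processing has finished
//     })
//     .catch((err) => {
//       console.error(err);
//       message.nack(); // ask Pub/Sub to redeliver
//     });
// });

// Self-check with fake work items; no Pub/Sub connection is needed.
async function demo() {
  const queue = new KeyedSerialQueue();
  const events = [];
  const work = (label, ms) => async () => {
    events.push(`start ${label}`);
    await new Promise((resolve) => setTimeout(resolve, ms));
    events.push(`end ${label}`);
  };
  // Both tasks share a key, so a2 must wait for a1 even though a2 is shorter.
  await Promise.all([
    queue.enqueue("my-key", work("a1", 30)),
    queue.enqueue("my-key", work("a2", 10)),
  ]);
  return events;
}

demo().then((events) => console.log(events.join(", ")));
// prints "start a1, end a1, start a2, end a2"
```

Messages published without an ordering key would all share the key `undefined` in this sketch and therefore be serialized as well. If that is not wanted, such messages can be handled directly instead of going through the queue.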
