
phoenix framework - Elixir Umbrella App: Oban Notification Not Properly Updating Product Cache Across Repos - Stack Overflow


I have an umbrella app with two repos, where one needs some data from the other; that is why they communicate this way. The app here is just a PoC to check whether the approach makes sense at all.

I am facing an issue with product cache synchronization in my Elixir application. I have implemented caching for product data using the Oban background job processing library, but the cache is not being updated correctly after a product is updated in the database. Below is a brief overview of the setup and what I have tried.

Problem:

When a product is updated in the Catalog.Products module, I want to update the cache as well. The product sync job is supposed to handle this by sending notifications through Oban, which should trigger a sync job in the Orders app to update the cached data. However, the cache is not updated after the sync job runs: the logs say the sync job completed, but the cached data does not reflect the changes.

Configuration:

I have configured both Catalog.Oban and Orders.Oban in their respective config files:

config :catalog, Catalog.Oban,
  repo: Catalog.Repo,
  plugins: [
    {Oban.Plugins.Pruner, max_age: 300}
  ],
  notifier: Oban.Notifiers.PG,
  queues: [default: 10, sync: 5]

config :orders, Orders.Oban,
  repo: Orders.Repo,
  plugins: [
    {Oban.Plugins.Pruner, max_age: 300}
  ],
  notifier: Oban.Notifiers.PG,
  queues: [default: 10]
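To rule out the transport itself, the notifier round trip can be checked from iex (a minimal sketch; the `%{ping: true}` payload is just a placeholder, and whether delivery crosses the two instances depends on how Oban.Notifiers.PG is set up):

```elixir
# In `iex -S mix` with both apps running:
# subscribe the shell process via the Orders instance...
:ok = Oban.Notifier.listen(Orders.Oban, :catalog_product_updated)

# ...then publish via the Catalog instance.
:ok = Oban.Notifier.notify(Catalog.Oban, :catalog_product_updated, %{ping: true})

# If notifications cross the two instances, the shell process
# receives a {:notification, :catalog_product_updated, payload} message.
flush()
```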

And the application supervision trees:

Catalog

defmodule Catalog.Application do
  # See .html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      Catalog.Repo,
      Catalog.Oban,
      {DNSCluster, query: Application.get_env(:catalog, :dns_cluster_query) || :ignore},
      {Phoenix.PubSub, name: Catalog.PubSub}
      # Start a worker by calling: Catalog.Worker.start_link(arg)
      # {Catalog.Worker, arg}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Catalog.Supervisor)
  end
end

Orders

defmodule Orders.Application do
  # See .html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      Orders.Repo,
      Orders.Oban,
      Orders.Sync.Listener,
      {DNSCluster, query: Application.get_env(:orders, :dns_cluster_query) || :ignore},
      {Phoenix.PubSub, name: Orders.PubSub}
      # Start a worker by calling: Orders.Worker.start_link(arg)
      # {Orders.Worker, arg}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Orders.Supervisor)
  end
end

Workflow:

The product data is updated in the Catalog.Products module, which triggers a sync job in the Catalog.Jobs.ProductSyncJob module.
The ProductSyncJob sends a notification (:catalog_product_updated) through Oban.
The Orders.Sync.Listener listens for this notification and schedules a sync job to update the product cache.
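The steps above can be exercised end to end from iex (a hedged sketch; the attrs are illustrative and assume the Product changeset accepts them):

```elixir
# Create and then update a product; each call schedules a ProductSyncJob.
{:ok, product} =
  Catalog.Products.create_product(%{
    name: "Smartphone",
    price: "799.99",
    sku: "PHONE001"
  })

{:ok, _updated} = Catalog.Products.update_product(product, %{name: "Smartphoned"})

# Give the :sync queue and the Orders job time to run, then inspect
# the cached copy on the Orders side.
Process.sleep(1_000)
Orders.ProductCache.get_product_cache_by_catalog_id(product.id)
```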

Issue:

After a product is updated in the Catalog.Products module, the logs show that the sync job was processed, but the cache is not actually updated.

Example log snippet after a product update:

11:39:05.159 [info] Syncing product ID: 1, action: updated
11:39:05.159 [info] Catalog.Oban notifier status: :solitary
11:39:05.160 [debug] QUERY OK source="products" db=1.0ms idle=94.3ms
SELECT p0."id", p0."name", p0."description", p0."price", p0."sku", p0."inventory_count", p0."active", p0."inserted_at", p0."updated_at" FROM "products" AS p0 WHERE (p0."id" = $1) [1]
↳ Catalog.Jobs.ProductSyncJob.perform/1, at: lib/catalog/jobs/product_sync_job.ex:18
11:39:05.161 [info] Sending notification on channel :catalog_product_updated with payload: %{active: true, name: "Smartphoned", description: "Latest model with high-resolution camera", action: "updated", product_id: 1, price: Decimal.new("799.99"), sku: "PHONE001", inventory_count: 100}

Code:

ProductSyncJob:

defmodule Catalog.Jobs.ProductSyncJob do
  use Oban.Worker,
    queue: :sync,
    max_attempts: 3

  require Logger
  alias Catalog.Products

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"product_id" => product_id, "action" => action}}) do
    Logger.info("Syncing product ID: #{product_id}, action: #{action}")
    notifier_status = Oban.Notifier.status(Catalog.Oban)
    Logger.info("Catalog.Oban notifier status: #{inspect(notifier_status)}")

    product =
      case action do
        "deleted" -> %{id: product_id, deleted: true}
        _ -> Products.get_product!(product_id)
      end

    # Send notification with relevant product data
    payload =
      case action do
        "deleted" ->
          %{
            product_id: product_id,
            action: "deleted"
          }

        _ ->
          %{
            product_id: product.id,
            name: product.name,
            description: product.description,
            price: product.price,
            sku: product.sku,
            inventory_count: product.inventory_count,
            active: product.active,
            action: action
          }
      end

    Logger.info(
      "Sending notification on channel :catalog_product_updated with payload: #{inspect(payload)}"
    )

    # Use the same channel name format as what Orders.Oban.listen is expecting
    Oban.Notifier.notify(Catalog.Oban, :catalog_product_updated, payload)
    :ok
  end

  def schedule_sync(product_id, action) do
    %{product_id: product_id, action: action}
    |> __MODULE__.new()
    |> Catalog.Oban.insert()
  end
end

Product module:

defmodule Catalog.Products do
  import Ecto.Query
  alias Catalog.Repo
  alias Catalog.Products.Product

 

  def create_product(attrs \\ %{}, skip_sync \\ false) do
    result =
      %Product{}
      |> Product.changeset(attrs)
      |> Repo.insert()

    case result do
      {:ok, product} ->
        unless skip_sync do
          Catalog.Jobs.ProductSyncJob.schedule_sync(product.id, "created")
        end

        {:ok, product}

      error ->
        error
    end
  end

  def update_product(%Product{} = product, attrs, skip_sync \\ false) do
    result =
      product
      |> Product.changeset(attrs)
      |> Repo.update()

    case result do
      {:ok, updated_product} ->
        unless skip_sync do
          Catalog.Jobs.ProductSyncJob.schedule_sync(updated_product.id, "updated")
        end

        {:ok, updated_product}

      error ->
        error
    end
  end

  def delete_product(%Product{} = product, skip_sync \\ false) do
    result = Repo.delete(product)

    case result do
      {:ok, deleted_product} ->
        unless skip_sync do
          Catalog.Jobs.ProductSyncJob.schedule_sync(deleted_product.id, "deleted")
        end

        {:ok, deleted_product}

      error ->
        error
    end
  end
end

And the Orders side (the second repo in the umbrella):

Listener

defmodule Orders.Sync.Listener do
  use GenServer
  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_) do
    Logger.info("Orders.Sync.Listener initializing...")
    notifier_status = Oban.Notifier.status(Orders.Oban)
    Logger.info("Orders.Oban notifier status: #{inspect(notifier_status)}")

    :ok = Oban.Notifier.listen(Orders.Oban, :catalog_product_updated)
    Logger.info("Listening for :catalog_product_updated notifications")
    {:ok, %{}}
  end

  @impl true
  def handle_info({:notification, :catalog_product_updated, payload}, state) do
    Logger.info("Received product update notification: #{inspect(payload)}")

    # Schedule a sync job to update the product cache
    %{
      product_id: payload["product_id"],
      action: payload["action"],
      data: payload
    }
    |> Orders.Sync.Jobs.SyncProductJob.new()
    |> Orders.Oban.insert()

    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    Logger.debug("Unhandled message in Orders.Sync.Listener: #{inspect(msg)}")
    {:noreply, state}
  end
end

SyncProductJob

defmodule Orders.Sync.Jobs.SyncProductJob do
  use Oban.Worker, queue: :default, max_attempts: 3
  require Logger
  alias Orders.ProductCache

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"product_id" => product_id, "action" => action, "data" => data}}) do
    Logger.info("Processing product sync for product ID: #{product_id}, action: #{action}")

    case action do
      "deleted" ->
        handle_deletion(product_id)

      "created" ->
        handle_creation(data)

      "updated" ->
        handle_update(data)

      _ ->
        Logger.error("Unknown action: #{action}")
        {:error, "Unknown action"}
    end
  end

  defp handle_deletion(product_id) do
    case ProductCache.get_product_cache_by_catalog_id(product_id) do
      nil ->
        Logger.warning("Product #{product_id} not found in cache, nothing to delete")
        :ok

      product_cache ->
        Logger.info("Deleting product #{product_id} from cache")
        ProductCache.delete_product_cache(product_cache)
    end
  end

  defp handle_creation(data) do
    # Add more error logging and validation here
    Logger.debug("Creation data: #{inspect(data)}")

    attrs = %{
      catalog_id: data["product_id"],
      name: data["name"],
      description: data["description"],
      price: data["price"],
      sku: data["sku"],
      inventory_count: data["inventory_count"],
      active: data["active"],
      last_synced_at: DateTime.utc_now()
    }

    Logger.info("Creating product #{data["product_id"]} in cache")

    case ProductCache.create_product_cache(attrs) do
      {:ok, cache} ->
        Logger.info("Successfully created product cache for #{data["product_id"]}")
        {:ok, cache}

      {:error, changeset} ->
        Logger.error("Failed to create product cache: #{inspect(changeset)}")
        {:error, changeset}
    end
  end

  defp handle_update(data) do
    # Add more error logging and validation here
    Logger.debug("Update data: #{inspect(data)}")

    case ProductCache.get_product_cache_by_catalog_id(data["product_id"]) do
      nil ->
        handle_creation(data)

      product_cache ->
        attrs = %{
          name: data["name"],
          description: data["description"],
          price: data["price"],
          sku: data["sku"],
          inventory_count: data["inventory_count"],
          active: data["active"],
          last_synced_at: DateTime.utc_now()
        }

        Logger.info("Updating product #{data["product_id"]} in cache")

        case ProductCache.update_product_cache(product_cache, attrs) do
          {:ok, cache} ->
            Logger.info("Successfully updated product cache for #{data["product_id"]}")
            {:ok, cache}

          {:error, changeset} ->
            Logger.error("Failed to update product cache: #{inspect(changeset)}")
            {:error, changeset}
        end
    end
  end
end

What I Expect:

The product cache should be updated after the sync job runs. However, I don’t see the changes in the product cache even though the job executes correctly.
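For completeness, this is how I confirm the cache is stale, using the same Orders.ProductCache functions the sync job calls (product ID 1 is the "Smartphoned" update from the log snippet above):

```elixir
# After the update, the cached row should carry the new name.
case Orders.ProductCache.get_product_cache_by_catalog_id(1) do
  nil -> IO.puts("no cache record at all")
  cache -> IO.inspect(cache.name, label: "cached name")
end
# I expect "Smartphoned" here, but the cache is unchanged (or missing).
```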
