I have an umbrella app with two repos, one of which needs some data from the other, and that is why I am syncing data between them. The app here is just a proof of concept to check whether this approach makes any sense...
I am facing an issue with product cache synchronization in my Elixir application. I have implemented caching for product data using the Oban background job processing library, but the cache is not being updated correctly after a product is updated in the database. Below is a brief overview of the setup and what I have tried.
Problem:
When a product is updated in the Catalog.Products module, I want to update the cache as well. The product sync job is supposed to handle this by sending notifications through Oban, which should trigger a sync job in the Orders app to update the cached data. However, the cache does not seem to be updated correctly after the sync job runs. Instead, it just logs that the sync job is completed but the actual cache data is not reflecting the changes.
Configuration:
I have configured both Catalog.Oban and Orders.Oban in their respective config files:
# Oban instance for the :catalog app; its jobs are stored through Catalog.Repo.
config :catalog, Catalog.Oban,
repo: Catalog.Repo,
plugins: [
# Prunes old jobs; NOTE(review): max_age is presumably in seconds (300 = 5 minutes) — confirm against the Pruner docs.
{Oban.Plugins.Pruner, max_age: 300}
],
# Process-group based notifier instead of the default one.
# NOTE(review): cross-node delivery presumably requires the nodes to be clustered — confirm.
notifier: Oban.Notifiers.PG,
# Queue name => concurrency limit. Catalog.Jobs.ProductSyncJob runs on the :sync queue.
queues: [default: 10, sync: 5]
# Oban instance for the :orders app; its jobs are stored through Orders.Repo.
config :orders, Orders.Oban,
repo: Orders.Repo,
plugins: [
# Prunes old jobs; NOTE(review): max_age is presumably in seconds (300 = 5 minutes) — confirm against the Pruner docs.
{Oban.Plugins.Pruner, max_age: 300}
],
# Same notifier implementation as Catalog.Oban.
# NOTE(review): whether the two instances actually see each other's notifications depends on
# notifier namespace/prefix and clustering — confirm.
notifier: Oban.Notifiers.PG,
# Queue name => concurrency limit. Orders.Sync.Jobs.SyncProductJob runs on the :default queue.
queues: [default: 10]
And here are the application modules of both apps:
Product
defmodule Catalog.Application do
  # OTP application callback module for :catalog.
  # See the Elixir `Application` behaviour documentation for more
  # information on OTP applications.
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    supervisor_opts = [strategy: :one_for_one, name: Catalog.Supervisor]
    Supervisor.start_link(child_specs(), supervisor_opts)
  end

  # Child specs handed to Catalog.Supervisor, in start order.
  # Additional workers can be appended here, e.g. `{Catalog.Worker, arg}`,
  # which is started by calling `Catalog.Worker.start_link(arg)`.
  defp child_specs do
    # Falls back to :ignore when no DNS cluster query is configured.
    dns_query = Application.get_env(:catalog, :dns_cluster_query) || :ignore

    [
      Catalog.Repo,
      Catalog.Oban,
      {DNSCluster, query: dns_query},
      {Phoenix.PubSub, name: Catalog.PubSub}
    ]
  end
end
Orders
defmodule Orders.Application do
  # OTP application callback module for :orders.
  # See the Elixir `Application` behaviour documentation for more
  # information on OTP applications.
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    # Orders.Sync.Listener is listed after Orders.Oban because its init/1
    # subscribes to the Orders.Oban notifier.
    # Additional workers can be appended here, e.g. `{Orders.Worker, arg}`,
    # which is started by calling `Orders.Worker.start_link(arg)`.
    [
      Orders.Repo,
      Orders.Oban,
      Orders.Sync.Listener,
      {DNSCluster, query: dns_cluster_query()},
      {Phoenix.PubSub, name: Orders.PubSub}
    ]
    |> Supervisor.start_link(strategy: :one_for_one, name: Orders.Supervisor)
  end

  # Returns the configured DNS cluster query, or :ignore when it is unset
  # (nil) or false.
  defp dns_cluster_query do
    case Application.get_env(:orders, :dns_cluster_query) do
      unset when unset in [nil, false] -> :ignore
      query -> query
    end
  end
end
Workflow:
The product data is updated in the Catalog.Products module, which triggers a sync job in the Catalog.Jobs.ProductSyncJob module.
The ProductSyncJob sends a notification (:catalog_product_updated) through Oban.
The Orders.Sync.Listener listens for this notification and schedules a sync job to update the product cache.
Issue:
After a product is updated in the Catalog.Products module, the logs show that the sync job was processed, but the cache is not actually updated.
Example log snippet after a product update:
11:39:05.159 [info] Syncing product ID: 1, action: updated
11:39:05.159 [info] Catalog.Oban notifier status: :solitary
11:39:05.160 [debug] QUERY OK source="products" db=1.0ms idle=94.3ms
SELECT p0."id", p0."name", p0."description", p0."price", p0."sku", p0."inventory_count", p0."active", p0."inserted_at", p0."updated_at" FROM "products" AS p0 WHERE (p0."id" = $1) [1]
↳ Catalog.Jobs.ProductSyncJob.perform/1, at: lib/catalog/jobs/product_sync_job.ex:18
11:39:05.161 [info] Sending notification on channel :catalog_product_updated with payload: %{active: true, name: "Smartphoned", description: "Latest model with high-resolution camera", action: "updated", product_id: 1, price: Decimal.new("799.99"), sku: "PHONE001", inventory_count: 100}
Code:
ProductSyncJob:
defmodule Catalog.Jobs.ProductSyncJob do
  @moduledoc """
  Oban worker that publishes a product change on the `Catalog.Oban` notifier.

  The job runs on the `:sync` queue with at most 3 attempts. It expects the
  args `"product_id"` and `"action"`; for any action other than `"deleted"`
  the product is loaded with `Catalog.Products.get_product!/1` and its fields
  are included in the notification payload.

  NOTE(review): the notification is published through `Catalog.Oban`, while the
  consumer subscribes through `Orders.Oban`. Confirm that both notifiers share
  a namespace and that the nodes are clustered; otherwise the message never
  reaches the listener.
  """

  use Oban.Worker,
    queue: :sync,
    max_attempts: 3

  require Logger

  alias Catalog.Products

  @channel :catalog_product_updated

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"product_id" => product_id, "action" => action}}) do
    Logger.info("Syncing product ID: #{product_id}, action: #{action}")

    notifier_status = Oban.Notifier.status(Catalog.Oban)
    Logger.info("Catalog.Oban notifier status: #{inspect(notifier_status)}")

    payload = build_payload(product_id, action)

    Logger.info(
      "Sending notification on channel :catalog_product_updated with payload: #{inspect(payload)}"
    )

    # Return the notifier result instead of an unconditional :ok, so that a
    # failed publish marks the attempt as failed and Oban retries the job.
    with {:error, reason} <- Oban.Notifier.notify(Catalog.Oban, @channel, payload) do
      Logger.error("Failed to send product notification: #{inspect(reason)}")
      {:error, reason}
    end
  end

  @doc """
  Enqueues a sync job for `product_id` with the given `action`
  (e.g. `"created"`, `"updated"` or `"deleted"`).

  Returns the result of `Catalog.Oban.insert/1`.
  """
  def schedule_sync(product_id, action) do
    %{product_id: product_id, action: action}
    |> __MODULE__.new()
    |> Catalog.Oban.insert()
  end

  # A deleted product can no longer be loaded, so only its id is sent.
  defp build_payload(product_id, "deleted") do
    %{product_id: product_id, action: "deleted"}
  end

  # Any other action carries the current product fields. Raises (and thereby
  # fails the attempt) when the product does not exist.
  defp build_payload(product_id, action) do
    product = Products.get_product!(product_id)

    %{
      product_id: product.id,
      name: product.name,
      description: product.description,
      price: product.price,
      sku: product.sku,
      inventory_count: product.inventory_count,
      active: product.active,
      action: action
    }
  end
end
Product module:
defmodule Catalog.Products do
  @moduledoc """
  Write operations for catalog products.

  Every successful create, update or delete schedules a
  `Catalog.Jobs.ProductSyncJob` unless the caller passes a truthy `skip_sync`.
  """

  import Ecto.Query

  alias Catalog.Jobs.ProductSyncJob
  alias Catalog.Products.Product
  alias Catalog.Repo

  @doc """
  Inserts a product built from `attrs`.

  Returns the result of `Repo.insert/1`. On success a `"created"` sync job is
  scheduled unless `skip_sync` is truthy.
  """
  def create_product(attrs \\ %{}, skip_sync \\ false) do
    %Product{}
    |> Product.changeset(attrs)
    |> Repo.insert()
    |> sync_on_success("created", skip_sync)
  end

  @doc """
  Updates `product` with `attrs`.

  Returns the result of `Repo.update/1`. On success an `"updated"` sync job is
  scheduled unless `skip_sync` is truthy.
  """
  def update_product(%Product{} = product, attrs, skip_sync \\ false) do
    product
    |> Product.changeset(attrs)
    |> Repo.update()
    |> sync_on_success("updated", skip_sync)
  end

  @doc """
  Deletes `product`.

  Returns the result of `Repo.delete/1`. On success a `"deleted"` sync job is
  scheduled unless `skip_sync` is truthy.
  """
  def delete_product(%Product{} = product, skip_sync \\ false) do
    product
    |> Repo.delete()
    |> sync_on_success("deleted", skip_sync)
  end

  # Schedules the sync job for a successful repo result and passes the result
  # through. The outcome of scheduling is intentionally not inspected here.
  defp sync_on_success({:ok, record}, action, skip_sync) do
    if !skip_sync do
      ProductSyncJob.schedule_sync(record.id, action)
    end

    {:ok, record}
  end

  # Anything other than {:ok, _} is returned to the caller as is.
  defp sync_on_success(other, _action, _skip_sync), do: other
end
And here is the code of the Orders app (the second app in the umbrella).
Listener:
defmodule Orders.Sync.Listener do
  @moduledoc """
  GenServer that subscribes to `:catalog_product_updated` notifications on the
  `Orders.Oban` notifier and enqueues an `Orders.Sync.Jobs.SyncProductJob` for
  each one it receives.

  NOTE(review): the subscription is made on `Orders.Oban`. Notifications
  published through a different Oban instance only arrive here if both
  notifiers can reach each other (shared namespace, clustered nodes). Confirm
  this for the deployment.
  """

  use GenServer

  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_) do
    Logger.info("Orders.Sync.Listener initializing...")

    notifier_status = Oban.Notifier.status(Orders.Oban)
    Logger.info("Orders.Oban notifier status: #{inspect(notifier_status)}")

    # Assertive match: failing to subscribe should crash the listener at
    # startup rather than leave it running without a subscription.
    :ok = Oban.Notifier.listen(Orders.Oban, :catalog_product_updated)
    Logger.info("Listening for :catalog_product_updated notifications")

    {:ok, %{}}
  end

  @impl true
  def handle_info({:notification, :catalog_product_updated, payload}, state) do
    Logger.info("Received product update notification: #{inspect(payload)}")

    # The payload is read with string keys.
    job_args = %{
      product_id: payload["product_id"],
      action: payload["action"],
      data: payload
    }

    # Schedule a sync job to update the product cache. A failed insert is
    # logged instead of silently dropped, because the notification itself is
    # not stored anywhere else.
    case job_args |> Orders.Sync.Jobs.SyncProductJob.new() |> Orders.Oban.insert() do
      {:ok, _job} ->
        :ok

      other ->
        Logger.error(
          "Failed to enqueue product sync job for #{inspect(job_args)}: #{inspect(other)}"
        )
    end

    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    Logger.debug("Unhandled message in Orders.Sync.Listener: #{inspect(msg)}")
    {:noreply, state}
  end
end
SyncProductJob
defmodule Orders.Sync.Jobs.SyncProductJob do
  @moduledoc """
  Oban worker that applies a catalog product change to `Orders.ProductCache`.

  Expected job args (string keys):

    * `"product_id"` - the catalog-side product id
    * `"action"` - `"created"`, `"updated"` or `"deleted"`
    * `"data"` - the notification payload carrying the product fields

  `"created"` and `"updated"` both look the cache entry up by catalog id first
  and then create or update it, so a retried or duplicated job does not try to
  insert the same product twice. An unknown action cancels the job, since
  retrying it cannot succeed.
  """

  use Oban.Worker, queue: :default, max_attempts: 3

  require Logger

  alias Orders.ProductCache

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"product_id" => product_id, "action" => action, "data" => data}}) do
    Logger.info("Processing product sync for product ID: #{product_id}, action: #{action}")

    case action do
      "deleted" ->
        handle_deletion(product_id)

      upsert when upsert in ["created", "updated"] ->
        handle_upsert(data)

      _ ->
        Logger.error("Unknown action: #{action}")
        # Cancel instead of returning an error: an unknown action is not
        # transient, so retries would only repeat the failure.
        {:cancel, "Unknown action"}
    end
  end

  # Removes the cache entry for the catalog id; a missing entry is not an error.
  defp handle_deletion(product_id) do
    case ProductCache.get_product_cache_by_catalog_id(product_id) do
      nil ->
        Logger.warning("Product #{product_id} not found in cache, nothing to delete")
        :ok

      product_cache ->
        Logger.info("Deleting product #{product_id} from cache")
        ProductCache.delete_product_cache(product_cache)
    end
  end

  # Creates the cache entry when it is missing, otherwise updates it.
  defp handle_upsert(data) do
    case ProductCache.get_product_cache_by_catalog_id(data["product_id"]) do
      nil -> handle_creation(data)
      product_cache -> handle_update(product_cache, data)
    end
  end

  # Inserts a new cache entry keyed by the catalog product id.
  defp handle_creation(data) do
    Logger.debug("Creation data: #{inspect(data)}")

    catalog_id = data["product_id"]
    attrs = data |> cache_attrs() |> Map.put(:catalog_id, catalog_id)

    Logger.info("Creating product #{catalog_id} in cache")

    case ProductCache.create_product_cache(attrs) do
      {:ok, cache} ->
        Logger.info("Successfully created product cache for #{catalog_id}")
        {:ok, cache}

      {:error, changeset} ->
        Logger.error("Failed to create product cache: #{inspect(changeset)}")
        {:error, changeset}
    end
  end

  # Overwrites the synced fields of an existing cache entry.
  defp handle_update(product_cache, data) do
    Logger.debug("Update data: #{inspect(data)}")

    catalog_id = data["product_id"]

    Logger.info("Updating product #{catalog_id} in cache")

    case ProductCache.update_product_cache(product_cache, cache_attrs(data)) do
      {:ok, cache} ->
        Logger.info("Successfully updated product cache for #{catalog_id}")
        {:ok, cache}

      {:error, changeset} ->
        Logger.error("Failed to update product cache: #{inspect(changeset)}")
        {:error, changeset}
    end
  end

  # Fields copied from the payload for both create and update, stamped with
  # the current UTC time as the sync time.
  defp cache_attrs(data) do
    %{
      name: data["name"],
      description: data["description"],
      price: data["price"],
      sku: data["sku"],
      inventory_count: data["inventory_count"],
      active: data["active"],
      last_synced_at: DateTime.utc_now()
    }
  end
end
What I Expect:
The product cache should be updated after the sync job runs. However, I don’t see the changes in the product cache even though the job executes correctly.