SQL Query - Get the Most Recent Queue Assignment Before Agent Availability

I have two tables: Assignment and Status. The Assignment table records when an agent_id was assigned to a queue (queue_started_at). The Status table records when an agent_id became available (status_started_at). I need an SQL query that returns, for each agent, the most recent queue_started_at that falls immediately before each status_started_at.
Example Data: Assignment Table (Tabela_Atribuicao)
agent_id queue_started_at queue
1 2024-12-31 queueA
1 2025-01-03 queueA
1 2025-01-12 queueB
Status Table (Tabela_Status)
agent_id status_started_at
1 2025-01-01
1 2025-01-02
1 2025-01-10
1 2025-01-13
Expected Result:
agent_id queue_started_at status_started_at queue
1 2024-12-31 2025-01-01 queueA
1 2024-12-31 2025-01-02 queueA
1 2025-01-03 2025-01-10 queueB
1 2025-01-12 2025-01-13 queueB
asked Feb 14 at 17:02 by marceloasr
2 Answers
1. First we find all possible combinations of assignment and status rows.
df_joined = df_status.alias('s').crossJoin(df_assignment.alias('a'))
2. Valid pairs are then filtered: we keep only assignments for the same agent that started before the agent became available.
df_filtered = df_joined.filter((col("a.agent_id") == col("s.agent_id"))
                               & (col("a.queue_started_at") < col("s.status_started_at")))
3. Finally, using a window function, we identify the latest queue_started_at before each status_started_at for each agent.
Sample
from pyspark.sql import Window
from pyspark.sql.functions import col, max as max_

df_joined = df_status.alias('s').crossJoin(df_assignment.alias('a'))
# Keep same-agent pairs where the assignment precedes the status change
# (without the agent_id match, a cross join would mix different agents).
df_filtered = df_joined.filter((col("a.agent_id") == col("s.agent_id"))
                               & (col("a.queue_started_at") < col("s.status_started_at")))
# Latest assignment per (agent_id, status_started_at) group.
window_spec = Window.partitionBy(col("s.agent_id"), col("s.status_started_at"))
df_result = df_filtered.withColumn("max_queue_started_at", max_("a.queue_started_at").over(window_spec)) \
    .filter(col("a.queue_started_at") == col("max_queue_started_at")) \
    .select(col("s.agent_id"), col("a.queue_started_at"), col("s.status_started_at"), col("a.queue"))
Output
+--------+----------------+-----------------+------+
|agent_id|queue_started_at|status_started_at| queue|
+--------+----------------+-----------------+------+
| 1| 2024-12-31| 2025-01-01|queueA|
| 1| 2024-12-31| 2025-01-02|queueA|
| 1| 2025-01-03| 2025-01-10|queueA|
| 1| 2025-01-12| 2025-01-13|queueB|
+--------+----------------+-----------------+------+
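Since the question asks for an SQL query specifically, the same logic can also be written in plain SQL and run through spark.sql. This is a sketch assuming the df_assignment and df_status DataFrames from above; the view names are illustrative:

df_assignment.createOrReplaceTempView("assignment")
df_status.createOrReplaceTempView("status")

spark.sql("""
    SELECT agent_id, queue_started_at, status_started_at, queue
    FROM (
        SELECT s.agent_id,
               a.queue_started_at,
               s.status_started_at,
               a.queue,
               ROW_NUMBER() OVER (
                   PARTITION BY s.agent_id, s.status_started_at
                   ORDER BY a.queue_started_at DESC
               ) AS rn
        FROM status s
        JOIN assignment a
          ON a.agent_id = s.agent_id
         AND a.queue_started_at < s.status_started_at
    ) ranked
    WHERE rn = 1
""").show()

ROW_NUMBER with a descending sort plays the same role as the max-plus-filter combination above, and the join condition on agent_id mirrors the agent match in the DataFrame version.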
Instead of using a join, you can combine the two DataFrames into one using unionAll:
from pyspark.sql import functions as F
# in case more than one queue started on the same date for the same agent_id
assignments = assignment \
.groupby('agent_id','queue_started_at') \
.agg(F.collect_set('queue').alias('queues'))
merged = status.selectExpr(
'agent_id',
'NULL as queue_started_at',
'status_started_at',
'NULL as queues',
'1 as flag'
).unionAll(
assignments.selectExpr(
'agent_id',
'queue_started_at',
'queue_started_at as status_started_at',
'queues',
'0 as flag'
)
)
merged.sort('agent_id','status_started_at','flag').show()
+--------+----------------+-----------------+----------------+----+
|agent_id|queue_started_at|status_started_at| queues|flag|
+--------+----------------+-----------------+----------------+----+
| 1| 2024-12-31| 2024-12-31| [queueA]| 0|
| 1| NULL| 2025-01-01| NULL| 1|
| 1| NULL| 2025-01-02| NULL| 1|
| 1| 2025-01-03| 2025-01-03| [queueA]| 0|
| 1| NULL| 2025-01-10| NULL| 1|
| 1| 2025-01-12| 2025-01-12| [queueB]| 0|
| 1| NULL| 2025-01-13| NULL| 1|
| 2| 2025-01-10| 2025-01-10|[queueA, queueB]| 0|
| 2| NULL| 2025-01-11| NULL| 1|
+--------+----------------+-----------------+----------------+----+
Now your problem becomes a typical fillna problem (the flag value decides how to handle the case where status_started_at and queue_started_at fall on the same date, and is also used for the final filter):
from pyspark.sql import Window

# Forward-fill the latest assignment columns onto each status row,
# then keep only the status rows (flag = 1).
w1 = Window.partitionBy('agent_id').orderBy('status_started_at', 'flag')
merged.select(
    'agent_id',
    'status_started_at',
    *[F.last(c, True).over(w1).alias(c) for c in ['queue_started_at', 'queues']]
).filter('flag=1').show()
+--------+-----------------+----------------+----------------+
|agent_id|status_started_at|queue_started_at| queues|
+--------+-----------------+----------------+----------------+
| 1| 2025-01-01| 2024-12-31| [queueA]|
| 1| 2025-01-02| 2024-12-31| [queueA]|
| 1| 2025-01-10| 2025-01-03| [queueA]|
| 1| 2025-01-13| 2025-01-12| [queueB]|
| 2| 2025-01-11| 2025-01-10|[queueA, queueB]|
+--------+-----------------+----------------+----------------+
Next you just need explode_outer(queues) to convert that array back into one row per queue.
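A minimal sketch of that last step; filled is just an illustrative name for the result of the select above (flag is selected explicitly here so the final filter does not depend on Spark resolving a dropped column):

filled = merged.select(
    'agent_id',
    'status_started_at',
    'flag',
    *[F.last(c, True).over(w1).alias(c) for c in ['queue_started_at', 'queues']]
).filter('flag = 1')

# explode_outer yields one row per queue and keeps rows whose array is
# NULL (a status date with no earlier assignment).
filled.select(
    'agent_id',
    'queue_started_at',
    'status_started_at',
    F.explode_outer('queues').alias('queue')
).show()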
As a side note, it's better to combine the two columns queue_started_at and queues into a single struct column when creating merged, so only one window function is needed in the final select list. The example above keeps them separate for illustration purposes.
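A rough sketch of that struct variant, reusing assignments, status, w1 and F from above. The typed NULL cast is an assumption: it presumes queue_started_at is a DATE column, so adjust the cast to your actual schema:

# Same union as before, but with one struct column instead of two.
merged2 = status.selectExpr(
    'agent_id',
    'status_started_at',
    # assumes queue_started_at is DATE; match your real column types here
    'CAST(NULL AS STRUCT<queue_started_at: DATE, queues: ARRAY<STRING>>) AS info',
    '1 as flag'
).unionAll(
    assignments.selectExpr(
        'agent_id',
        'queue_started_at as status_started_at',
        'struct(queue_started_at, queues) as info',
        '0 as flag'
    )
)

# One window call now forward-fills both fields at once.
merged2.select(
    'agent_id',
    'status_started_at',
    'flag',
    F.last('info', True).over(w1).alias('info')
).filter('flag = 1') \
 .select('agent_id', 'info.queue_started_at', 'status_started_at',
         F.explode_outer('info.queues').alias('queue')) \
 .show()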
queueA instead of queueB? – lihao, Feb 14 at 19:04