The SQL in question:
WITH first_cte AS (
    SELECT
        s.two_id,
        s.d_month,
        COUNT(*) AS total,
        COUNT(DISTINCT s.one_id) AS unique_one_id_count
    FROM iceberg_db.tab s
    GROUP BY s.two_id, s.d_month
)
SELECT *
FROM first_cte
I run it in Glue on G.8X workers (32 of them) and it takes a long time. The data is about 300 million records, which aggregate down to about 700k result rows. The problem is that the data is skewed: a few two_id have many one_id, while most two_id don't have many, so most of the tasks finish quickly and then wait on a few stragglers.
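A quick way to quantify the skew (a minimal diagnostic sketch, using the same table and columns as the query above) is to look at the heaviest two_id directly:
SELECT
    s.two_id,
    COUNT(*) AS row_count   -- raw rows per two_id; cheap, no distinct needed
FROM iceberg_db.tab s
GROUP BY s.two_id
ORDER BY row_count DESC
LIMIT 20
If the top few row_count values dwarf the rest, those are the groups the straggler tasks are stuck on.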
I was reading about a few techniques to solve this issue: salting, Storage Partitioned Join (SPJ), broadcast, the explode/COLLECT_SET/flatten technique, and others. SPJ and broadcast are not relevant here since they apply to joins (although I did try the Spark conf and hints). The explode/COLLECT_SET/flatten approach requires all unique values to fit in memory, so I dropped it. That leaves salting. Since I am doing a count distinct, I can't salt with RAND, as I would lose the unique count (I could only get an approximation), so I arrived at the following solution:
WITH first_cte AS (
    SELECT
        s.one_id,
        s.two_id,
        s.d_month,
        COUNT(*) AS total
    FROM iceberg_db.tab s
    GROUP BY s.one_id, s.two_id, s.d_month
),
second_cte AS (
    SELECT
        s.two_id,
        s.d_month,
        SUM(s.total) AS total,
        COUNT(*) AS unique_one_id_count
    FROM first_cte s
    GROUP BY s.two_id, s.d_month
)
SELECT *
FROM second_cte
I get much better performance this way. However, I wonder if I can do even better, because the data skew issue did not go away, it only got smaller: some tasks still take longer, since some two_id carry far more one_id values than others.
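For reference, the approximate variant mentioned above would be a single call to Spark SQL's built-in APPROX_COUNT_DISTINCT; this is only a sketch of the trade-off I dismissed (the second argument, 0.01 here, is the maximum relative estimation error and is just an illustrative value):
SELECT
    s.two_id,
    s.d_month,
    COUNT(*) AS total,
    -- HyperLogLog-based estimate; sidesteps the exact-distinct skew entirely
    APPROX_COUNT_DISTINCT(s.one_id, 0.01) AS unique_one_id_count
FROM iceberg_db.tab s
GROUP BY s.two_id, s.d_month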
1 Answer
I think the multi-step aggregation makes sense. You can try to use a salting column instead of s.one_id to have more control over the number of partitions used per s.two_id:
WITH raw_data AS (
    SELECT
        s.two_id,
        s.d_month,
        s.one_id,
        -- deterministic salt: HASH rather than RAND, so every row of a given
        -- one_id lands in the same bucket and the distinct count stays exact
        PMOD(HASH(s.one_id), <NUMBER_OF_BUCKETS>) AS salted_key
    FROM iceberg_db.tab s
),
first_cte AS (
    SELECT
        two_id,
        salted_key,
        d_month,
        COUNT(*) AS total,
        COUNT(DISTINCT one_id) AS partial_unique_count
    FROM raw_data
    GROUP BY two_id, salted_key, d_month
),
second_cte AS (
    SELECT
        two_id,
        d_month,
        SUM(total) AS total,
        -- each one_id is counted in exactly one bucket, so summing is safe
        SUM(partial_unique_count) AS unique_one_id_count
    FROM first_cte
    GROUP BY two_id, d_month
)
SELECT *
FROM second_cte
Here you can control the number of buckets per key via NUMBER_OF_BUCKETS: if 100 is selected, for example, then each key's data is partitioned into up to 100 buckets. Note that the salt has to be a deterministic function of one_id (hence HASH rather than RAND): each one_id then falls into exactly one bucket, so the per-bucket distinct counts can simply be summed. But keep in mind that too many buckets per key can hurt the performance of keys with little data, so try different numbers appropriate to your data.
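As a sanity check (a hypothetical follow-up query, using 100 as an illustrative bucket count), you can verify that the salt actually spreads the heaviest keys:
SELECT
    two_id,
    PMOD(HASH(one_id), 100) AS salted_key,
    COUNT(*) AS row_count
FROM iceberg_db.tab
GROUP BY two_id, PMOD(HASH(one_id), 100)
ORDER BY row_count DESC
LIMIT 20
If the largest row_count per (two_id, salted_key) is now roughly 1/100th of the largest per-two_id count, the skewed groups have been split as intended.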