
Tune spark sql due to skewed data - Stack Overflow


The SQL in question:

WITH
first_cte AS (
    SELECT
        s.two_id,
        s.d_month,
        COUNT(*)               AS total,
        COUNT(DISTINCT one_id) AS unique_one_id_count 
    FROM iceberg_db.tab s
    GROUP BY s.two_id, s.d_month
)
select * 
from first_cte

I run it in Glue on G.8X workers (32 of them) and it takes a long time. The data is about 300 million records, which aggregate down to roughly 700k output records. The problem is that the data is skewed: a few two_id values have many one_id values while most two_id values have few, so most tasks end up waiting on a handful of large ones.

I read about a few techniques to address this: salting, Storage Partitioned Join (SPJ), broadcast, the explode/COLLECT_SET/flatten technique, and others. SPJ and broadcast are not relevant since they apply to joins (although I did try the Spark conf and hints). explode/COLLECT_SET/flatten requires all unique values to fit in memory, so I dropped it. That leaves salting. Since I am doing a count distinct, I can't salt with RAND or I lose the exact unique count (I could only get an approximation), which leaves me with the following solution:

WITH
first_cte AS (
    SELECT
        s.one_id,
        s.two_id,
        s.d_month,
        COUNT(*)        AS total
    FROM iceberg_db.tab s
    GROUP BY s.one_id, s.two_id, s.d_month
),
second_cte AS (
    SELECT
        s.two_id, 
        s.d_month,
        SUM(s.total) AS total,
        COUNT(*)     AS unique_one_id_count
    FROM first_cte s
    GROUP BY s.two_id, s.d_month
)
select * 
from second_cte

This gives much better performance. However, I wonder if I can do even better: the skew did not go away, it only shrank, since some tasks still take longer because some two_id values have far more one_id rows than others.
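The rewrite above is exact, not approximate: after grouping by (one_id, two_id, d_month), each distinct one_id contributes exactly one row per (two_id, d_month) group, so the second phase's COUNT(*) reproduces the original COUNT(DISTINCT one_id). A minimal Python sketch of that equivalence (the toy rows are made up for illustration):

```python
from collections import Counter

# Hypothetical toy rows (one_id, two_id, d_month) standing in for iceberg_db.tab
rows = [
    ("u1", "a", "2024-01"), ("u1", "a", "2024-01"), ("u2", "a", "2024-01"),
    ("u1", "b", "2024-01"), ("u3", "b", "2024-01"), ("u3", "b", "2024-01"),
]

# Original query: COUNT(*) and COUNT(DISTINCT one_id) per (two_id, d_month)
direct = {}
for one_id, two_id, d_month in rows:
    agg = direct.setdefault((two_id, d_month), [0, set()])
    agg[0] += 1          # COUNT(*)
    agg[1].add(one_id)   # feeds COUNT(DISTINCT one_id)

# Rewrite: pre-aggregate by (one_id, two_id, d_month), then re-aggregate
first_cte = Counter(rows)                      # COUNT(*) per 3-column group
second_cte = {}
for (one_id, two_id, d_month), total in first_cte.items():
    agg = second_cte.setdefault((two_id, d_month), [0, 0])
    agg[0] += total      # SUM(total)
    agg[1] += 1          # COUNT(*): one first_cte row per distinct one_id

# Both formulations agree on every group
for key, (total, uniques) in direct.items():
    assert second_cte[key] == [total, len(uniques)]
```

This is also why the two-phase form helps with skew: each shuffle groups on a higher-cardinality key than (two_id, d_month) alone.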

asked Mar 21 at 12:04 by Nir

1 Answer


I think the multi-step aggregation makes sense. You can try adding a salting column so you can control the number of partitions used per s.two_id:

WITH
raw_data AS (
  SELECT
    s.two_id,
    s.d_month,
    s.one_id,
    -- Deterministic salt: each one_id always lands in the same bucket,
    -- so per-bucket distinct counts can be summed exactly (a random salt
    -- such as RAND() would scatter one one_id across buckets and overcount)
    PMOD(HASH(s.one_id), <NUMBER_OF_BUCKETS>) AS salted_key
  FROM iceberg_db.tab s
),
first_cte AS (
  SELECT
    two_id,
    salted_key,
    d_month,
    COUNT(*)               AS total,
    COUNT(DISTINCT one_id) AS unique_one_id_count
  FROM raw_data
  GROUP BY two_id, salted_key, d_month
),
second_cte AS (
  SELECT
    two_id,
    d_month,
    SUM(total)               AS total,
    SUM(unique_one_id_count) AS unique_one_id_count
  FROM first_cte
  GROUP BY two_id, d_month
)
SELECT *
FROM second_cte

Here you control the number of buckets per key with NUMBER_OF_BUCKETS; if you pick 100, for example, each key's data is partitioned into 100 buckets. Keep in mind that too many buckets per key can hurt performance for keys with little data, so try different numbers appropriate to your data.
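One caveat worth making explicit: for unique_one_id_count to stay exact, the salt must be a deterministic function of one_id (e.g. PMOD(HASH(one_id), N)) rather than RAND(), so each one_id lands in exactly one bucket and per-bucket distinct counts can simply be summed. A small Python sketch of why (toy ids and bucket count are made up; the row-number salt stands in for RAND):

```python
# 150 toy rows containing 50 distinct ids, all in one (two_id, d_month) group
one_ids = [f"user{i}" for i in range(50)] * 3
NUM_BUCKETS = 8

def bucketed_distinct(salt_fn):
    """Sum of per-bucket COUNT(DISTINCT one_id) under a given salting scheme."""
    buckets = {}
    for row_number, one_id in enumerate(one_ids):
        buckets.setdefault(salt_fn(row_number, one_id), set()).add(one_id)
    return sum(len(ids) for ids in buckets.values())

# Deterministic salt (analogous to PMOD(HASH(one_id), N)): the buckets
# partition the id space, so the summed distinct counts are exact.
exact = bucketed_distinct(lambda _, one_id: hash(one_id) % NUM_BUCKETS)
assert exact == 50

# Row-dependent salt (standing in for RAND()): the same one_id lands in
# several buckets, so summing per-bucket distinct counts overcounts.
overcounted = bucketed_distinct(lambda row_number, _: row_number % NUM_BUCKETS)
assert overcounted > 50
```

The deterministic salt keeps the skew benefit (one heavy two_id is split across NUM_BUCKETS tasks) without sacrificing exactness.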
