I have a large (TBs) dataset consisting of multiple tables. The end user needs to be able to join them efficiently. The unique key for each record is continuous, and it is important that records with key values close to one another are partitioned together.
The obvious solution is to use partitionByRange on both dataframes prior to joining, but Spark 3.5.2 insists on shuffling by hash when joining tables, even though this should not be necessary.
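Roughly what I'm doing, simplified (the paths, the column name record_key, and the partition count are placeholders; the DataFrame method is spelled repartitionByRange):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

left = spark.read.parquet("/data/left")      # placeholder paths
right = spark.read.parquet("/data/right")

# Range-partition both sides on the join key before joining
left_ranged = left.repartitionByRange(200, "record_key")
right_ranged = right.repartitionByRange(200, "record_key")

joined = left_ranged.join(right_ranged, "record_key")
joined.explain()   # the plan still contains Exchange hashpartitioning(...)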
The alternative strategy is to bucket and sort on this key - this avoids shuffling when joining, and is 4x faster on a small subset (and that difference will increase with data volume).
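And the bucketing workaround, roughly (again simplified, with placeholder table names):

# Write both tables bucketed and sorted on the key, then join the saved tables
(left.write
     .bucketBy(200, "record_key")
     .sortBy("record_key")
     .mode("overwrite")
     .saveAsTable("left_bucketed"))

(right.write
      .bucketBy(200, "record_key")
      .sortBy("record_key")
      .mode("overwrite")
      .saveAsTable("right_bucketed"))

joined = spark.table("left_bucketed").join(spark.table("right_bucketed"), "record_key")
joined.explain()   # no Exchange on either side when the bucket specs match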
The key difference in .explain() is this:
+- Exchange hashpartitioning(…), ENSURE_REQUIREMENTS, [id=#388]
This line is not present if I bucket and sort instead of using partitionByRange.
For various reasons specific to my use case, I don't want to use the bucketing workaround, and I'd like to find out why Spark is behaving like this anyway.
Why does Spark insist on doing the full shuffle after partitionByRange? Isn't this a pretty major flaw/bug?
2 Answers
Well, I think it is simple.
If you have N partitions in, say, two DataFrames to be JOINed, what is to say the range of values (distribution) is exactly the same? If they are not the same, say the upper and lower partition boundary values do not exactly match across all partitions, then a shuffle will be needed.
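For example, a quick way to see that with made-up data (just an illustration, names are arbitrary), comparing the key ranges that land in each partition:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Two DataFrames over the same key column but with different distributions
left = spark.range(0, 1_000_000).withColumnRenamed("id", "record_key")
right = spark.range(0, 1_000_000).selectExpr("id * id AS record_key")

for name, df in [("left", left), ("right", right)]:
    print(name)
    (df.repartitionByRange(10, "record_key")
       .groupBy(F.spark_partition_id().alias("pid"))   # partition id after range partitioning
       .agg(F.min("record_key").alias("lo"), F.max("record_key").alias("hi"))
       .orderBy("pid")
       .show())
    # The lower/upper bounds per partition differ between the two DataFrames,
    # because each repartitionByRange samples its own data to pick boundaries.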
bucketBy is more for writing to disk so that the next job in the pipeline can read it back for a JOIN, and from what I understand and have seen, there can sometimes still be a shuffle. The Databricks examples are contrived.
It turns out that this is essentially a flaw of Spark, and the correct solution is to use Iceberg to partition the data and use storage-partitioned joins: https://medium.com/expedia-group-tech/turbocharge-efficiency-slash-costs-mastering-spark-iceberg-joins-with-storage-partitioned-join-03fdc1ff75c0.
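Very roughly, the setup that article describes looks like this (my paraphrase: catalog/table names and the partition spec are placeholders, and these are the v2 bucketing configs I believe are needed on Spark 3.4+ with the Iceberg runtime on the classpath):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Let Spark use the partitioning reported by the v2 (Iceberg) source
         .config("spark.sql.sources.v2.bucketing.enabled", "true")
         # Allow the join to proceed even when the two sides don't have
         # exactly the same set of partition values
         .config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
         .getOrCreate())

# Both tables created with the same Iceberg partition spec on the join key, e.g.:
#   CREATE TABLE cat.db.left_t  (record_key BIGINT, ...) USING iceberg
#     PARTITIONED BY (bucket(64, record_key));
#   CREATE TABLE cat.db.right_t (record_key BIGINT, ...) USING iceberg
#     PARTITIONED BY (bucket(64, record_key));
# (bucket() is just an example transform; the key point is that both tables share the same spec)

joined = spark.table("cat.db.left_t").join(spark.table("cat.db.right_t"), "record_key")
joined.explain()   # with matching specs this should plan without an Exchange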