
apache spark - Insert overwrite multiple partitions in an external Hive table - Stack Overflow


I am trying to overwrite multiple partitions in a large table. I have my main external S3 table `sandbox`, partitioned by `part`:

scala> q("select * from sandbox")
+---+------------------------------------+----+
|id |val                                 |part|
+---+------------------------------------+----+
|0  |5d8bad52-5373-4147-8cea-9492bb0d86b9|p0  |
|1  |28ed4d74-43ac-453b-bec9-651337bc18fc|p1  |
|4  |3f958c0f-88a8-4afa-bcf5-f89bcead3712|p4  |
|2  |cb596b60-a12a-4a19-9c37-6a12a036a71d|p2  |
|3  |b69f53d6-6de4-495f-881c-9259204e4a30|p3  |
+---+------------------------------------+----+

and a table `data` that holds the results of a query:

scala> q("select * from data")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_9521f4d0-0717-4025-b0a2-1237bf1b3b34|p0  |
|1  |updated_d4987777-97f5-4676-a464-bd45877868fc|p1  |
+---+--------------------------------------------+----+

I want to merge `data` into `sandbox`, overwriting only the partitions that appear in `data` and leaving the rest of `sandbox` untouched, so basically:

q("insert overwrite sandbox partition (part) select * from data")

but that query drops p2, p3 and p4, resulting in:

scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_9521f4d0-0717-4025-b0a2-1237bf1b3b34|p0  |
|1  |updated_d4987777-97f5-4676-a464-bd45877868fc|p1  |
+---+--------------------------------------------+----+

Alternatively, I can run q("insert overwrite sandbox partition(part='p1') select * from data where part = 'p1'"), which achieves the desired result, but only for one partition, and I would strongly prefer not to run one query per partition for this merge.
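If I did go the per-partition route, I imagine it would look something like this loop (a sketch, assuming `spark` is the active session, and that collecting the distinct `part` values is cheap because `data` is small):

```scala
// One static-partition INSERT OVERWRITE per partition present in `data`.
// With a static partition spec the partition column is left out of the SELECT.
val parts = spark.sql("select distinct part from data").collect().map(_.getString(0))
parts.foreach { p =>
  spark.sql(
    s"insert overwrite table sandbox partition (part = '$p') " +
    s"select id, val from data where part = '$p'")
}
```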

I could also left-join and union the two datasets, but I suspect that would defeat the purpose of partitioning `sandbox`: `data` is expected to be a very small fraction of `sandbox`, which is the whole reason for partitioning in the first place.
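For completeness, the join-based merge I have in mind would be something like this (a sketch; the `left_anti` join keeps only the `sandbox` rows whose partition is absent from `data`, but it still has to read all of `sandbox`):

```scala
// Keep sandbox rows whose partition does not appear in data, then add the new rows.
val newParts  = spark.table("data").select("part").distinct()
val untouched = spark.table("sandbox").join(newParts, Seq("part"), "left_anti")
val merged    = untouched.unionByName(spark.table("data"))
```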

Any ideas on how best to overwrite multiple, but not all, partitions?
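One thing I have come across (but have not verified in my setup) is the dynamic partition-overwrite mode added in Spark 2.3, which is supposed to overwrite only the partitions that the inserted data actually touches:

```scala
// With partitionOverwriteMode=dynamic, INSERT OVERWRITE is supposed to replace
// only the partitions present in `data` (p0 and p1 here) and leave p2..p4 alone.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql("insert overwrite table sandbox partition (part) select * from data")
```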

EDIT:

The problem I'm trying to solve: I need to overwrite only the partitions present in the source table, leaving partitions that exist only in the destination (but not the source) as they are. A full illustration of the issue from the ground up:

scala> def q(s: String) = spark.sql(s).show(false)
q: (s: String)Unit

scala> case class Record(id: Int, `val`: String, part: String)
defined class Record

scala> q("""
     |    create external table sandbox (
     |         id int,
     |         val string,
     |         part string
     |    ) using parquet
     |    partitioned by (
     |         part
     |    )
     |    location 's3://...../sandbox/'"""
     | )
25/03/21 19:22:28 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
++
||
++
++


scala> (0 until 10).map(i => {Record(i, java.util.UUID.randomUUID().toString(), "p" + (i % 5))}).toSeq.toDS.write.format("parquet").partitionBy("part").mode("append").saveAsTable("sandbox")

scala>

scala> q("""
     |     create table data (
     |         id int,
     |         val string,
     |         part string
     |     ) using parquet
     |     partitioned by (
     |          part
     |     )
     |     location 'hdfs:///data'""")
++
||
++
++


scala> (0 until 2).map(i => {Record(i, "updated_" + java.util.UUID.randomUUID().toString(), "p" + (i % 5))}).toSeq.toDS.write.format("parquet").mode("append").partitionBy("part").saveAsTable("data")

scala>

scala> q("select * from data")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0  |
|1  |updated_a3e80d62-c10b-4b05-ac1e-d054f121909c|p1  |
+---+--------------------------------------------+----+


scala> q("select * from sandbox")
+---+------------------------------------+----+
|id |val                                 |part|
+---+------------------------------------+----+
|4  |5707218e-d717-432e-b810-a20c5037de30|p4  |
|9  |5e98e6b4-474f-4674-93a0-76d26ded0679|p4  |
|0  |0dd8e874-9de8-47bb-be83-59c4e6e0f021|p0  |
|2  |d0ffb48e-0dca-493d-b4cb-4417d956145c|p2  |
|7  |7fc72d81-7124-4ce1-94ba-da9c93233f66|p2  |
|1  |53b3776c-89ce-4511-9958-9184a2a99cbf|p1  |
|6  |4dcb8a74-2cb7-4b01-87e6-713d54f79b7d|p1  |
|8  |312053a7-01bd-41a2-b4a6-0200a87f77b6|p3  |
|5  |e6198dc2-2997-4988-99da-1fb2fa525ba7|p0  |
|3  |e45489cc-bdb4-409c-972a-3229df89af73|p3  |
+---+------------------------------------+----+


scala> q("insert overwrite sandbox partition (part) select * from data")
++
||
++
++


scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0  |
|1  |updated_a3e80d62-c10b-4b05-ac1e-d054f121909c|p1  |
+---+--------------------------------------------+----+


scala> q("insert overwrite sandbox partition (part) select * from data where part = 'p0'")
++
||
++
++


scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0  |
+---+--------------------------------------------+----+