Answer by mik*_*ike (score: 6)
You can achieve this with the stream-scheduling capabilities that Structured Streaming provides.

You can trigger the refresh (unpersist -> load -> persist) of the static DataFrame by creating an artificial "rate" stream that refreshes it periodically. The idea is:
1. Initially load the static DataFrame and keep it as a `var`
2. Define a method that refreshes the static DataFrame
3. Use a "rate" stream that gets triggered at the required interval (e.g. 1 hour)
4. Read the actual streaming data and perform a join with the static DataFrame
5. Within that rate stream, have a `foreachBatch` sink that calls the refresher method created in step 2

The following code runs fine with Spark 3.0.1, Scala 2.12.10 and Delta 0.7.0:
```scala
import java.util.Calendar

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.Trigger

// 1. Load the static Dataframe initially and keep as `var`
var staticDf = spark.read.format("delta").load(deltaPath)
staticDf.persist()

// 2. Define a method that refreshes the static Dataframe
def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
  staticDf.unpersist()
  staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()
  println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
}

// 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
val staticRefreshStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load()
  .selectExpr("CAST(value as LONG) as trigger")
  .as[Long]

// 4. Read actual streaming data and perform join operation with static Dataframe
// As an example I used Kafka as a streaming source
val streamingDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

val joinDf = streamingDf.join(staticDf, "id")

val query = joinDf.writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .start()

// 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
staticRefreshStream.writeStream
  .outputMode("append")
  .foreachBatch(foreachBatchMethod[Long] _)
  .queryName("RefreshStream")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
```

For a full example, the Delta table was created and updated with new values as follows:
```scala
import org.apache.spark.sql.SaveMode

val deltaPath = "file:///tmp/delta/table"

import spark.implicits._
val df = Seq(
  (1L, "static1"),
  (2L, "static2")
).toDF("id", "deltaField")
df.write
  .mode(SaveMode.Overwrite)
  .format("delta")
  .save(deltaPath)
```
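The snippet above only shows the initial creation of the Delta table. To actually observe the refresh in action, new rows can be appended to the table while both streaming queries are running. A minimal sketch of that update, assuming the same SparkSession, `deltaPath`, and `id`/`deltaField` schema as in the snippets above:

```scala
// Assumes the SparkSession (`spark`), `deltaPath` and `import spark.implicits._`
// from the previous snippets are already in scope.
import org.apache.spark.sql.SaveMode

// Append a new row instead of overwriting, so the existing rows are kept.
// On the next trigger of "RefreshStream", foreachBatchMethod reloads
// staticDf, and the join starts matching streaming records with id = 3.
val updateDf = Seq(
  (3L, "static3")
).toDF("id", "deltaField")

updateDf.write
  .mode(SaveMode.Append)
  .format("delta")
  .save(deltaPath)
```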