Spark
m_v*_*uri 5
简而言之,不,您不能StorageLevel.OFF_HEAP用于广播变量。
要了解原因,让我们看一下该方法的源代码SparkContext.broadcast(...)。
/** * Broadcast a read-only variable to the cluster, returning a * [[.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions ... */ def broadcast[T: ClassTag](value: T): Broadcast[T] = { : val bc = env.broadcastManager.newBroadcast[T](value, isLocal) : bc }在上面的代码中,broadcastManager.newBroadcast(...)创建Broadcast对象的就是这个方法的返回类型。
现在,让我们深入研究并检查newBroadcast()。
def newBroadcast(value_ : T, isLocal: Boolean): Broadcast[T] = { broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())}在上面的代码中,broadcastManager有一个组件调用broadcastFactory并使用抽象工厂设计模式将广播变量的创建委托给其相关工厂。
另请注意,BroadcastManager跟踪id每个broadcast变量的唯一性,每个新的广播变量都会递增。
目前,在 spark 中只有一种BroadcastFactory可以初始化,即TorrentBroadcastFactory. 这在BroadcastManager. _
// Called by SparkContext or Executor before using Broadcastprivate def initialize() { : broadcastFactory = new TorrentBroadcastFactory :}引用源代码TorrentBroadcastFactory
使用类似 BitTorrent 的协议将广播数据分布式传输到执行程序的广播实现
这个特定的工厂使用TorrentBroadcast。这个类的描述信息量很大。
驱动程序将序列化的对象分成小块,并将这些块存储在驱动程序的 BlockManager 中。
在每个执行器上,执行器首先尝试从其 BlockManager 获取对象。如果它不存在,则它使用远程获取从驱动程序和/或其他执行程序(如果可用)获取小块。一旦获得块,它将块放入自己的 BlockManager 中,以供其他执行程序从中获取。这可以防止驱动程序成为发送多个广播数据副本(每个执行程序一个)的瓶颈。
阅读类的writeBlock函数TorrentBroadcast,我们可以看到StorageLevel.MEMORY_AND_DISK_SER这个广播的硬编码选项。
/** * Divide the object into multiple blocks and put those blocks in the block manager. * * @param value the object to divide * @return number of blocks this broadcast variable is divided into */ private def writeBlocks(value: T): Int = { import StorageLevel._ : : if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException(s"Failed to store $pieceId of $broadcastId " + s"in local BlockManager") } : :因此,由于此代码使用 的硬编码值StorageLevel.MEMORY_AND_DISK_SER,我们不能StorageLevel.OFF_HEAP用于广播变量。
Spark