Apache Spark relies heavily on closure serialization to ship computations to other machines. Unfortunately, default JVM serialization is greedy and tends to pull irrelevant state into the closure. As a consequence, many Scala/Java frameworks need special adaptation to work with it.
(The following example uses ScalaTest; Scala 2 and Scala 3 exhibit identical behaviour.)
class ReduceClosureSerializingWithInline extends SparkUnitTest { // class is not serialisable

  val k = "1"

  it("will fail") {
    sc.parallelize(1 to 2).map { v =>
      k + v
    }.collect()
  }

  { // as a circumvention
    val k = "1"

    it("will succeed") {
      sc.parallelize(1 to 2).map { v =>
        k + v
      }.collect()
    }
  }
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funspec.AnyFunSpec

abstract class SparkUnitTest extends AnyFunSpec, BeforeAndAfterAll:

  def appName: String = getClass.getSimpleName

  given spark: SparkSession =
    val sparkConf = new SparkConf(true).setAll(Map())
    SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .appName(suiteName)
      .getOrCreate()

  def sc: SparkContext = spark.sparkContext

  override def afterAll(): Unit =
    spark.stop()

  export org.scalatest.matchers.should.Matchers.shouldEqual
The only difference between success and failure is that in the first case k is a member of the test class (so the closure captures the entire SparkUnitTest instance), while in the second case it is a free variable of a local block (so the closure carries only the string value itself).
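The same effect can be made explicit by copying the member into a local val just before the closure is built, so that only the copy is captured. A minimal sketch under the same setup (localK is an illustrative name, not part of the original code):

  it("will also succeed") {
    val localK = k // copy the member into a local val
    // the closure below captures only the String copy, not `this`
    sc.parallelize(1 to 2).map { v =>
      localK + v
    }.collect()
  }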
These circumventions work in the simple case above, but in many more complex cases they are either impossible or make the implementation much longer than necessary. Is there a way to make the first k behave like the second when it is serialised as part of a closure?
1 Answer
No. You should not use instance variables if you do not want the instance to be serialized. You have to carry all your serializable data in data classes (e.g. a case class, though it doesn't have to be one) that can safely be serialized. Your logic can live either in classes that have only serializable fields, or in singletons (object).
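As a minimal sketch of the singleton approach (Constants and ReduceClosureSerializingWithObject are hypothetical names, reusing the SparkUnitTest harness above), moving k into a top-level object keeps it out of the test instance entirely; each executor resolves the object through a static reference instead of deserialising it:

  object Constants { // top-level, so it carries no outer reference
    val k = "1"
  }

  class ReduceClosureSerializingWithObject extends SparkUnitTest {
    it("will succeed") {
      sc.parallelize(1 to 2).map { v =>
        Constants.k + v // resolved statically: `this` is never captured
      }.collect()
    }
  }

A small case class holding just the required fields works the same way: the closure then serialises a compact data holder rather than the whole suite.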