
dataframe - Spark struct to case class conversion issues with getAs[T] - Stack Overflow


I often use the map function on Spark Dataset rows to do transformations in Scala on typed objects. My usual pattern is to take intermediate results created by DataFrame transformations (withColumn, groupBy, etc.) and turn them into a typed Dataset so I can use map.

This works well but leads to a lot of 'temporary' case classes for intermediate results or unwieldy tuple types.
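
For illustration, the pattern looks something like this (the DataFrame df and the intermediate case class NameAndAge are just placeholders):

import org.apache.spark.sql.functions.max
import spark.implicits._

// Throwaway case class, defined only to type this one map step.
final case class NameAndAge(name: String, age: Integer)

val intermediate = df
  .groupBy($"name")
  .agg(max($"age").alias("age"))   // untyped DataFrame transformations...
  .as[NameAndAge]                  // ...then a temporary case class to get a typed Dataset
  .map(r => r.copy(name = r.name.toUpperCase))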

An alternative would be to run map on a DataFrame and retrieve typed fields from the row using getAs[T], but this doesn't seem to work with spark.implicits if T is a case class.

E.g. this gives the error ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window

import spark.implicits._

final case class Person(name: String, age: Integer)
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS

val df = people.alias("p")
               .select($"p.name", struct($"p.*").alias("person"))
val ds = df.map(row => {
  val name = row.getAs[String]("name")
  val person = row.getAs[Person]("person")
  (name, person)
})

display(ds)

whereas this works fine:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window

import spark.implicits._

final case class Person(name: String, age: Integer)
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS

val df = people.alias("p")
               .select($"p.name", struct($"p.*").alias("person"))
               .as[Tuple2[String, Person]]
val ds = df.map(row => {
  val name = row._1
  val person = row._2
  (name, person)
})

display(ds)

So Spark happily converts the DataFrame person struct to the Person case class in the second example but won't do it in the first. Does anyone know a simple way to fix this?

Thanks,

David


asked Nov 21, 2024 at 8:25 by David Regan

1 Answer

"Simple", possibly :), but very much using internal api's that are subject to change (and have done). This code won't work as-is on Spark 4 either (tested on 3.5.1).

As an approach it's also likely slower than your second example using tuples, because Spark translates from InternalRow to the user-land Row before entering your map code, and the code below then converts back to InternalRow before calling the decoder.

resolveAndBind is typically fine in this kind of example, but it's not guaranteed to work in all cases, as resolution of field names etc. normally needs to happen as part of the full analysis of the query plan.

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

import spark.implicits._

// Grab the ExpressionEncoder behind the implicit Encoder[Person] and build its object deserializer
implicit val pEnc = implicitly[Encoder[Person]].asInstanceOf[ExpressionEncoder[Person]]
val decoder = pEnc.resolveAndBind().objDeserializer

val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS

val df = people.alias("p")
  .select($"p.name", struct($"p.*").alias("person"))
val ds = df.map(row => {
  val name = row.getAs[String]("name")
  val personRow = row.getAs[Row]("person")
  // Convert the user-land Row back to an InternalRow and evaluate the deserializer expression
  val person = decoder.eval(CatalystTypeConverters.convertToCatalyst(personRow).asInstanceOf[InternalRow]).asInstanceOf[Person]
  (name, person)
})

ds.show

In summary, you are better off using a tuple wrapper and the built-in encoding wherever possible; it's faster, and it's designed and tested to work that way.
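
If you want to stay on public APIs, a simpler (if manual) alternative is to fetch the nested struct as a plain Row and build the case class by hand; a sketch, reusing the question's df and Person:

val ds = df.map { row =>
  val name = row.getAs[String]("name")
  // getAs only casts, so retrieve the struct as a Row rather than as Person
  val personRow = row.getAs[Row]("person")
  val person = Person(personRow.getAs[String]("name"), personRow.getAs[Integer]("age"))
  (name, person)
}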
