I would like to shift the values of a column downwards and have the last element wrap around to the top. The lag window function almost accomplishes what I want:
df.select($"sample_column", lag("sample_column", 1).over(windowSpec).as("lag")).show()
+-----------------+----+
|    sample_column| lag|
+-----------------+----+
|                0|NULL|
|                1|   0|
|                2|   1|
|                3|   2|
|                4|   3|
|                5|   4|
|                6|   5|
|                7|   6|
|                8|   7|
|                9|   8|
|               10|   9|
|               11|  10|
|               12|  11|
|               13|  12|
|               14|  13|
|               15|  14|
|               16|  15|
|               17|  16|
|               18|  17|
|               19|  18|
+-----------------+----+
only showing top 20 rows
However, if I try to replace the NULL with the command
df.select($"sample_column", lag("sample_column", 1, last("__index__level_0__").over(windowSpec)).over(windowSpec).as("lag")).show()
I get the error:
org.apache.spark.SparkRuntimeException: [UNSUPPORTED_FEATURE.LITERAL_TYPE] The feature is not supported: Literal for 'last(__index__level_0__) OVER (PARTITION BY file_name ORDER BY sample_column ASC NULLS FIRST unspecifiedframe$())' of class org.apache.spark.sql.Column.
  at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:320)
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
  at org.apache.spark.sql.functions$.lag(functions.scala:1487)
  at org.apache.spark.sql.functions$.lag(functions.scala:1471)
  at org.apache.spark.sql.functions$.lag(functions.scala:1457)
  at ammonite.$sess.cmd10$Helper.<init>(cmd10.sc:1)
  at ammonite.$sess.cmd10$.<init>(cmd10.sc:7)
  at ammonite.$sess.cmd10$.<clinit>(cmd10.sc:-1)
How do I do this correctly? Does Spark already have a default implementation for it?
1 Answer
If this column doesn't have any nulls, then you can try to use the lag and last functions together like this:
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = spark.sparkContext
  .parallelize(Seq(0, 1, 2, 3, 4, 5, 6, 7))
  .toDF("sample_column")

val window = Window.orderBy($"sample_column")

df.select(
    $"sample_column",
    coalesce(
      lag("sample_column", 1).over(window),
      last("sample_column").over(window.rowsBetween(0, Window.unboundedFollowing))
    ).as("lag")
  )
  .orderBy($"sample_column")
  .show()
+-------------+---+
|sample_column|lag|
+-------------+---+
|            0|  7|
|            1|  0|
|            2|  1|
|            3|  2|
|            4|  3|
|            5|  4|
|            6|  5|
|            7|  6|
+-------------+---+
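The trick is the window frame on the fallback: last("sample_column") is evaluated over rowsBetween(0, Window.unboundedFollowing), i.e. from the current row to the end of the partition, so it returns the final value of the column. coalesce only uses that fallback on the first row, where lag produces NULL, which gives the wrap-around behaviour.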
lag currently doesn't support the ignorenulls option, so you might have to separate the null rows out, compute the lag column for the non-null rows, and union the data frames back together; a rough sketch of that approach follows.
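A minimal sketch, assuming nulls can appear in sample_column itself and reusing the df and window defined above (exact null handling will depend on your data):

import org.apache.spark.sql.functions._

// Split the null rows out so lag only ever sees non-null values.
val nonNullRows = df.filter($"sample_column".isNotNull)
val nullRows    = df.filter($"sample_column".isNull)

// Compute the wrapped lag over the non-null rows only,
// using the same coalesce(lag, last) pattern as above.
val lagged = nonNullRows.select(
  $"sample_column",
  coalesce(
    lag("sample_column", 1).over(window),
    last("sample_column").over(window.rowsBetween(0, Window.unboundedFollowing))
  ).as("lag")
)

// Union the null rows back in; they get no lag value,
// so pad them with a null column to keep the schemas aligned.
val result = lagged.union(
  nullRows.select($"sample_column", lit(null).cast("int").as("lag"))
)
result.show()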