I am working on a very large dataset that has over 800,000 records, and I'm trying to update a column value based on an ID column.
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = SparkSession.builder \
    .appName('SparkByExamples') \
    .getOrCreate()
data = [('James','Smith','M','N'), ('Anna','Rose','F','N'),
('Robert','Williams','M','N')
]
columns = ["firstname","lastname","gender","deleted"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
# Output
+---------+--------+------+-------+
|firstname|lastname|gender|deleted|
+---------+--------+------+-------+
| James| Smith| M| N|
| Anna| Rose| F| N|
| Robert|Williams| M| N|
+---------+--------+------+-------+
I also have a list of names for which I want to update the value of the deleted column.
deleted_list = ['James', 'Robert']
Currently, I'm using code like this:
df = df.withColumn("deleted_new", when(col("firstname").isin(deleted_list), "Y").otherwise("N"))
df = df.drop('deleted').withColumnRenamed('deleted_new', 'deleted')
However, it is taking a really long time, as my data frame and list are huge. Is there a PySpark way to make this faster?
# Expected output
+---------+--------+------+-------+
|firstname|lastname|gender|deleted|
+---------+--------+------+-------+
| James| Smith| M| Y|
| Anna| Rose| F| N|
| Robert|Williams| M| Y|
+---------+--------+------+-------+
Edit: the list is probably close to a million entries or more. With the 800K records I think it takes about 10 minutes, as I'm testing with sample data, but eventually it could be millions of records that I will have to process...
1 Answer
You could try parallelizing deleted_list into a DataFrame and performing a left join:
from pyspark.sql import functions as F

to_delete_df = spark.createDataFrame([(n, 'Y') for n in deleted_list], ['firstname', 'deleted_new'])

df.join(to_delete_df, ['firstname'], 'left')\
    .withColumn("deleted", F.coalesce(F.col("deleted_new"), F.col("deleted")))\
    .drop("deleted_new").show()
+---------+--------+------+-------+
|firstname|lastname|gender|deleted|
+---------+--------+------+-------+
| James| Smith| M| Y|
| Anna| Rose| F| N|
| Robert|Williams| M| Y|
+---------+--------+------+-------+
By using isin with a very large list, you generate a very heavy execution plan, and Spark generally has trouble with that.
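As a further option (not part of the answer above), if to_delete_df turns out to be small enough to fit in executor memory, you could also hint a broadcast join so the large df is not shuffled. This is only a sketch under that assumption; with a list of millions of entries, the plain join above is the safer choice.

from pyspark.sql import functions as F

# Sketch: broadcast the (assumed small) lookup DataFrame to avoid shuffling df.
# Skip this if deleted_list really has millions of entries.
result = (
    df.join(F.broadcast(to_delete_df), ['firstname'], 'left')
      .withColumn("deleted", F.coalesce(F.col("deleted_new"), F.col("deleted")))
      .drop("deleted_new")
)
result.show()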