I have a dataframe with columns entry_transaction, creation_date, updated_date, and transaction_date. creation_date and updated_date are strings in the format yyyy-MM-dd HH:mm:ss.SSS, and transaction_date holds long milliseconds. I need to loop through the columns and, if a column is a datetime column, convert it to long; otherwise keep the default value. I also need to apply the data type from a schema file.
Below is my code. While I am able to convert the datetime columns to long, I am unable to keep the default value for transaction_date.
There are 50+ tables, and each table has more than 50 columns, so I need a for loop that applies this change dynamically by checking whether a column is a datetime column.
Here is sample code to reproduce the issue:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, unix_timestamp
from pyspark.sql.types import LongType, StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("example").getOrCreate()

data = [("2025-03-12 18:47:33.943", "1735862400000", "2025-03-12 18:47:33.943", "2025-03-12 18:47:33.943"),
        ("2025-03-12 10:47:33.943", "1735862400000", "2025-03-12 12:47:33.943", "2025-03-12 16:47:33.943"),
        ("2025-03-01 18:47:33.943", "1735862400000", "2025-03-04 18:47:33.943", "2025-03-12 18:47:33.943")]
columns = ["entry_transaction", "transaction_date", "creation_date", "updated_date"]
df = spark.createDataFrame(data, columns)
df.show()

df2 = df.select("entry_transaction", "transaction_date", "creation_date", "updated_date")

schema = StructType([StructField('entry_transaction', LongType(), True),
                     StructField('transaction_date', StringType(), True),
                     StructField('creation_date', LongType(), True),
                     StructField('updated_date', LongType(), True)])

for field in schema.fields:
    print(field.name, field.dataType)
    # Intended: convert datetime columns to long, otherwise keep the original value
    df2 = df2.withColumn(
        field.name,
        unix_timestamp(col(field.name).cast(TimestampType())) * 1000000 if to_date(col(field.name), "yyyy-MM-dd").isNotNull else df2[field.name])

sorted_columns = sorted(df2.columns)
df_reordered = df2.select(sorted_columns)
display(df_reordered)
Expected output: the datetime columns converted to long values, with transaction_date keeping its original long milliseconds value. But I am getting NULL for transaction_date instead.
Please help me get the proper dataframe. Thanks.
- Might be looking for this spark.apache./docs/latest/api/python/reference/pyspark.sql/… – Chris Commented Mar 28 at 21:14
- @ Yuva you need to handle the conversion of datetime columns to long format while keeping the default value for columns that are already in long format. – Dileep Raj Narayan Thumula Commented Mar 29 at 3:20
- Approach 1: for field in schema.fields: df = df.withColumn( field.name, when( col(field.name).cast("timestamp").isNotNull(), unix_timestamp(to_timestamp(col(field.name), "yyyy-MM-dd HH:mm:ss.SSS")) * 1000000 ).otherwise(col(field.name)) ) – Dileep Raj Narayan Thumula Commented Mar 29 at 3:30
- for field in schema.fields: if isinstance(field.dataType, LongType): df = df.withColumn(field.name, unix_timestamp(col(field.name).cast(TimestampType())) * 1000000) else: df = df.withColumn(field.name, col(field.name)) – Dileep Raj Narayan Thumula Commented Mar 29 at 3:31
- Can you try the above 2 ways and let me know if they did help you? – Dileep Raj Narayan Thumula Commented Mar 29 at 3:31
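For readability, here is a sketch of the first approach suggested in the comments (the when/otherwise pattern). It assumes df and schema already exist as in the question's sample code; values that cast to a timestamp are converted to epoch microseconds, while anything else, such as transaction_date, falls through to otherwise and keeps its original value.

from pyspark.sql.functions import col, to_timestamp, unix_timestamp, when

# Sketch of "Approach 1" from the comments above (assumes `df` and `schema`
# from the question's sample code). Datetime strings are converted to epoch
# microseconds; columns that do not cast to a timestamp keep their value.
for field in schema.fields:
    df = df.withColumn(
        field.name,
        when(col(field.name).cast("timestamp").isNotNull(),
             unix_timestamp(to_timestamp(col(field.name), "yyyy-MM-dd HH:mm:ss.SSS")) * 1000000)
        .otherwise(col(field.name)))

Note that because the otherwise branch keeps the original string, the resulting column type still needs a cast to the type from the schema file.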
1 Answer
Thank you all for your responses/tips. I am able to get the expected result, and I am posting the relevant piece of code here for reference. I am only including the specific section where I handle the timestamp conversion:
if isinstance(field.dataType, TimestampType):
    # Timestamp columns: convert the datetime string to epoch microseconds,
    # trying the expected string formats before falling back to a plain cast
    df = df.withColumn(
        field.name,
        when(to_date(col(field.name), "yyyy-MM-dd HH:mm:ss.SSSSSS").isNotNull(),
             unix_timestamp(col(field.name).cast(TimestampType())) * 1000000)
        .when(to_date(col(field.name), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX").isNotNull(),
              unix_timestamp(col(field.name).cast(TimestampType())) * 1000000)
        .otherwise(unix_timestamp(col(field.name).cast(TimestampType())) * 1000000))
else:
    # All other columns: keep the value and only apply the data type from the schema
    df = df.withColumn(field.name, col(field.name).cast(field.dataType))
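For reference, the snippet above is only the timestamp branch of the loop. A self-contained sketch that exercises it on the sample data from the question might look like the following; the schema here is an assumption for illustration only (the datetime string columns are marked TimestampType, which is what the isinstance check implies the real schema file does, and transaction_date is LongType so the else branch simply applies the schema's type).

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, unix_timestamp, when
from pyspark.sql.types import LongType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("example").getOrCreate()

data = [("2025-03-12 18:47:33.943", "1735862400000", "2025-03-12 18:47:33.943", "2025-03-12 18:47:33.943")]
df = spark.createDataFrame(data, ["entry_transaction", "transaction_date", "creation_date", "updated_date"])

# Hypothetical schema standing in for the schema file: datetime string columns
# are TimestampType, the epoch-milliseconds column is LongType.
schema = StructType([StructField("entry_transaction", TimestampType(), True),
                     StructField("transaction_date", LongType(), True),
                     StructField("creation_date", TimestampType(), True),
                     StructField("updated_date", TimestampType(), True)])

for field in schema.fields:
    if isinstance(field.dataType, TimestampType):
        # Datetime strings become epoch microseconds (same logic as the snippet above)
        df = df.withColumn(
            field.name,
            when(to_date(col(field.name), "yyyy-MM-dd HH:mm:ss.SSSSSS").isNotNull(),
                 unix_timestamp(col(field.name).cast(TimestampType())) * 1000000)
            .when(to_date(col(field.name), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX").isNotNull(),
                  unix_timestamp(col(field.name).cast(TimestampType())) * 1000000)
            .otherwise(unix_timestamp(col(field.name).cast(TimestampType())) * 1000000))
    else:
        # Other columns keep their value and only get the data type from the schema,
        # so transaction_date stays 1735862400000 (cast from string to long)
        df = df.withColumn(field.name, col(field.name).cast(field.dataType))

df.show(truncate=False)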