
python - Convert dataframe column if it's datetime to LongType, otherwise keep the same column - Stack Overflow


I have a dataframe with columns entry_transaction, created_date, updated_date, transaction_date.

created_date and updated_date are strings in yyyy-MM-dd HH:mm:ss.SSS format, and transaction_date holds milliseconds as a long. I need to loop through the columns and, if a column is in datetime format, convert it to long; otherwise keep the original value, and also apply the data type from a schema file.

Following is the code. While I am able to convert the datetime columns to long, I am unable to keep the original value for transaction_date.

There are 50-plus tables, and each table has more than 50 columns, so I need to use a for loop and apply this change dynamically by checking whether a column is a datetime column.

Here is sample code to reproduce the issue:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

spark = SparkSession.builder.appName("example").getOrCreate()

data = [("2025-03-12 18:47:33.943", "1735862400000", "2025-03-12 18:47:33.943", "2025-03-12 18:47:33.943"), ("2025-03-12 10:47:33.943", "1735862400000", "2025-03-12 12:47:33.943", "2025-03-12 16:47:33.943"), ("2025-03-01 18:47:33.943", "1735862400000", "2025-03-04 18:47:33.943", "2025-03-12 18:47:33.943")]
columns = ["entry_transaction", "transaction_date", "creation_date", "updated_date"]

df = spark.createDataFrame(data, columns)
df.show()
df2 = df.select("entry_transaction","transaction_date", "creation_date", "updated_date")

schema = StructType([
    StructField('entry_transaction', LongType(), True),
    StructField('transaction_date', StringType(), True),
    StructField('creation_date', LongType(), True),
    StructField('updated_date', LongType(), True),
])

for field in schema.fields:
    print(field.name, field.dataType)
    df2 = df2.withColumn(
        field.name,
        unix_timestamp(col(field.name).cast(TimestampType())) * 1000000
        if to_date(col(field.name), "yyyy-MM-dd").isNotNull
        else df2[field.name])
sorted_columns = sorted(df2.columns)
df_reordered = df2.select(sorted_columns)
display(df_reordered)

Expected output: the datetime columns converted to long, with transaction_date keeping its original value.

But I am getting NULL for transaction_date.

Please help me understand how to get the proper dataframe. Thanks.

asked Mar 28 at 17:57 by Yuva, edited Mar 28 at 18:45
  • 1 Might be looking for this: spark.apache.org/docs/latest/api/python/reference/pyspark.sql/… – Chris Commented Mar 28 at 21:14
  • @Yuva you need to handle the conversion of datetime columns to long format while keeping the default value for columns that are already in long format. – Dileep Raj Narayan Thumula Commented Mar 29 at 3:20
  • 1 Approach 1: for field in schema.fields: df = df.withColumn(field.name, when(col(field.name).cast("timestamp").isNotNull(), unix_timestamp(to_timestamp(col(field.name), "yyyy-MM-dd HH:mm:ss.SSS")) * 1000000).otherwise(col(field.name))) (a runnable sketch of this approach follows the comment list) – Dileep Raj Narayan Thumula Commented Mar 29 at 3:30
  • Approach 2: for field in schema.fields: if isinstance(field.dataType, LongType): df = df.withColumn(field.name, unix_timestamp(col(field.name).cast(TimestampType())) * 1000000) else: df = df.withColumn(field.name, col(field.name)) – Dileep Raj Narayan Thumula Commented Mar 29 at 3:31
  • 1 Can you try the above 2 ways and let me know if they helped? – Dileep Raj Narayan Thumula Commented Mar 29 at 3:31
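
The following is a runnable sketch of the first suggested approach, applied to the question's sample data (an illustration of the comment, not code from the original post). The when/otherwise is evaluated by Spark per value, so transaction_date, which does not cast to a timestamp, keeps its original content:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_timestamp, unix_timestamp

spark = SparkSession.builder.appName("example").getOrCreate()

data = [("2025-03-12 18:47:33.943", "1735862400000",
         "2025-03-12 18:47:33.943", "2025-03-12 18:47:33.943")]
df = spark.createDataFrame(
    data, ["entry_transaction", "transaction_date", "creation_date", "updated_date"])

for c in df.columns:
    df = df.withColumn(
        c,
        # Values that cast to a timestamp are converted to unix seconds * 1000000;
        # everything else (e.g. the millisecond string in transaction_date) is kept.
        when(col(c).cast("timestamp").isNotNull(),
             unix_timestamp(to_timestamp(col(c), "yyyy-MM-dd HH:mm:ss.SSS")) * 1000000)
        .otherwise(col(c)))

df.show(truncate=False)

Note that because the two branches mix long and string values, Spark keeps the resulting columns as strings; the final data types can still be applied from the schema file afterwards, as the accepted answer below does.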

1 Answer


Thank you all for your responses/tips. I am able to get the expected result, and am posting the piece of code here for reference. I am only including the specific section where I handle the timestamp columns:

from pyspark.sql.functions import col, to_date, unix_timestamp, when
from pyspark.sql.types import TimestampType

for field in schema.fields:
    if isinstance(field.dataType, TimestampType):
        # print(f" ------ IF PART ------ {field.name, field.dataType}")
        df = df.withColumn(
            field.name,
            when(to_date(col(field.name), "yyyy-MM-dd HH:mm:ss.SSSSSS").isNotNull(),
                 unix_timestamp(col(field.name).cast(TimestampType())) * 1000000)
            .when(to_date(col(field.name), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX").isNotNull(),
                  unix_timestamp(col(field.name).cast(TimestampType())) * 1000000)
            .otherwise(unix_timestamp(col(field.name).cast(TimestampType())) * 1000000))
    else:
        # print(f" ------ ELSE PART ------ {field.name, field.dataType}")
        df = df.withColumn(field.name, col(field.name).cast(field.dataType))
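
For context, a minimal sketch of what the schema file could look like for the question's sample data, so the snippet above hits the right branches. The exact field types here are an assumption; the point is that the datetime columns are flagged as TimestampType while transaction_date is not:

from pyspark.sql.types import StructType, StructField, TimestampType, LongType

# Hypothetical schema: datetime-string columns flagged as TimestampType,
# transaction_date flagged as LongType so it is only cast, not converted.
schema = StructType([
    StructField("entry_transaction", TimestampType(), True),
    StructField("transaction_date", LongType(), True),
    StructField("creation_date", TimestampType(), True),
    StructField("updated_date", TimestampType(), True),
])

# Running the loop above over the question's sample dataframe with this schema
# converts the three datetime-string columns to unix seconds * 1000000 and
# casts transaction_date to the long 1735862400000 instead of NULL.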
