I am trying to retrieve data from a table in HDFS in which a column contains timestamps.
I am connected to HDFS through CDSW and am running a Python script which opens a Spark session and runs an SQL query to retrieve some rows from the table. Running the same SQL query in the HUE Impala editor returns the proper values, but in CDSW the Python script returns only None values for the timestamp column. How can I retrieve my data properly? It's a huge table, so I cannot simply export a CSV file from the Impala editor. I want to retrieve the data for migration to another database. The script I run in CDSW is the following:
(In its output, the measurement_time column contains None values instead of the timestamps that appear in the HUE Impala editor.)
import os
import sys

import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.sql.functions import to_utc_timestamp
from pyspark.sql.functions import unix_timestamp
# Select the Python interpreter for the driver and the executors, and the PROJ
# data directory. Set here, before the session is built below.
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
os.environ['PROJ_LIB'] = '/home/cdsw/.conda/envs/python3.6/share/proj'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'

# Build (or reuse) the Spark session on YARN in client mode.
spark = (
    SparkSession.builder
    .master("yarn")
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.submit.deployMode", "client")
    .config("spark.eventLog.enabled", "true")
    .config("spark.executor.instances", "30")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .config("spark.rpc.message.maxSize", "1024")
    .config("spark.executor.memoryOverhead", "800")
    .config("spark.driver.memory", "4g")
    .config("spark.driver.memoryOverhead", "800")
    # Was "spark.spark.driver.maxResultSize": the doubled prefix is not a
    # Spark property, so the intended 4g limit was never applied.
    .config("spark.driver.maxResultSize", "4g")
    # Was three "spark.executor.dynamicAllocation.*" keys set to "false".
    # Those are not Spark properties; the apparent intent was to switch
    # dynamic allocation off so spark.executor.instances above is honoured.
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.sql.broadcastTimeout", "1000")
    .config("spark.kryoserializer.buffer.max", "1024m")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
    .config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
sc = spark.sparkContext

# Session-level Hive settings, issued through the session itself rather than
# through the deprecated SQLContext wrapper.
spark.sql('SET hive.exec.dynamic.partition = true')
spark.sql('SET hive.exec.dynamic.partition.mode = nonstrict')

# Show every row and column when printing pandas frames.
pd.set_option("display.max_rows", None, "display.max_columns", None)

qry = """SELECT parameter_id, measurement_time , value, par_dt FROM aums.eems_archive_data WHERE par_dt = '20240101' LIMIT 10"""
spark_df = spark.sql(qry)

# NOTE(review): if measurement_time still comes back as None, inspect
# spark_df.printSchema() to see which type Spark assigns to the column.
# Presumably the rebase/time-zone settings above only change how valid
# timestamps are interpreted, not whether they are read as null -- TODO
# confirm against the table's physical storage format.
data_df = spark_df.toPandas()
print(data_df.head(1))
I am expecting:
parameter_id measurement_time value par_dt \
0 d7cc8e82-7ad1-11ed-aaf9-fa163ed4a1d0 2024-01-01 01:34:24 13.45 20240101
and I get:
parameter_id measurement_time value par_dt \
0 d7cc8e82-7ad1-11ed-aaf9-fa163ed4a1d0 None 13.45 20240101