I have a zipped parquet file with a corrupted header, i.e. it contains weird characters making it impossible to read the table in a standart way.
So I created a cleaning function that reads in the file as RDD, removes non-ascii characters, extracts the column names, removes these rows from the RDD and then converts the RDD to a data frame with the extracted column names.
df.show() works, however, if I want to print the count, it takes infinitively long, making the function unusable. Is there something to improve the function or a radically different approach?
The first 3 RDD rows (rdd.take(3)) look like this:
['FILE.3.a.csv\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x000000644\x000000000\x000000000\x00�\x00\x00\x00\x00\x00\x00\x11h\x1a', '�14766161726\x00013642\x00 0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00ustar \x00root\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00root\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00var1;var2;var3;var4;var5;var6;var7;var8;var9;var10;var11;var12;var13;var14;var2;var15;var16;var17;var18', '"value1";"value2";"";"value4";"value5";"value6";"value7";"value8";"value9";"value10";"value11";"value12";"value13";"value14";"value15";"value16";"";""']
The function looks like this:
def clean_corrupted_data(input_path):
# Read in input as text file
rdd = spark.sparkContext.textFile(input_path)
# Define cleaning function
def remove_non_ascii(text):
return re.sub(r'[\x00-\x1F\x7F-\x9F]|[^\x00-\x7F]+', '', text)
# Clean text
clean_rdd = rdd.map(remove_non_ascii)
# Take second '' bracket to filter out headers
header = clean_rdd.take(2)
header = header[1].split(";")
# Remove headers from cleaned text
data_rdd = clean_rdd.zipWithIndex().filter(lambda x: x[1] > 1).map(lambda x: x[0])
# Create dataframe from cleaned text + headers
data_rdd = data_rdd.map(lambda line: [col.strip('"') for col in line.split(";")])
valid_data_rdd = data_rdd.filter(lambda row: len(row) == len(header))
df = spark.createDataFrame(valid_data_rdd, header)
return df