最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

parquet - Spark file with corrupted header - Stack Overflow

programmeradmin3浏览0评论

I have a zipped parquet file with a corrupted header, i.e. it contains weird characters making it impossible to read the table in a standart way.

So I created a cleaning function that reads in the file as RDD, removes non-ascii characters, extracts the column names, removes these rows from the RDD and then converts the RDD to a data frame with the extracted column names.

df.show() works, however, if I want to print the count, it takes infinitively long, making the function unusable. Is there something to improve the function or a radically different approach?

The first 3 RDD rows (rdd.take(3)) look like this:


['FILE.3.a.csv\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x000000644\x000000000\x000000000\x00�\x00\x00\x00\x00\x00\x00\x11h\x1a', '�14766161726\x00013642\x00 0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00ustar  \x00root\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00root\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00var1;var2;var3;var4;var5;var6;var7;var8;var9;var10;var11;var12;var13;var14;var2;var15;var16;var17;var18', '"value1";"value2";"";"value4";"value5";"value6";"value7";"value8";"value9";"value10";"value11";"value12";"value13";"value14";"value15";"value16";"";""']

The function looks like this:

def clean_corrupted_data(input_path):
    
    # Read in input as text file
    rdd = spark.sparkContext.textFile(input_path)

    # Define cleaning function
    def remove_non_ascii(text):
          return re.sub(r'[\x00-\x1F\x7F-\x9F]|[^\x00-\x7F]+', '', text) 

    # Clean text
    clean_rdd = rdd.map(remove_non_ascii)

    # Take second '' bracket to filter out headers 
    header = clean_rdd.take(2)  
    header = header[1].split(";")

    # Remove headers from cleaned text
    data_rdd = clean_rdd.zipWithIndex().filter(lambda x: x[1] > 1).map(lambda x: x[0])  

    # Create dataframe from cleaned text + headers
    data_rdd = data_rdd.map(lambda line: [col.strip('"') for col in line.split(";")])
    valid_data_rdd = data_rdd.filter(lambda row: len(row) == len(header))
    df = spark.createDataFrame(valid_data_rdd, header)
    
    return df
发布评论

评论列表(0)

  1. 暂无评论