
Databricks Runtime Autoloader Behaviour with csv - Stack Overflow


I have been trying to read CSV files with Databricks Autoloader for a while now. In the earlier DBR 11.3 LTS version, my CSV files used to be read pretty easily without any significant issues.

Basically, I have a bunch of CSV files that need to be ingested, and the data is delimited by ";". The problem is that certain rows in some files contain a huge number of semicolons, so after reading with this delimiter they end up with around 2 billion columns. Earlier DBR versions used to just skip these extra columns, but DBR 14.3 LTS, which we require as an upgrade, does not skip them.

We have custom logic to segregate such rows into bad tables, so this was handled well in the old DBR 11.3 LTS version.
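To give an idea of what I mean, here is a rough sketch of that kind of segregation, reading the file as plain text and counting separators before any CSV parsing. The path, the threshold and the bad-table location are placeholders, not our actual pipeline code.

from pyspark.sql import functions as F

# Illustrative only: path, threshold and bad-table location are placeholders.
raw = spark.read.text("dbfs:/FileStore/asarkar/sample.csv")

# Count ";" separators per line; rows with far more separators than the
# expected schema width are treated as bad rows.
counted = raw.withColumn("sep_count", F.size(F.split(F.col("value"), ";")) - 1)

good_rows = counted.filter(F.col("sep_count") <= 25).drop("sep_count")
bad_rows = counted.filter(F.col("sep_count") > 25).drop("sep_count")

# bad_rows.write.format("delta").mode("append").save("some_bad_table_path")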

The new version requires me to use cloudFiles.maxColumns, so I hardcoded it to roughly 2.5 billion columns so that it can read the files, but now it gives an error saying "Requested array size exceeds VM limit". I have tried 128 GB RAM clusters as well as multi-node clusters, and it always ends in an executor-lost failure with no particularly useful information in the Spark UI.

I tried different cloudFiles options to make it work, but ran into similar problems.

Is there any way to replicate the behaviour of DBR 11.3 LTS in 14.3 LTS without any significant changes?

This is what the configuration file looks like:

{
  "Name": "eventlog_n4_wlf_csv",
  "DataProductName": "some_data_prod",
  "LookbackRange": 21,
  "Database": "some_db",
  "DatasetPath": "some_path",
  "CheckpointPath": "some_checkpoint",
  "BadPath": "some_bad_path",
  "BadTablePath": "some_bad_table_path",
  "PartitionColumns": [
    "Year",
    "Month",
    "Day"
  ],
  "SchemaColumns": [
    {
      "colName": "Year",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "Month",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "Day",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "SerialNumber",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "MaterialNumber",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "EventSourceHash",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "Category",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "CategoryString",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "EventCode",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "EventIdentifier",
      "nullable": true,
      "dataType": "bigint"
    },
    {
      "colName": "EventType",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "Logfile",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "Message",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "RecordNumber",
      "nullable": false,
      "dataType": "bigint"
    },
    {
      "colName": "SourceName",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "TimeGenerated",
      "nullable": false,
      "dataType": "timestamp"
    },
    {
      "colName": "TimeZoneOffset",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "Type",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "User",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "Data",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "FileName",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "FileUploadDate",
      "nullable": false,
      "dataType": "timestamp"
    }
  ],
  "Properties": {
    "delta.enableChangeDataFeed": "true",
    "delta.checkpoint.writeStatsAsStruct": "true",
    "delta.targetFileSize": "32mb",
    "delta.autoOptimize.autoCompact": "true"
  },
  "WriteOptions": {
    
  },
  "Sources": {
    "csv_source": {
      "Path": "/mnt/raw/some_data_product_path/exp/day=*/materialnum=*/serialnum=*/",
      "ReadOptions": {
        "useStrictGlobber": "true",
        "header": "true",
        "sep": ";",
        "cloudFiles.partitionColumns": "day,materialnum,serialnum",
        "cloudFiles.schemaEvolutionMode": "none",
        "cloudFiles.schemaLocation": "/mnt/gold/tables/data_product/base_table/_schemaLocation",
        "cloudFiles.useNotifications": "false",
        "cloudFiles.maxFileAge": "28 days",
        "cloudFiles.maxBytesPerTrigger": "8589934592",
        "cloudFiles.maxFilesPerTrigger": "100",
        "maxColumns": 2147483646
      }
    }
  },
  "Monitoring": {
    "TableName": "monitoring.processing_statistics",
    "TablePath": "/mnt/gold/tables/monitoring/processing_statistics"
  }
}
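For reference, the ReadOptions above end up in an Auto Loader readStream call roughly like the following. This is a simplified sketch, not the exact ingestion code: the variable names are illustrative, the schema is built elsewhere from SchemaColumns, and the moderate maxColumns value is just an example.

# Simplified sketch of how the csv_source ReadOptions feed into Auto Loader.
# expected_schema is a StructType built from "SchemaColumns" (not shown here).
stream_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("sep", ";")
    .option("cloudFiles.partitionColumns", "day,materialnum,serialnum")
    .option("cloudFiles.schemaEvolutionMode", "none")
    .option("cloudFiles.schemaLocation", "/mnt/gold/tables/data_product/base_table/_schemaLocation")
    .option("cloudFiles.maxFileAge", "28 days")
    .option("maxColumns", "20480")  # example value; the config above uses 2147483646
    .schema(expected_schema)
    .load("/mnt/raw/some_data_product_path/exp/day=*/materialnum=*/serialnum=*/")
)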

Here is another code sample where I am trying to read a particular file:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType


# Define schema correctly
schema = StructType(
    [
        StructField("Category", IntegerType(), True),
        StructField("CategoryString", StringType(), True),
        StructField("EventCode", IntegerType(), True),
        StructField("EventIdentifier", LongType(), True),
        StructField("EventType", IntegerType(), True),
        StructField("Logfile", StringType(), True),
        StructField("Message", StringType(), True),
        StructField("RecordNumber", LongType(), True),
        StructField("SourceName", StringType(), True),
        StructField("TimeGenerated", StringType(), True),
        StructField("Type", StringType(), True),
        StructField("User", StringType(), True),
        StructField("Data", StringType(), True),  # Fixed the incorrect schema syntax
    ]
)

# Read CSV file with schema
# df = spark.read.csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema, header=True)

# df = spark.read.option("mode", "DROPMALFORMED").csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema, header=True)


df = (
    spark.read.option("header", True)
    .option("delimiter", ";")
    .option("mode", "PERMISSIVE")
    .option("encoding", "UTF-8")
    .option("maxColumns", 2147483647)
    .csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema)
)


display(df)

When I set maxColumns to this particular value, it gives me "Requested array size exceeds VM limit".

If I give a different value, around 50,000 or so, it gives me this:

Caused by: com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - 20480. Hint: Number of columns processed may have exceeded limit of 20480 columns. Use settings.setMaxColumns(int) to define the maximum number of columns your input can have. Ensure your configuration is correct, with delimiters, quotes and escape sequences that match the input format you are trying to parse.
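For completeness, here is a rough sketch of one way the old "extra columns are skipped" behaviour could be approximated, by bypassing the CSV parser entirely: read the file as plain text, split on ";" manually, and keep only the first N fields. The path is a placeholder, and the header row and type casting are not handled here.

from pyspark.sql import functions as F

# Rough sketch only: keep the first len(schema.fields) fields of each line and
# drop the rest, which roughly mimics the old behaviour of ignoring extras.
n_fields = len(schema.fields)
lines = spark.read.text("dbfs:/FileStore/asarkar/sample.csv")

parts = lines.select(F.split(F.col("value"), ";").alias("p"))
trimmed = parts.select(
    *[F.col("p").getItem(i).alias(schema.fields[i].name) for i in range(n_fields)]
)
# Values would still need casting to the types declared in `schema`, e.g.
# trimmed = trimmed.withColumn("EventCode", F.col("EventCode").cast("int"))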
