I have an Azure SQL database that I want to query with PySpark. I have to "copy" the data to a temporary table and then query this temporary table. I would like to use pretty much the same code that's in Spark's documentation for the prepareQuery option (https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option):
df = spark.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)") \
    .option("query", "SELECT * FROM #TempTable") \
    .load()
Since some of the tables are quite big, it takes a while to create the temporary tables. The issue is that these queries are executed twice: the first time, I guess, to infer the schema, and the second time to read the data. So Spark is running the expensive operation of creating the temporary tables twice, which I would like to avoid. Is there any way to disable inferSchema, as there is for some of the other data sources?
I tried to provide the customSchema option, but Spark is still executing my queries twice. I also played around with fetchsize and pushDownPredicate, but they do not affect this behavior.
UPDATE: I enabled auditing on the SQL server so that I can see the queries that are executed on it. Spark runs these queries first:
SELECT *
INTO #TempTable
FROM tbl;
SELECT * FROM (SELECT * FROM #TempTable) SPARK_GEN_SUBQ_0 WHERE 1=0
And then the second time the "query" part changes to:
SELECT "Id","FirstName","LastName" FROM (SELECT * FROM #TempTable) SPARK_GEN_SUBQ_0
So it seems clear to me that it does this for schema inference (the second time, the list of columns is populated), but I would like to disable it somehow, because creating the temporary table takes a lot of time.
Spark's JDBC connector needs to determine the schema of the data coming from the database, and the only reliable way to bypass the duplicate execution of your prepareQuery is to provide an explicit schema with the schema() method on the reader.
Options like fetchsize and pushDownPredicate affect how data is retrieved after the schema is known; they don't change the initial schema discovery process.
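For illustration, a minimal sketch (the option values here are arbitrary placeholders): these knobs sit on the same reader but only shape how rows are fetched once the schema has already been resolved.

# Retrieval-time tuning only: neither option suppresses the zero-row
# metadata probe that Spark issues before reading any data.
df = (
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "SELECT * FROM tbl")
    # number of rows fetched per JDBC round trip
    .option("fetchsize", "10000")
    # let Spark push filters down into the database (default: true)
    .option("pushDownPredicate", "true")
    .load()
)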
Here's how you can implement it:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DoubleType

# Define your schema based on the columns in your Azure SQL table
custom_schema = StructType([
    StructField("column1", IntegerType(), True),
    StructField("column2", StringType(), True),
    StructField("column3", TimestampType(), True),
    StructField("column4", DoubleType(), True),
    # ... add more fields as needed
])

df = spark.read.format("jdbc") \
    .schema(custom_schema) \
    .option("url", jdbcUrl) \
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)") \
    .option("query", "SELECT * FROM #TempTable") \
    .load()
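If you'd rather not maintain the StructType by hand, one possible variation (a sketch, assuming the temp table ends up with the same columns as tbl, and that your Spark version accepts a user-specified schema for the JDBC source as shown above) is to let Spark resolve the schema once against the base table with a query that returns no rows, so the expensive prepareQuery never runs during schema discovery:

# Cheap one-time probe: WHERE 1=0 returns no rows, so the database only
# resolves the metadata of tbl and the temp table is never built.
# load() is lazy for data, but it does resolve the schema right away.
probe = spark.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("query", "SELECT * FROM tbl WHERE 1=0") \
    .load()

# Reuse the resolved schema for the real read through the temp table.
df = spark.read.format("jdbc") \
    .schema(probe.schema) \
    .option("url", jdbcUrl) \
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)") \
    .option("query", "SELECT * FROM #TempTable") \
    .load()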
With JDBC connections in PySpark, the schema is determined from the database's metadata, not by sampling the data. It isn't inferred by scanning rows the way it is for CSV files, so there is no direct "disable inferSchema" option. That is also why customSchema didn't change the behavior: it only overrides the types of the already-resolved columns; it doesn't skip the metadata query.
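For completeness, a minimal sketch of what customSchema does (the column names below are taken from the audit output in the question and would need to match the columns the database reports):

# customSchema only casts the resolved columns to the given types;
# Spark still runs the zero-row metadata probe beforehand.
df = spark.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("query", "SELECT * FROM tbl") \
    .option("customSchema", "Id INT, FirstName STRING, LastName STRING") \
    .load()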