Currently I'm using hadoop-azure-3.4.1 via the pyspark library to connect to ABFS. According to the documentation - https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity - there is an option to use Managed Identity for authentication. This feature works perfectly from an Azure VM, but it fails on my local server with the following error:
Caused by: org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://127.0.0.1:40342/metadata/identity/oauth2/token' AADToken: HTTP connection to http://127.0.0.1:40342/metadata/identity/oauth2/token failed for getting token from AzureAD.; contentType='application/json'; response '{"error":"invalid_request","error_description":"The api-version '2018-02-01' could not be found. Supported are 2021-02-01 2020-06-01 2019-11-01 2019-08-15","error_codes":[404],"timestamp":"2024-11-20 00:21:23.47342756 +0000 UTC m=+4427.811320871","trace_id":"","correlation_id":"efee3d6c-ddde-465c-96dd-d0c68c7f7ebd"}'
http://127.0.0.1:40342/metadata/identity/oauth2/token is the endpoint used to obtain tokens on machines connected to Azure through Arc (see the official Azure doc on using Managed Identity via Arc).
Based on the hadoop-azure code, it looks like this version is hardcoded: https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java#L176
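For what it's worth, the rejection is easy to reproduce outside Spark. A minimal sketch with requests, assuming the Arc agent is listening on 127.0.0.1:40342 as in the error above:
import requests
ENDPOINT = "http://127.0.0.1:40342/metadata/identity/oauth2/token"
RESOURCE = "https://storage.azure.com/"
# The api-version hadoop-azure hardcodes: Arc rejects it with the HTTP 400 shown above
r = requests.get(ENDPOINT, params={"api-version": "2018-02-01", "resource": RESOURCE}, headers={"Metadata": "true"})
print(r.status_code, r.text)
# With a supported version, Arc does not return a token directly either; it replies
# 401 with a Www-Authenticate challenge (its two-step handshake, see the answer below)
r = requests.get(ENDPOINT, params={"api-version": "2021-02-01", "resource": RESOURCE}, headers={"Metadata": "true"})
print(r.status_code, r.headers.get("Www-Authenticate"))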
My Python code example:
from pyspark.sql import SparkSession
# Spark add-ons
path_to_hadoop_azure_jar = "/opt/hadoop-azure-3.4.1.jar"
path_to_hadoop_common_jar = "/opt/hadoop-common-3.4.1.jar"
path_to_azure_storage_jar = "/opt/azure-storage-8.6.6.jar"
path_to_azure_datalake_jar = "/opt/hadoop-azure-datalake-3.4.1.jar"
# ABFS variables
account_name = "pilotdbwsa"
container_name = "pilot-dbw"
container_path = "test1-test1-arc"
abfs_path = f"abfss://{container_name}@{account_name}.dfs.core.windows/{container_path}"
# Spark Session setup
spark = SparkSession.builder.appName("AzureDataRead") \
    .config("spark.jars", f"{path_to_hadoop_common_jar},{path_to_hadoop_azure_jar},{path_to_azure_storage_jar},{path_to_azure_datalake_jar}") \
    .getOrCreate()
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows", ".apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.msi.endpoint.{account_name}.dfs.core.windows", "http://127.0.0.1:40342/metadata/identity/oauth2/token")
# Logging
spark.sparkContext.setLogLevel("DEBUG")
# Create a simple DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
# Write the DataFrame to ABFS as Parquet
try:
    df.write.parquet(abfs_path)
    print(f"Parquet file successfully written to {abfs_path}")
except Exception as e:
    print(f"Error writing Parquet file: {e}")
QUESTION: Is it possible to programmatically override api-version or specify it in the Spark configuration?
UPDATES:
11/25 - I have prepared changes for hadoop-azure and submitted a pull request: https://github.com/apache/hadoop/pull/7186
1 Answer
Yes, you are right: getting an access token from Arc-enabled servers is different from the regular flow.
The client first calls the endpoint to obtain an intermediate key, and the access token is then generated using Basic authorization.
The first request to the endpoint returns a challenge that the client is expected to handle, using its metadata to make a further request for the token; the hadoop-azure jar does not handle this, so it raises an error even after using a supported api-version.
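For reference, a minimal sketch of that handshake in Python, assuming a local Arc agent on 127.0.0.1:40342, a supported api-version, and permission to read the key file the agent writes:
import requests
ENDPOINT = "http://127.0.0.1:40342/metadata/identity/oauth2/token"
PARAMS = {"api-version": "2021-02-01", "resource": "https://storage.azure.com/"}
# Step 1: the unauthenticated call returns 401 with a Www-Authenticate header
# of the form "Basic realm=<path to a key file on the local disk>"
challenge = requests.get(ENDPOINT, params=PARAMS, headers={"Metadata": "true"})
key_path = challenge.headers["Www-Authenticate"].split("Basic realm=")[1]
# Step 2: read the intermediate key and retry with Basic authorization
with open(key_path) as f:
    key = f.read()
resp = requests.get(ENDPOINT, params=PARAMS, headers={"Metadata": "true", "Authorization": f"Basic {key}"})
access_token = resp.json()["access_token"]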
So, below are the possible workarounds.
Create a custom class in Java with functionality similar to the code given here, and set that class in spark.conf as the custom token provider, as mentioned here; see the sketch below.
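If you go the custom-provider route, the Spark-side registration would look roughly like this. A sketch only: com.example.ArcMsiTokenProvider is a hypothetical name for a Java class implementing org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee that performs the Arc handshake; its jar must be added to spark.jars.
# Sketch: switch the auth type to Custom and point ABFS at the hypothetical provider class
account_name = "pilotdbwsa"
spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "Custom")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net", "com.example.ArcMsiTokenProvider")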
Alternatively, for now you can use pandas to read/write the file, with storage_options supplying the default credential. First, run the below command.
pip install fsspec adlfs
Code:
import pandas as pd
# anon=False tells adlfs to use the default Azure credential (here, the managed identity)
strg_opt = {'account_name': "jadls", 'anon': False}
p_df = pd.read_csv("abfs://data/csv/bulk.csv", storage_options=strg_opt)
# Convert the pandas DataFrame into a Spark DataFrame
df = spark.createDataFrame(p_df)
df.show()
p_df[["Date", "Symbol"]].to_csv("abfs://data/csv/new.csv", storage_options=strg_opt)
Here, passing anon: False in storage_options makes it use the default credential, which is the managed identity, and a Spark DataFrame is then created from the pandas one. Note that the path should be in the below format:
abfs://{CONTAINER}/{FOLDER}/filename.csv
Output: (screenshots of the resulting DataFrame and of the new file in the storage account omitted)
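The same approach should also cover the Parquet write from the question, since pandas' to_parquet accepts storage_options as well. A minimal sketch, reusing the question's account and container names and assuming pyarrow (or fastparquet) is installed:
import pandas as pd
# Hypothetical example reusing the question's account/container names
strg_opt = {'account_name': "pilotdbwsa", 'anon': False}
p_df = pd.DataFrame([("Alice", 25), ("Bob", 30), ("Cathy", 28)], columns=["Name", "Age"])
p_df.to_parquet("abfs://pilot-dbw/test1-test1-arc/sample.parquet", storage_options=strg_opt)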
Comments:
… /opt/hadoop-azure-3.4.1.jar with updated api version? – JayashankarGS Commented Nov 20, 2024 at 9:57
Updating the api-version in the hadoop-azure tool code did not help, since the mechanism for obtaining an access_token on a VM and on Arc is different: on Arc you need to first get an intermediate key from the endpoint 127.0.0.1:40342/metadata/identity/oauth2/token, and then, using this key, get the main access_token from the same endpoint. On a VM, you immediately get an access_token. Obviously, this tool needs additional functionality. Does anyone know how I can open a ticket for an enhancement? – Sergei Varaksin Commented Nov 20, 2024 at 22:15