apache spark - how to set "api-version" dynamically in fs.azure.account.oauth2.msi.endpoint - Stack Overflow


Currently I'm using hadoop-azure-3.4.1 via the PySpark library to connect to ABFS. According to the documentation - https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity - there is an option to use Managed Identity for authentication. This feature works perfectly from an Azure VM, but it fails on my local server with the following error:

Caused by: org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://127.0.0.1:40342/metadata/identity/oauth2/token' AADToken: HTTP connection to http://127.0.0.1:40342/metadata/identity/oauth2/token failed for getting token from AzureAD.; contentType='application/json'; response '{"error":"invalid_request","error_description":"The api-version '2018-02-01' could not be found. Supported are 2021-02-01 2020-06-01 2019-11-01 2019-08-15","error_codes":[404],"timestamp":"2024-11-20 00:21:23.47342756 +0000 UTC m=+4427.811320871","trace_id":"","correlation_id":"efee3d6c-ddde-465c-96dd-d0c68c7f7ebd"}'

http://127.0.0.1:40342/metadata/identity/oauth2/token is the endpoint used to obtain tokens on machines connected to Azure through Arc (see the official Azure documentation on using Managed Identity via Arc).

Based on the hadoop-azure code, it looks like this api-version is hardcoded: https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java#L176

My Python code example:

from pyspark.sql import SparkSession

# Spark add-ons
path_to_hadoop_azure_jar = "/opt/hadoop-azure-3.4.1.jar"
path_to_hadoop_common_jar = "/opt/hadoop-common-3.4.1.jar"
path_to_azure_storage_jar = "/opt/azure-storage-8.6.6.jar"
path_to_azure_datalake_jar = "/opt/hadoop-azure-datalake-3.4.1.jar"

# ABFS variables
account_name = "pilotdbwsa"
container_name = "pilot-dbw"
container_path = "test1-test1-arc"
abfs_path = f"abfss://{container_name}@{account_name}.dfs.core.windows/{container_path}"

# Spark Session setup
spark = SparkSession.builder.appName("AzureDataRead") \
    .config("spark.jars", f"{path_to_hadoop_common_jar},{path_to_hadoop_azure_jar},{path_to_azure_storage_jar},{path_to_azure_datalake_jar}") \
    .getOrCreate()
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")

spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows", ".apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.msi.endpoint.{account_name}.dfs.core.windows", "http://127.0.0.1:40342/metadata/identity/oauth2/token")

# Logging
spark.sparkContext.setLogLevel("DEBUG")

# Create a simple DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Write the DataFrame to ABFS as Parquet
try:
    df.write.parquet(abfs_path)
    print(f"Parquet file successfully written to {abfs_path}")
except Exception as e:
    print(f"Error writing Parquet file: {e}")

QUESTION: Is it possible to programmatically override api-version or specify it in the Spark configuration?

UPDATES:

11/25 - I have prepared changes for hadoop-azure and submitted a pull request: github.com/apache/hadoop/pull/7186


asked Nov 20, 2024 at 3:18 by Sergei Varaksin; edited Nov 25, 2024 at 16:11
  • 1 Have you tried to build /opt/hadoop-azure-3.4.1.jar with an updated api-version? – JayashankarGS Commented Nov 20, 2024 at 9:57
  • Thank you @JayashankarGS for your response. Not yet, I'm going to try it today (at least I'll try, since I'm a complete beginner in Java). I hoped that there might already be some easy workaround – Sergei Varaksin Commented Nov 20, 2024 at 14:38
  • So... simply updating the api-version in the hadoop-azure code did not help, since the mechanism for obtaining an access_token on a VM and on Arc is different. The difference is that on Arc you first need to get an intermediate key from the endpoint 127.0.0.1:40342/metadata/identity/oauth2/token, and then, from the same endpoint, use this key to get the actual access_token. On a VM, you get an access_token immediately. Obviously, this tool needs additional functionality. Does anyone know how I can open a ticket for an enhancement? – Sergei Varaksin Commented Nov 20, 2024 at 22:15
  • You can also raise an issue here issues.apache.org/jira/projects/HADOOP/issues/… asking to add functionality to get a token for Azure Arc-enabled servers. – JayashankarGS Commented Nov 21, 2024 at 8:39

1 Answer


Yes, you are right: getting an access token from Arc-enabled servers works differently from a regular Azure VM.

The client first calls the endpoint to obtain an intermediate key, and the access token is then generated using Basic authorization with that key.

The first request to the endpoint is expected to fail with a challenge response, and the metadata from that response is used in a second request to get the token. The hadoop-azure jar does not handle this, so it raises an error even when a supported api-version is used.
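For reference, here is a minimal Python sketch of that two-step (challenge/response) flow against the Arc endpoint, based on the Azure Arc managed identity documentation. It assumes the Arc agent is listening on 127.0.0.1:40342, that the process has privileges to read the challenge key file, and it uses one of the supported api-versions from the error message above:

import requests

# Azure Arc local identity endpoint (from the question) and a supported api-version
ARC_ENDPOINT = "http://127.0.0.1:40342/metadata/identity/oauth2/token"
PARAMS = {"api-version": "2020-06-01", "resource": "https://storage.azure.com/"}

# Step 1: the first request is expected to come back as 401 with a
# Www-Authenticate header pointing at a key file on the local machine.
challenge = requests.get(ARC_ENDPOINT, params=PARAMS, headers={"Metadata": "true"})
key_path = challenge.headers["Www-Authenticate"].split("Basic realm=")[1]

# Step 2: read the key file (needs sufficient local privileges) and repeat the
# request with Basic authorization to obtain the actual access token.
with open(key_path) as f:
    key = f.read()
token_response = requests.get(
    ARC_ENDPOINT,
    params=PARAMS,
    headers={"Metadata": "true", "Authorization": f"Basic {key}"},
)
access_token = token_response.json()["access_token"]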

So, below are the possible workarounds.

  1. Create a custom class in Java with functionality similar to the code given here, and register that class in spark.conf as a custom token provider, as mentioned here (see the configuration sketch after this list).

  2. For now, you can use pandas to read/write the file, with storage options supplying default credentials.
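For the first workaround, the ABFS connector is pointed at the custom token provider through Spark configuration, per the hadoop-azure custom OAuth provider documentation. A minimal sketch follows; com.example.ArcMsiTokenProvider is a hypothetical placeholder for your own class, which must be on the Spark classpath:

# Sketch only: "com.example.ArcMsiTokenProvider" is a hypothetical class name;
# it stands in for your own implementation of hadoop-azure's custom token
# provider extension point.
spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "Custom")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net",
    "com.example.ArcMsiTokenProvider",
)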

For the second (pandas) workaround, first run the command below.

pip install fsspec adlfs

Code:

import pandas as pd

# anon=False tells adlfs to authenticate with the default credential
# (e.g. managed identity) instead of using anonymous access
strg_opt = {'account_name': "jadls", 'anon': False}
p_df = pd.read_csv("abfs://data/csv/bulk.csv", storage_options=strg_opt)

# Convert the pandas DataFrame to a Spark DataFrame
df = spark.createDataFrame(p_df)
df.show()

# Write selected columns back to the container using the same credential
p_df[["Date", "Symbol"]].to_csv("abfs://data/csv/new.csv", storage_options=strg_opt)

Here, passing anon: False in storage_options makes adlfs use the default credential, that is, managed identity; the resulting pandas DataFrame is then converted into a Spark DataFrame.

Next, the path should be in the following format:

abfs://{CONTAINER}/{FOLDER}/filename.csv
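For example, using the container and folder from the question, a path would look like this (new.csv is just a hypothetical file name):

abfs://pilot-dbw/test1-test1-arc/new.csv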

Output:

(The original answer includes screenshots of the displayed DataFrame and of the resulting file in the storage account.)
