最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

c# - Azure Blob Storage client-side decryption using Python - Stack Overflow

programmeradmin3浏览0评论

I want to decrypt blobs encrypted with client-side encryption using the following C# code, which is running in Production, and follows the Microsoft C# examples:

/// <summary>
/// Registers a named <see cref="BlobServiceClient"/> configured for client-side encryption
/// (protocol V2.0, RSA-OAEP key wrapping), a singleton <typeparamref name="T"/> created through
/// <c>FileStorageFactory</c>, and a blob storage health check for that client.
/// </summary>
/// <typeparam name="T">Service type under which the file storage is registered.</typeparam>
/// <param name="services">Service collection that receives the registrations.</param>
/// <param name="configurationSectionName">
/// Configuration section providing "StorageAccountName" and "EncryptionKeyId"; it is also used
/// as the name of the registered blob client.
/// </param>
/// <returns>The same <paramref name="services"/> instance, to allow chaining.</returns>
public static IServiceCollection AddEncryptedFileStorage<T>(this IServiceCollection services, string configurationSectionName) where T : class, IFileStorage
    {
        string clientName = configurationSectionName;
        services.AddAzureClients(clientBuilder =>
        {
            clientBuilder.AddClient<BlobServiceClient, BlobClientOptions>((options, credential, serviceProvider) =>
            {
                var configuration = serviceProvider.GetRequiredService<IConfiguration>();
                var configurationSection = configuration.GetSection(configurationSectionName);

                var storageAccountUri = FileStorageHelper.GetStorageAccountUri(configurationSection.GetRequiredValue<string>("StorageAccountName"));
                string encryptionKeyId = configurationSection.GetValue<string>("EncryptionKeyId")!;

                var keyClient = serviceProvider.GetRequiredService<KeyClient>();

                // The key is fetched only to obtain its full identifier for the CryptographyClient.
                KeyVaultKey key = keyClient.GetKey(encryptionKeyId, cancellationToken: CancellationToken.None);

                var encryptionOptions = new ClientSideEncryptionOptions(ClientSideEncryptionVersion.V2_0)
                {
                    KeyEncryptionKey = new CryptographyClient(key.Id, credential),
                    KeyResolver = new KeyResolver(credential),
                    KeyWrapAlgorithm = "RSA-OAEP"
                };

                var blobClientOptions = new SpecializedBlobClientOptions
                {
                    ClientSideEncryption = encryptionOptions,
                    // Copy each retry setting to its matching property. Previously Delay was
                    // assigned from NetworkTimeout, which turned the per-request timeout into
                    // the back-off delay between retries.
                    Retry =
                    {
                        Delay = options.Retry.Delay,
                        MaxRetries = options.Retry.MaxRetries,
                        NetworkTimeout = options.Retry.NetworkTimeout
                    }
                };

                return new BlobServiceClient(storageAccountUri, credential, blobClientOptions);
            }).WithName(clientName);
        });
        services.TryAddSingleton(typeof(T), sp => FileStorageFactory(sp, clientName, configurationSectionName));

        services.AddHealthChecks().AddBlobStorage(clientName);

        return services;
    }

Following the Microsoft Python docs, I tried running the following code. It prints the data if encryption_options aren't set, but otherwise fails with this error:

HttpResponseError: Decryption failed.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_download.py:70, in process_content(data, start_offset, end_offset, encryption)
     69 try:
---> 70     return decrypt_blob(
     71         encryption.get("required") or False,
     72         encryption.get("key"),
     73         encryption.get("resolver"),
     74         content,
     75         start_offset,
     76         end_offset,
     77         data.response.headers,
     78     )
     79 except Exception as error:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_encryption.py:905, in decrypt_blob(require_encryption, key_encryption_key, key_resolver, content, start_offset, end_offset, response_headers)
    903     raise ValueError('Specified encryption version is not supported.')
--> 905 content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
    907 if version == _ENCRYPTION_PROTOCOL_V1:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_encryption.py:658, in _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
    657 if not hasattr(key_encryption_key, 'get_kid') or not callable(key_encryption_key.get_kid):
--> 658     raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_kid'))
    659 if not hasattr(key_encryption_key, 'unwrap_key') or not callable(key_encryption_key.unwrap_key):
AttributeError: key encryption key does not define a complete interface. Value of get_kid is either missing or invalid.
from azure.storage.blob import BlobServiceClient  
from azure.identity import DefaultAzureCredential, DeviceCodeCredential  
from azure.keyvault.keys import KeyClient  
from azure.keyvault.keys.crypto import CryptographyClient
from azure.keyvault.keys.crypto import KeyWrapAlgorithm  
from azure.core.exceptions import ResourceNotFoundError  

# Credential shared by the Key Vault client and the Blob Storage client created below.
credential = DeviceCodeCredential(additionally_allowed_tenants =["<tenant id>"]) 
  
def configure_blob_service_client(key_id, cred):
    """Build a BlobServiceClient configured with client-side encryption options.

    Args:
        key_id: Identifier of the Key Vault key used as the key encryption key.
        cred: Credential used for both the CryptographyClient and the
            BlobServiceClient.

    Returns:
        The configured BlobServiceClient.
    """
    storage_account_name = "<storage_name>"
    # The blob endpoint host must end in ".net"; it was truncated before.
    storage_account_uri = f"https://{storage_account_name}.blob.core.windows.net"

    encryption_options = {
        #"require_encryption": True,
        # Use the credential passed in rather than the module-level one.
        "key_encryption_key": CryptographyClient(key_id, cred),  #ExtendedCryptographyClient(key.id , credential),
        "encryption_version": '2.0',
        "key_wrap_algorithm": KeyWrapAlgorithm.rsa_oaep
    }

    blob_service_client = BlobServiceClient(account_url=storage_account_uri, credential=cred, **encryption_options)
    return blob_service_client

key_vault_name = "<key_vault_name>"
# The Key Vault endpoint host must end in ".net"; it was truncated before.
key_vault_uri = f"https://{key_vault_name}.vault.azure.net"

key_client = KeyClient(vault_url=key_vault_uri, credential=credential)

# Look up the key so its full identifier can be handed to the blob client.
key_name = "<key_name>/<key_version>"
key = key_client.get_key(key_name)
display(key.id)

blob_service_client = configure_blob_service_client(key.id, credential)

blob_client = blob_service_client.get_blob_client(container="<container_name>", blob="<blob_name>")

# Download the blob; with encryption options set, decryption happens during the download.
downloaded = blob_client.download_blob()
blob_data = downloaded.readall()

blob_data

So the SDK expects the key encryption key to have a get_kid() method (SDK source code), but the SDK's CryptographyClient doesn't have such a method.

I tried implementing the get_kid() method myself, but that led to other issues, so it is probably not the way to go.

I want to decrypt blobs encrypted with client-side encryption using the following C# code, which is running in Production, and follows the Microsoft C# examples:

/// <summary>
/// Registers a named <see cref="BlobServiceClient"/> configured for client-side encryption
/// (protocol V2.0, RSA-OAEP key wrapping), a singleton <typeparamref name="T"/> created through
/// <c>FileStorageFactory</c>, and a blob storage health check for that client.
/// </summary>
/// <typeparam name="T">Service type under which the file storage is registered.</typeparam>
/// <param name="services">Service collection that receives the registrations.</param>
/// <param name="configurationSectionName">
/// Configuration section providing "StorageAccountName" and "EncryptionKeyId"; it is also used
/// as the name of the registered blob client.
/// </param>
/// <returns>The same <paramref name="services"/> instance, to allow chaining.</returns>
public static IServiceCollection AddEncryptedFileStorage<T>(this IServiceCollection services, string configurationSectionName) where T : class, IFileStorage
    {
        string clientName = configurationSectionName;
        services.AddAzureClients(clientBuilder =>
        {
            clientBuilder.AddClient<BlobServiceClient, BlobClientOptions>((options, credential, serviceProvider) =>
            {
                var configuration = serviceProvider.GetRequiredService<IConfiguration>();
                var configurationSection = configuration.GetSection(configurationSectionName);

                var storageAccountUri = FileStorageHelper.GetStorageAccountUri(configurationSection.GetRequiredValue<string>("StorageAccountName"));
                string encryptionKeyId = configurationSection.GetValue<string>("EncryptionKeyId")!;

                var keyClient = serviceProvider.GetRequiredService<KeyClient>();

                // The key is fetched only to obtain its full identifier for the CryptographyClient.
                KeyVaultKey key = keyClient.GetKey(encryptionKeyId, cancellationToken: CancellationToken.None);

                var encryptionOptions = new ClientSideEncryptionOptions(ClientSideEncryptionVersion.V2_0)
                {
                    KeyEncryptionKey = new CryptographyClient(key.Id, credential),
                    KeyResolver = new KeyResolver(credential),
                    KeyWrapAlgorithm = "RSA-OAEP"
                };

                var blobClientOptions = new SpecializedBlobClientOptions
                {
                    ClientSideEncryption = encryptionOptions,
                    // Copy each retry setting to its matching property. Previously Delay was
                    // assigned from NetworkTimeout, which turned the per-request timeout into
                    // the back-off delay between retries.
                    Retry =
                    {
                        Delay = options.Retry.Delay,
                        MaxRetries = options.Retry.MaxRetries,
                        NetworkTimeout = options.Retry.NetworkTimeout
                    }
                };

                return new BlobServiceClient(storageAccountUri, credential, blobClientOptions);
            }).WithName(clientName);
        });
        services.TryAddSingleton(typeof(T), sp => FileStorageFactory(sp, clientName, configurationSectionName));

        services.AddHealthChecks().AddBlobStorage(clientName);

        return services;
    }

Following the Microsoft Python docs, I tried running the following code. It prints the data if encryption_options aren't set, but otherwise fails with this error:

HttpResponseError: Decryption failed.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_download.py:70, in process_content(data, start_offset, end_offset, encryption)
     69 try:
---> 70     return decrypt_blob(
     71         encryption.get("required") or False,
     72         encryption.get("key"),
     73         encryption.get("resolver"),
     74         content,
     75         start_offset,
     76         end_offset,
     77         data.response.headers,
     78     )
     79 except Exception as error:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_encryption.py:905, in decrypt_blob(require_encryption, key_encryption_key, key_resolver, content, start_offset, end_offset, response_headers)
    903     raise ValueError('Specified encryption version is not supported.')
--> 905 content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
    907 if version == _ENCRYPTION_PROTOCOL_V1:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_encryption.py:658, in _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
    657 if not hasattr(key_encryption_key, 'get_kid') or not callable(key_encryption_key.get_kid):
--> 658     raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_kid'))
    659 if not hasattr(key_encryption_key, 'unwrap_key') or not callable(key_encryption_key.unwrap_key):
AttributeError: key encryption key does not define a complete interface. Value of get_kid is either missing or invalid.
from azure.storage.blob import BlobServiceClient  
from azure.identity import DefaultAzureCredential, DeviceCodeCredential  
from azure.keyvault.keys import KeyClient  
from azure.keyvault.keys.crypto import CryptographyClient
from azure.keyvault.keys.crypto import KeyWrapAlgorithm  
from azure.core.exceptions import ResourceNotFoundError  

# Credential shared by the Key Vault client and the Blob Storage client created below.
credential = DeviceCodeCredential(additionally_allowed_tenants =["<tenant id>"]) 
  
def configure_blob_service_client(key_id, cred):
    """Build a BlobServiceClient configured with client-side encryption options.

    Args:
        key_id: Identifier of the Key Vault key used as the key encryption key.
        cred: Credential used for both the CryptographyClient and the
            BlobServiceClient.

    Returns:
        The configured BlobServiceClient.
    """
    storage_account_name = "<storage_name>"
    storage_account_uri = f"https://{storage_account_name}.blob.core.windows.net"

    encryption_options = {
        #"require_encryption": True,
        # Use the credential passed in rather than the module-level one.
        "key_encryption_key": CryptographyClient(key_id, cred),  #ExtendedCryptographyClient(key.id , credential),
        "encryption_version": '2.0',
        "key_wrap_algorithm": KeyWrapAlgorithm.rsa_oaep
    }

    blob_service_client = BlobServiceClient(account_url=storage_account_uri, credential=cred, **encryption_options)
    return blob_service_client

# Address of the Key Vault that holds the key encryption key.
key_vault_name = f"<key_vault_name>"  
key_vault_uri = f"https://{key_vault_name}.vault.azure.net"  
    

key_client = KeyClient(vault_url=key_vault_uri, credential=credential)  
    
# Look up the key so its full identifier can be handed to the blob client.
key_name = "<key_name>/<key_version>"
key = key_client.get_key(key_name)  
display(key.id)

blob_service_client = configure_blob_service_client(key.id, credential)  
  
blob_client = blob_service_client.get_blob_client(container="<container_name>", blob="<blob_name>")  

# Download the blob and read its full content into memory.
downloaded = blob_client.download_blob()  
blob_data = downloaded.readall()  
  
blob_data

So the SDK expects the key encryption key to have a get_kid() method (SDK source code), but the SDK's CryptographyClient doesn't have such a method.

I tried implementing the get_kid() method myself, but that led to other issues, so it is probably not the way to go.

Share Improve this question edited Jan 20 at 14:40 Robiche asked Jan 20 at 14:39 RobicheRobiche 11 bronze badge
Add a comment  | 

1 Answer 1

Reset to default 0

The solution was simply to wrap the CryptographyClient in a class that exposes the key-encryption-key interface the storage SDK expects:

class ExtendedCryptographyClient:
    """Wrapper around CryptographyClient exposing key-encryption-key methods.

    Provides get_kid, wrap_key, unwrap_key and get_key_wrap_algorithm on top
    of a CryptographyClient, always wrapping with RSA-OAEP.
    """

    def __init__(self, key_id, credential):
        """Create the underlying CryptographyClient and remember the key ID."""
        self.cryptography_client = CryptographyClient(key_id, credential)
        self.key_id = key_id

    def wrap_key(self, key: bytes) -> bytes:
        """Wrap ``key`` with RSA-OAEP and return the encrypted key bytes."""
        wrap_result = self.cryptography_client.wrap_key(KeyWrapAlgorithm.rsa_oaep, key)
        return wrap_result.encrypted_key

    def unwrap_key(self, key: bytes, algorithm: str) -> bytes:
        """Unwrap ``key`` using ``algorithm`` and return the plaintext key bytes."""
        unwrap_result = self.cryptography_client.unwrap_key(algorithm, key)
        return unwrap_result.key

    def get_kid(self) -> str:
        """Return the key ID this wrapper was created with."""
        return self.key_id

    def get_key_wrap_algorithm(self) -> str:
        """Return the key wrap algorithm used by wrap_key (RSA-OAEP)."""
        return KeyWrapAlgorithm.rsa_oaep
发布评论

评论列表(0)

  1. 暂无评论