I am working on an Azure Function that uses a ServiceBusTrigger and queries Azure Table Storage. To process messages as quickly as possible, we use the MaxConcurrentCalls setting to enable parallel message processing (e.g. we set MaxConcurrentCalls to 200). We access both Service Bus and Azure Table Storage with Managed Identity via DefaultAzureCredential.
Performance testing shows that multiple instances of the Function app are created and process messages as expected. However, each instance calls the Azure /msi/token endpoint to obtain a ManagedIdentityCredential, and this call is the bottleneck, taking anywhere from 200 ms to 5000 ms. In other words, with the above setting, if 200 messages are dropped onto the Service Bus, then 200 "instances" of the Azure Function start processing them and make 200 calls to get a ManagedIdentityCredential.
- What is the mechanism by which Azure Functions processes messages concurrently? Does it create multiple processes, or multiple threads within the same process?
- Is there a way to share/cache the credential once it has been obtained, so that the other message-processing instances can reuse it and the redundant calls to /msi/token are eliminated?
We're using Python as the programming language.
This is the code that initializes the Azure resources:
from azure.data.tables import TableServiceClient
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient

# helper function to initialize the global Service Bus and Table Storage clients
def init_azure_resource_clients(config_settings: EligibilitySettings):
    """get table service client for Azure Table Storage and service bus client"""
    global _azure_servicebus_client
    global _azure_table_service_client

    non_aio_credential = DefaultAzureCredential()

    # initialize global Service Bus client
    _azure_servicebus_client = ServiceBusClient(
        fully_qualified_namespace=config_settings.serviceBusNamespaceFQ,
        credential=non_aio_credential)

    # initialize global Table Service Client, preferring a connection string if available
    if config_settings.tableStorageConnectionString:
        _azure_table_service_client = TableServiceClient.from_connection_string(
            conn_str=config_settings.tableStorageConnectionString)
    else:
        _azure_table_service_client = TableServiceClient(
            endpoint=f"https://{config_settings.tableStorageAccount}.table.core.windows.net",
            credential=non_aio_credential)
And here is some sample code showing how it is called:
import json
import logging

import azure.functions as func

# global reference to the azure resources we need to access
_azure_table_service_client = None
_azure_servicebus_client = None

app = func.FunctionApp()

@app.function_name(name="ServiceBusQueueTrigger1")
@app.service_bus_queue_trigger(arg_name="msg",
                               queue_name="<QUEUE_NAME>",
                               connection="<CONNECTION_SETTING>")
def test_function(msg: func.ServiceBusMessage):
    logging.info('ServiceBus queue trigger processed message: %s',
                 msg.get_body().decode('utf-8'))

    # initialize global azure resources (runs on every invocation);
    # config_settings is loaded elsewhere (not shown)
    init_azure_resource_clients(config_settings)

    # parse incoming message
    message_body = msg.get_body().decode('utf-8')
    message_json = json.loads(message_body)
    result = process_message(message_json)
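
To make the second question concrete, this is roughly the kind of sharing we have in mind. It is only a minimal sketch, and it assumes that concurrent invocations run as threads inside one worker process, which is exactly what the first question is trying to confirm. The names `get_shared_credential`, `simulate_invocations` and `fake_factory` are hypothetical and not part of our app; `fake_factory` stands in for `DefaultAzureCredential` so the snippet runs without any Azure packages.

```python
import threading
from typing import Any, Callable, Optional

# Process-wide cache. It can only be shared by invocations that run in the
# same worker process; separate processes would each build their own copy.
_shared_credential: Optional[Any] = None
_credential_lock = threading.Lock()


def get_shared_credential(factory: Callable[[], Any]) -> Any:
    """Return the cached credential, calling factory() at most once per process."""
    global _shared_credential
    if _shared_credential is None:  # fast path: no locking once initialized
        with _credential_lock:
            if _shared_credential is None:  # re-check: another thread may have won
                _shared_credential = factory()
    return _shared_credential


def simulate_invocations(count: int) -> int:
    """Run `count` concurrent fake invocations; return how often the factory ran."""
    factory_calls = []

    def fake_factory() -> object:
        # Hypothetical stand-in for DefaultAzureCredential(); records each construction.
        factory_calls.append(1)
        return object()

    threads = [
        threading.Thread(target=get_shared_credential, args=(fake_factory,))
        for _ in range(count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(factory_calls)
```

In the function app, the idea would be to call `get_shared_credential(DefaultAzureCredential)` inside `init_azure_resource_clients` (or to cache the two clients the same way) instead of constructing a new credential for every message. Whether this actually removes the repeated /msi/token calls depends on the answer to the first question, and on the credential object reusing its token across calls, which we are assuming here.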