Azure Event Hubs Latest
Scale applications based on Azure Event Hubs.
Trigger Specification
This specification describes the azure-eventhub
trigger for Azure Event Hubs.
triggers:
- type: azure-eventhub
metadata:
connectionFromEnv: EVENTHUB_CONNECTIONSTRING_ENV_NAME
storageConnectionFromEnv: STORAGE_CONNECTIONSTRING_ENV_NAME
consumerGroup: $Default
unprocessedEventThreshold: '64'
activationUnprocessedEventThreshold: '10'
blobContainer: 'name_of_container'
# Optional (Default: AzurePublicCloud)
cloud: Private
# Required when cloud = Private
endpointSuffix: servicebus.airgap.example
# Required when cloud = Private
storageEndpointSuffix: airgap.example
# Required when using pod identity authentication with blob storage
storageAccountName: 'name_of_account'
Parameter list:
connectionFromEnv
- Name of the environment variable your deployment uses to get the connection string appended withEntityPath=<event_hub_name>
. If the connection string does not end withEntityPath=<event_hub_name>
, then the parameterseventHubName
/eventHubNameFromEnv
must be used to provide the name of the Event Hub.storageConnectionFromEnv
- Name of the environment variable that provides connection string for Azure Storage Account to store checkpoint. As of now the Event Hub scaler only reads from Azure Blob Storage. (Only required when not using pod identity)consumerGroup
- Consumer group of Azure Event Hub consumer. (default:$default
, Optional)unprocessedEventThreshold
- Average target value to trigger scaling actions. (Default:64
, Optional)activationUnprocessedEventThreshold
- Target value for activating the scaler. Learn more about activation here. (Default:0
, Optional)blobContainer
- Container name to store checkpoint. This is needed for everycheckpointStrategy
except ofAzureFunction
. With Azure Functions theblobContainer
is autogenerated and cannot be overridden.eventHubNamespace
- Name of the Event Hub namespace which has the Event Hub. (Optional)eventHubNamespaceFromEnv
- Name of the environment variable that provides the name of the Event Hub namespace, which has the Event Hub. (Optional)eventHubName
- Name of the Event Hub containing the messages. (Optional)eventHubNameFromEnv
- Name of the environment variable that provides the name of the Event Hub, containing the messages. (Optional)storageAccountName
- Account name for blob storage used for checkpoints. (Required whenstorageConnectionFromEnv
is not specified. The storage account name is the first part in the hostname of a Blob Storage endpoint. E.g.: forexamplename.blob.core.windows.net
the account name isexamplename
.)checkpointStrategy
- configure the checkpoint behaviour of different Event Hub SDKs. (Values:azureFunction
,blobMetadata
,goSdk
, default:""
, Optional)azureFunction
- Suitable for Azure Functions & Azure WebJobs SDK. This is the default setting, whenblobcontainer
is not specified.blobMetadata
- For all implementations that store checkpoint information on blob metadata such as current C#, Python, Java and JavaScript Event Hub SDKs.goSdk
- For all implementations using the Golang SDK’s checkpointing.dapr
- Suitable for Dapr pubsub and bindings, depending on the used Dapr version:- pubsub components: >= Dapr 1.6 (older versions need the GoSdk checkpointer)
- binding components: >= Dapr 1.9 (older versions need the GoSdk checkpointer)
- When no checkpoint strategy is specified, the Event Hub scaler will use backwards compatibility and able to scale older implementations of C#, Python or Java Event Hub SDKs. (see “Legacy checkpointing”). If this behaviour should be used,
blobContainer
is also required.
cloud
- Name of the cloud environment that the Event Hub belongs to. (Values:AzurePublicCloud
,AzureUSGovernmentCloud
,AzureChinaCloud
,AzureGermanCloud
,Private
, Default:AzurePublicCloud
, Optional)endpointSuffix
- Service Bus endpoint suffix of the cloud environment. (Required whencloud
is set toPrivate
, e.g.servicebus.cloudapi.de
forAzureGermanCloud
).storageEndpointSuffix
- Blob Storage endpoint of the cloud environment. (Required whencloud
is set toPrivate
, e.g.airgap.example
. Do not include theblob
part of the endpoint.)
When using pod identity, Microsoft Entra ID endpoint is recovered via
AZURE_AUTHORITY_HOST
env var provided by https://azure.github.io/azure-workload-identity/docs/installation/mutating-admission-webhook.html
💡 Learn more about the checkpointing behaviour in this section.
💡 The Azure Storage connection string is not compatible with connection string created from a Shared Access Signature.
Authentication Parameters
You can authenticate by using pod identity or connection string authentication.
Connection String Authentication:
-
connection
- Connection string for the Azure Event Hubs Namespace.The following formats are supported.
- With SharedAccessKey -
Endpoint=sb://<sb>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>;EntityPath=<hub-name>
.
- With SharedAccessKey -
-
storageConnection
- Connection string for the Azure Storage Account used to store checkpoint information.
💡 When providing
connection
,EntityPath
is optional. If it is not provided, theneventHubName
must be used to provide the name of the Azure Event Hub instance to use inside the namespace.
Pod identity based authentication:
Azure AD Pod Identity or Azure AD Workload Identity providers can be used.
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: nameOfTriggerAuth
namespace: default
spec:
podIdentity:
provider: Azure | azure-workload
When you do so, the Event Hub scaler will depend on the existence of two configurations you have to provide: eventHubNamespace
and eventHubName
. You can also configure storageAccountName
if you wish to use Azure AD Pod / Workload Identity to authenticate to Azure Blob Storage instead of a connection string.
💡 When using Azure AD Pod Identity to authenticate the identity must have appropriate RBAC role-assignments for both Event Hub and Storage Account. Permissions covered by
Azure Event Hubs Data Receiver
andStorage Blob Data Reader
are required.
Checkpointing Behaviour
The list of available checkpointing strategies can be found in the trigger specification section. The way checkpoints are stored has changed with updates to the EventHub SDKs.
-
Legacy behaviour: The older implementations are based on the
EventProcessorHost
client, which stores the checkpoint information as contents of the storage blob. This is the default behaviour when nocheckpointStrategy
is specified. This is applicable for the following scenarios:- .NET applications using
Microsoft.Azure.EventHubs
NuGet package. - Java applications using
azure-eventhubs-eph
package. - Python applications using
azure-eventhub
package below v5.
- .NET applications using
-
Current behaviour: The newer implementations are based on the
EventProcessorClient
, which stores the checkpoint information as metadata on the storage blob. This is the behaviour whencheckpointStrategy
is set toblobMetadata
. This is applicable for the following scenarios:- .NET applications using
Azure.Messaging.EventHubs
NuGet package. - Python applications using
azure-eventhub
v5. - .NET Azure Functions using
Microsoft.Azure.WebJobs.Extensions.EventHubs
v5. - Azure Functions in other languages using
Microsoft.Azure.Functions.ExtensionBundle
v3.
- .NET applications using
💡
blobContainer
name is required for applications following legacy behaviour.
💡 Users should set
blobContainer
toazure-webjobs-eventhub
for Azure Functions usingblobMetadata
ascheckpointStrategy
.
Example
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: azure-eventhub-scaledobject
namespace: default
spec:
scaleTargetRef:
name: azureeventhub-function
triggers:
- type: azure-eventhub
metadata:
# Required
storageConnectionFromEnv: AzureWebJobsStorage
# Required if not using Pod Identity
connectionFromEnv: EventHub
# Required if using Pod Identity
eventHubNamespace: AzureEventHubNameSpace
eventHubName: NameOfTheEventHub
# Optional
consumerGroup: $Default # default: $Default
unprocessedEventThreshold: '64' # default 64 events.
blobContainer: ehcontainer