Automatically provision NServiceBus Service Bus Function endpoint topology
2021-01-19 update: code for subscription was adjusted to ensure the correct default rule for subscription is created.
In the previous post, Automatically create Service Bus trigger queue for Azure Function, I've shown how to provision a ServiceBusTrigger
queue from within a Function.
In this post, we'll take that idea and push it further to something a bit more sophisticated - provisioning the topology necessary for NServiceBus endpoint hosted with Azure Function and using Azure Service Bus transport. If you haven't used NServiceBus or NServiceBus with Azure Functions, here's a starting point for you. NServiceBus can bring a few advantages over native Functions I'll leave to discover on your own. And now, let's have a look at what are the things we'll need to accomplish.
Just as with the native Azure Function, a logical endpoint is represented by an input queue. That input queue needs to be created.
Next, NServiceBus has centralized error and audit queues. While those are not difficult to create, it's more convenient to have those queues created by the first starting endpoint.
Last is the pub/sub infrastructure. Azure Service Bus transport has a specific topology all endpoints adhere to. That includes a centralized topic, by default named bundle-1
and each logical endpoint as a subscription. Upon startup, each endpoint subscribes to the events it's interested in using this infrastructure.
With this information, let's start putting the pieces needed for the whole thing to work together.
Discovering endpoints
As there might be one or more logical endpoints, the hard-coding queue name as it was done in the previous post is not ideal. An alternative would be to reflect the endpoint's name (queue name) at runtime when the Function App is bootstrapping everything.
var attribute = Assembly.GetExecutingAssembly().GetTypes()
.SelectMany(t => t.GetMethods())
.Where(m => m.GetCustomAttribute<FunctionNameAttribute>(false) != null)
.SelectMany(m => m.GetParameters())
.SelectMany(p => p.GetCustomAttributes<ServiceBusTriggerAttribute>(false))
.FirstOrDefault();
With this code, we'll discover all ServiceBusTriggerAttribute
applied to Azure Service Bus triggered functions. For each of these attributes, we'll have to
- Create a queue if it doesn't exist
- Create a subscription if it doesn't exist
The caveat is that a subscription can only be created when a topic is found. Therefore a topic needs to be created first. Also, to make the topology work as the transport expects, each subscription should be auto-forwarding messages to the input queue it's associated with. And finally, the audit and error queues can be provisioned as well, completing the topology work necessary for each endpoint to be bootstrapped.
Putting it together
Here's the helper method we'd be using:
static async Task CreateTopologyWithReflection(IConfiguration configuration, string topicName = "bundle-1", string auditQueue = "audit", string errorQueue = "error")
{
var connectionString = configuration.GetValue<string>("AzureWebJobsServiceBus");
var managementClient = new ManagementClient(connectionString);
var attribute = Assembly.GetExecutingAssembly().GetTypes()
.SelectMany(t => t.GetMethods())
.Where(m => m.GetCustomAttribute<FunctionNameAttribute>(false) != null)
.SelectMany(m => m.GetParameters())
.SelectMany(p => p.GetCustomAttributes<ServiceBusTriggerAttribute>(false))
.FirstOrDefault();
if (attribute == null)
{
throw new Exception("No endpoint was found");
}
// there are endpoints, create a topic
if (!await managementClient.TopicExistsAsync(topicName))
{
await managementClient.CreateTopicAsync(topicName);
}
var endpointQueueName = attributes.First().QueueName;
if (!await managementClient.QueueExistsAsync(endpointQueueName))
{
await managementClient.CreateQueueAsync(endpointQueueName);
}
if (!await managementClient.SubscriptionExistsAsync(topicName, endpointQueueName))
{
var subscriptionDescription = new SubscriptionDescription(topicName, endpointQueueName)
{
ForwardTo = endpointQueueName,
UserMetadata = $"Events {endpointQueueName} subscribed to"
};
var ruleDescription = new RuleDescription
{
Filter = new FalseFilter()
};
await managementClient.CreateSubscriptionAsync(subscriptionDescription, ruleDescription);
}
if (!await managementClient.QueueExistsAsync(auditQueue))
{
await managementClient.CreateQueueAsync(auditQueue);
}
if (!await managementClient.QueueExistsAsync(errorQueue))
{
await managementClient.CreateQueueAsync(errorQueue);
}
}
Next, this helper method needs to be involved in the Startup class:
[assembly: FunctionsStartup(typeof(Startup))]
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
CreateTopology(builder.GetContext().Configuration).GetAwaiter().GetResult();
builder.UseNServiceBus(() =>
{
var configuration = new ServiceBusTriggeredEndpointConfiguration(AzureServiceBusTriggerFunction.EndpointName);
configuration.Transport.SubscriptionRuleNamingConvention(type => type.Name);
return configuration;
});
}
}
In my test solutions, I've defined an endpoint named ASBEndpoint
(AzureServiceBusTriggerFunction.EndpointName
is assigned the name). Once Azure Function hosting the endpoint is deployed, the following topology is created:
with the correct forwarding to the input queue
Subscribing to events
In the endpoint, I've added an event and event handler.
public class SimpleEvent : IEvent { }
public class SimpleEventHandler : IHandleMessages<SimpleEvent>
{
readonly ILogger<SimpleEvent> logger;
public SimpleEventHandler(ILogger<SimpleEvent> logger)
{
this.logger = logger;
}
public Task Handle(SimpleEvent message, IMessageHandlerContext context)
{
logger.LogInformation($"{nameof(SimpleEventHandler)} invoked");
return Task.CompletedTask;
}
}
NServiceBus automatically picks up and subscribes to all the events it finds handlers for. The subscription is expressed as a rule for each event. But this only happens when an endpoint is activated. This is not the case with message triggered Function endpoint. Luckily, there's a trick with TimerTrigger
we can apply.
Timer trigger trick
Normally, TimerTirgger
is executed periodically using a schedule defined using the CRON expression. In addition to that, there's also a flag to force a time-triggered function to run a single time when a timer triggered function is deployed. With this option, we can leverage a timer triggered function to run once upon deployment and stay dormant for a year. When the function executes, it will dispatch the ForceAutoSubscription
control message and cause the endpoint to load and auto-subscribe to the SimpleEvent
.
Control message definition:
public class ForceAutoSubscription : IMessage { }
Timer function:
public class TimerFunc
{
readonly IFunctionEndpoint functionEndpoint;
public TimerFunc(IFunctionEndpoint functionEndpoint)
{
this.functionEndpoint = functionEndpoint;
}
[FunctionName("TimerFunc")]
public async Task Run([TimerTrigger("* * * 1 1 *", RunOnStartup = true)]TimerInfo myTimer,
ILogger logger, ExecutionContext executionContext)
{
var sendOptions = new SendOptions();
sendOptions.SetHeader(Headers.ControlMessageHeader, bool.TrueString);
sendOptions.SetHeader(Headers.MessageIntent, MessageIntentEnum.Send.ToString());
sendOptions.RouteToThisEndpoint();
await functionEndpoint.Send(new ForceAutoSubscription(), sendOptions, executionContext, logger);
}
}
Note: ForceAutoSubscription
is a control message and will neither require a message handler to be defined nor will it cause recoverability to be executed.
The final result is what we needed. The endpoint is subscribed to SimpleEvent
, and it's part of the topology. This means there's a rule under the endpoint's subscription.
Summary
With this in place, we can bootstrap NServiceBus Function hosted endpoint using Azure Service Bus transport (preview 0.5 and later) w/o the need to manually provision the topology.
P.S.: if you're interested in Azure Functions supporting an opt-in queue creation, here's a feature request you could upvote.