I. Introduction
In general, the message producer is the component that submits a message, and the consumer is the one that receives it. This concept is not limited to messaging over networks; the components can be asynchronous processes on the same machine using a memory- or disk-based messaging channel. The common feature of such systems is that the two components are loosely coupled via a channel (more on those in the next chapter) and are independent of each other. This makes it possible to add any number of consumers for the same producer, or even to replace one consumer with another, without the need to redeploy everything.
In a way, this is the basis and driving force of microservice architectures.
In Carby Job Manager, we will connect small Azure Functions with custom messaging attributes to form a workflow system. So in our case, the users of both the producers and consumers will be Azure Functions.
In this chapter, we’ll walk through how to design a producer and a consumer of messages, and we’ll implement some of these techniques so that we can pick the ones that suit our requirements.
II. Expected Behaviour
As stated above, message producers are the components that handle the task of sending messages to the messaging channel. They usually appear as an interface with at least a send method available:
interface IMessageProducer
{
    Task SendAsync(
        object message,
        MessageProducerOptions? options = default
    );
}
This method receives the message generated by the sending application, along with options describing how the message should be transported (guaranteed delivery; exactly-once, at-least-once, or at-most-once delivery; and so on, more on these later).
Internally, the message might be wrapped into a DTO containing metadata attached by the producer. This can include vital information on how to handle the message on the consuming side, as well as common-sense data like timestamps for when the message was sent and when it was published to the channel, message IDs, correlation IDs, and so on.
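As a sketch, such an envelope might look like the following (the MessageEnvelope name and its exact properties are illustrative assumptions on my part, not the actual Carby Job Manager DTO):

```csharp
using System;

// Illustrative envelope: wraps the application's payload with producer metadata.
public sealed record MessageEnvelope(
    Guid MessageId,              // unique id of this message
    Guid CorrelationId,          // ties related messages of one workflow together
    DateTimeOffset SentAt,       // when the producer created the message
    DateTimeOffset? PublishedAt, // when it was handed over to the channel
    object Payload               // the original application message
);
```

The consumer can then rely on these fields without knowing anything about the payload type.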
From a producer’s point of view, transport of a message can happen in two main ways:
- Point-to-Point, or one-to-one, where the sender has only one consumer endpoint and sends the message directly to a queue from which it will be processed.
- Publish/Subscribe, or one-to-many, where the sender has a list of consumers and sends the message to all subscribers for processing.
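The difference can be sketched with a tiny in-memory channel (purely illustrative; the InMemoryChannel type below is my own, not part of Carby Job Manager):

```csharp
using System;
using System.Collections.Generic;

// Illustrative in-memory channel showing both delivery styles.
public sealed class InMemoryChannel
{
    // Point-to-Point: messages land in a queue; exactly one consumer dequeues each.
    private readonly Queue<object> _queue = new();

    // Publish/Subscribe: every registered subscriber receives a copy.
    private readonly List<Action<object>> _subscribers = new();

    public void Send(object message) => _queue.Enqueue(message);
    public object Receive() => _queue.Dequeue();

    public void Subscribe(Action<object> handler) => _subscribers.Add(handler);

    public void Publish(object message)
    {
        foreach (var handler in _subscribers)
        {
            handler(message);
        }
    }
}
```

With Send/Receive, only one consumer processes each message; with Publish, every subscriber gets it. As we'll see below, the Carby Job Manager producer uses the point-to-point style.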
Depending on needs and practices, it is not always the producer’s responsibility to decide which of these behaviours to use. While Enterprise Integration Patterns lists many consumer types, these producer types, along with many others, appear under the Message Channel patterns.
III. Usage Example
The Carby Job Manager producer will behave as a point-to-point sender that knows the target to which the message needs to be delivered. Later, we’ll design our consumer accordingly; the next chapter will cover how to decouple the producer from the consumer.
To exercise this producer, we’ll use an HttpTrigger function so that we can test it. We want the following usage:
public class MessageProducerDemo
{
    [FunctionName(nameof(SendMessageWhenTriggered))]
    public async Task SendMessageWhenTriggered(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "message/producer/demo")] HttpRequest req,
        [MessageProducer("message-target")] IMessageProducer messageProducer
    )
    {
        await messageProducer.SendAsync(new Message("Hello World!"));
    }
}
From the business value side, there’s not much in this snippet, but it already shows the two main facets we wish to implement as part of a message producer:
- IMessageProducer: an interface that can be injected and used to send messages regardless of the transport channels, protocols, etc. It also makes it easier to test our function in isolation, as we can substitute it in unit tests.
- MessageProducer: an attribute. We’ll use this to inject the implementation into the function method.
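For example, a unit test could substitute a recording fake for the real producer. This RecordingMessageProducer is my own sketch; the interface and options class are repeated here from the article so the snippet stands alone:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// (Repeated from the article so this sketch is self-contained.)
public class MessageProducerOptions { }

public interface IMessageProducer
{
    Task SendAsync(object message, MessageProducerOptions? options = default);
}

// Test double: records messages instead of sending them over a channel.
public sealed class RecordingMessageProducer : IMessageProducer
{
    public List<object> SentMessages { get; } = new();

    public Task SendAsync(object message, MessageProducerOptions? options = default)
    {
        SentMessages.Add(message);
        return Task.CompletedTask;
    }
}
```

A test can pass this fake into the function method and then assert on SentMessages, without touching any real transport.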
We define the interface just as shown above and create an empty class for MessageProducerOptions.
public class MessageProducerOptions
{
}
And a dummy attribute class, MessageProducerAttribute, with a string constructor so that the above compiles:
public class MessageProducerAttribute : Attribute
{
    public string Target { get; init; }

    public MessageProducerAttribute(string target)
    {
        Target = target;
    }
}
Though it compiles, we’re still not done. We need to implement a few support classes so that Azure Functions can recognise our new attributes and inject the proper implementation.
IV. Bindings to Azure Functions Runtime
On various Microsoft pages and tutorials, we can find many articles on how to bind attributes to the Functions runtime, but most are already outdated. By trial and error, I came up with the following classes required to make the above attribute work.
First, add the following attributes to the MessageProducerAttribute:
[AttributeUsage(AttributeTargets.Parameter)]
[Binding]
The Binding attribute comes from the Microsoft.Azure.WebJobs.Core NuGet package. Then we need to register the binding with an extension. This requires two classes.
An ExtensionConfigProvider:
internal sealed class MessageProducerExtensionConfigProvider : IExtensionConfigProvider
{
    public void Initialize(ExtensionConfigContext context)
    {
        var bindingRule = context.AddBindingRule<MessageProducerAttribute>();
        bindingRule.BindToInput(CreateMessageProducerFromAttribute);
    }

    private IMessageProducer CreateMessageProducerFromAttribute(MessageProducerAttribute arg)
    {
        return new PointToPointMessageProducer(arg.Target);
    }
}
And the IMessageProducer implementation:
internal class PointToPointMessageProducer : IMessageProducer
{
    public PointToPointMessageProducer(string target)
    {
        throw new NotImplementedException();
    }

    public Task SendAsync(object message, MessageProducerOptions? options = default)
    {
        throw new NotImplementedException();
    }
}
To simplify our life later, we create the skeleton for registering the extension with the WebJobs runtime that runs under Azure Functions. CarbyJobManagerExtensions
is a static extension class that we’ll use to add in all the different bindings we create:
public static class CarbyJobManagerExtensions
{
    public static IWebJobsBuilder AddCarbyExtensions(this IWebJobsBuilder builder)
    {
        if (builder == null)
        {
            throw new ArgumentNullException(nameof(builder));
        }

        builder.AddExtension<MessageProducerExtensionConfigProvider>();
        return builder;
    }
}
And CarbyJobManagerStartup is a class that automatically registers our custom bindings with the help of the assembly annotation. Without this, we would need to add the above extension method to every project that wishes to use this library.
[assembly: WebJobsStartup(typeof(CarbyJobManagerStartup.CarbyStartup))]

namespace Carby.JobManager.Functions;

public class CarbyJobManagerStartup
{
    public class CarbyStartup : IWebJobsStartup
    {
        public void Configure(IWebJobsBuilder builder)
        {
            builder.AddCarbyExtensions();
        }
    }
}
With these in place, the Azure Function project we started with as the expected design not only compiles, but the function actually starts up successfully, meaning that our binding for IMessageProducer was registered properly.
If you try to invoke the HttpTrigger using Postman, a browser, or whatever tool you use to test REST APIs, it will still throw an exception. Although everything binds up as expected, our producer is still not implemented; notice the exceptions thrown in the class above.
V. Fill in the Gaps
Finally, the part where we implement how our producer will work. If we were to use only Azure Storage Queues, we could have injected the QueueClient directly into the function. But with Carby Job Manager, we wish to overcome a limitation on message size (Azure Storage Queue messages are capped at 64 KB) by splitting the process into two steps:
1. Store the message on Azure Blob Storage.
2. Send a control message to Azure Storage Queue with information on where the message can be found.
To achieve this, replace the contents of PointToPointMessageProducer class with:
internal class PointToPointMessageProducer : IMessageProducer
{
    private readonly string _target;
    private readonly IAzureStorageFactory _azureStorageFactory;

    public PointToPointMessageProducer(
        string target,
        IAzureStorageFactory azureStorageFactory
    )
    {
        _target = target;
        _azureStorageFactory = azureStorageFactory;
    }

    public async Task SendAsync(
        object message,
        MessageProducerOptions? options = default
    )
    {
        // Generate unique message id
        var messageId = Guid.NewGuid();

        // Store message to blob storage
        await AddMessageToBlobStorage(messageId, message);

        // Send control message to queue
        await SendControlMessageToTarget(messageId);
    }

    private async Task SendControlMessageToTarget(Guid messageId)
    {
        var queue = await _azureStorageFactory.CreateQueueClient(_target);
        await queue.SendMessageAsync(
            new BinaryData(
                JsonSerializer.Serialize(
                    new ControlMessage
                    {
                        MessageId = messageId
                    }
                )
            )
        );
    }

    private async Task AddMessageToBlobStorage(
        Guid messageId,
        object message
    )
    {
        var blobName = $"{messageId}.json";
        var containerName = $"msg-{_target}";
        var blobClient = await _azureStorageFactory.CreateBlobClient(containerName, blobName);
        await blobClient.UploadAsync(
            new BinaryData(JsonSerializer.Serialize(message))
        );
    }
}
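One piece not shown so far is the ControlMessage DTO that the producer serialises onto the queue. A minimal version that satisfies the code above could look like this (only MessageId is strictly required; more metadata can be added later):

```csharp
using System;

// Control message placed on the queue; it only points at the blob
// where the real payload is stored.
internal sealed class ControlMessage
{
    public Guid MessageId { get; set; }
}
```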
As you can see, we use IAzureStorageFactory to gain access to Azure Blob Storage and Azure Storage Queue. Here’s the interface:
internal interface IAzureStorageFactory
{
    Task<BlobClient> CreateBlobClient(
        string containerName,
        string blobName
    );

    Task<QueueClient> CreateQueueClient(string queueName);
}
And the implementation:
internal sealed class AzureStorageFactory : IAzureStorageFactory
{
    private static string GetStorageConnection()
        => Environment.GetEnvironmentVariable(
            "CarbyJobManager:StorageAccountConnection")!;

    public async Task<BlobClient> CreateBlobClient(
        string containerName,
        string blobName
    )
    {
        var containerClient = new BlobContainerClient(GetStorageConnection(), containerName);
        await containerClient.CreateIfNotExistsAsync();
        var blobClient = containerClient.GetBlobClient(blobName);
        return blobClient;
    }

    public async Task<QueueClient> CreateQueueClient(string queueName)
    {
        var queueClient = new QueueClient(GetStorageConnection(), queueName.ToLowerInvariant());
        await queueClient.CreateIfNotExistsAsync();
        return queueClient;
    }
}
As the message producer class is created on demand by the extension that handles the MessageProducer attribute, we cannot use dependency injection directly to pass the factory. Instead, we inject it into the MessageProducerExtensionConfigProvider. Here’s the updated class:
internal sealed class MessageProducerExtensionConfigProvider : IExtensionConfigProvider
{
    private readonly IAzureStorageFactory _azureStorageFactory;

    public MessageProducerExtensionConfigProvider(IAzureStorageFactory azureStorageFactory)
    {
        _azureStorageFactory = azureStorageFactory;
    }

    public void Initialize(ExtensionConfigContext context)
    {
        var bindingRule = context.AddBindingRule<MessageProducerAttribute>();
        bindingRule.BindToInput(CreateMessageProducerFromAttribute);
    }

    private IMessageProducer CreateMessageProducerFromAttribute(MessageProducerAttribute producerAttribute)
    {
        return new PointToPointMessageProducer(producerAttribute.Target, _azureStorageFactory);
    }
}
To enable DI, we also need to register the factory with the service collection, as usual with Azure Functions.
public static class CarbyJobManagerExtensions
{
    public static IWebJobsBuilder AddCarbyExtensions(this IWebJobsBuilder builder)
    {
        if (builder == null)
        {
            throw new ArgumentNullException(nameof(builder));
        }

        builder.AddExtension<MessageProducerExtensionConfigProvider>();
        builder.Services.AddSingleton<IAzureStorageFactory, AzureStorageFactory>();
        return builder;
    }
}
Now, if we add the following setting to local.settings.json, the function should start, and on invocation of the only public endpoint, we should see a "Hello World!" message in the blob storage and a related control message in the "msg-message-target" queue.
"CarbyJobManager:StorageAccountConnection": "UseDevelopmentStorage=true",
VI. Conclusion
As you can see, with so little code, we can send messages of any size to a remote consumer. Most of the classes are there to enable our custom attribute so that the Azure Functions runtime knows how to handle it. With these in place, we can inject IMessageProducer into any function without having to bother with the underlying communication channels.
In the next chapter, we’ll look at how to create consumers. That will be a longer journey, but if you stay with me, you’ll see the value we gain from having just these two concepts.
Subscribe to get a notification when it’s ready, or follow me on LinkedIn.