Azure Service Bus Messaging
Azure Service Bus Messaging
Azure Service Bus is a multi-tenant cloud messaging service which you can use to send information between applications and services.
The asynchronous operations give you flexible, brokered messaging, along with structured first-in, first-out (FIFO) messaging, and publish/subscribe capabilities.
Let’s Understand What is Azure Service Bus?
Microsoft Azure Service Bus is a fully managed enterprise integration message broker. Service Bus is most commonly used to decouple applications and services from each other and is a reliable and secure platform for asynchronous data and state transfer. Data is transferred between different applications and services using messages. A message is in binary format, which can contain JSON, XML, or just text.
Some common messaging scenarios are:
- Messaging: transfer business data, such as sales or purchase orders, journals, or inventory movements.
- Decouple applications: improve reliability and scalability of applications and services (client and service do not have to be online at the same time).
- Topics and subscriptions: enable 1:n relationships between publishers and subscribers.
- Message sessions: implement workflows that require message ordering or message deferral.
Namespaces
A namespace is a scoping container for all messaging components. Multiple queues and topics can reside within a single namespace, and namespaces often serve as application containers.
Queues
Messages are sent to and received from queues. Queues enable you to store messages until the receiving application is available to receive and process them.
Messages in queues are ordered and timestamped on arrival. Once accepted, the message is held safely in redundant storage. Messages are delivered in pull mode, which delivers messages on request.
Topics
You can also use topics to send and receive messages. While a queue is often used for point-to-point communication, topics are useful in publish/subscribe scenarios.
Topics can have multiple, independent subscriptions. A subscriber to a topic can receive a copy of each message sent to that topic. Subscriptions are named entities, which are durably created but can optionally expire or auto-delete.
In some scenarios, you may not want individual subscriptions to receive all messages sent to a topic. If so, you can use rules and filters to define conditions that trigger optional actions, filter specified messages and set or modify message properties.
Advanced features
Service Bus also has advanced features that enable you to solve more complex messaging problems. The following sections describe these key features:
Message sessions
To realize a first-in, first-out (FIFO) guarantee in Service Bus, use sessions. Message sessions enable joint and ordered handling of unbounded sequences of related messages.
Auto-forwarding
The auto-forwarding feature enables you to chain a queue or subscription to another queue or topic that is part of the same namespace. When auto-forwarding is enabled, Service Bus automatically removes messages that are placed in the first queue or subscription (source) and puts them in the second queue or topic (destination).
Dead-lettering
Service Bus supports a dead-letter queue (DLQ) to hold messages that cannot be delivered to any receiver, or messages that cannot be processed. You can then remove messages from the DLQ and inspect them.
Scheduled delivery
You can submit messages to a queue or topic for delayed processing; for example, to schedule a job to become available for processing by a system at a certain time.
Message deferral
When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances within the application, the entity has the option to defer retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
Batching
Client-side batching enables a queue or topic client to delay sending a message for a certain period of time. If the client sends additional messages during this time period, it transmits the messages in a single batch.
Transactions
A transaction groups two or more operations together into an execution scope. Service Bus supports grouping operations against a single messaging entity (queue, topic, subscription) within the scope of a transaction.
Filtering and actions
Subscribers can define which messages they want to receive from a topic. These messages are specified in the form of one or more named subscription rules. For each matching rule condition, the subscription produces a copy of the message, which may be differently annotated for each matching rule.
Auto-delete on idle
Auto-delete on idle enables you to specify an idle interval after which the queue is automatically deleted. The minimum duration is 5 minutes.
Duplicate detection
If an error occurs that causes the client to have any doubt about the outcome of a send operation, duplicate detection takes the doubt out of these situations by enabling the sender to re-send the same message, and the queue or topic discards any duplicate copies.
SAS, RBAC, and Managed identities for Azure resources
Service Bus supports security protocols such as Shared Access Signatures (SAS), Role-Based Access Control (RBAC) and Managed identities for Azure resources.
Geo-disaster recovery
When Azure regions or datacenters experience downtime, Geo-disaster recovery enables data processing to continue operating in a different region or datacenter.
Security
Service Bus supports standard AMQP 1.0 and HTTP/REST protocols.
Client libraries
Service Bus supports client libraries for .NET, Java, JMS.
Integration
Service Bus fully integrates with the following Azure services:
Get started with Service Bus queues
Prerequisites
- Visual Studio 2017 Update 3 (version 15.3, 26730.01) or later.
- NET Core SDK, version 2.0 or later.
- An Azure subscription.
Create a namespace in the Azure portal
To begin using Service Bus messaging entities in Azure, you must first create a namespace with a name that is unique across Azure. A namespace provides a scoping container for addressing Service Bus resources within your application.
To create a namespace:
- Sign in to the Azure portal
- In the left navigation pane of the portal, select + Create a resource, select Integration, and then select Service Bus.
- In the Create namespace dialog, do the following steps:
- Enter a name for the namespace. The system immediately checks to see if the name is available.
- Select the pricing tier (Basic, Standard, or Premium) for the namespace. If you want to use topics and subscriptions, choose either Standard or Premium. Topics/subscriptions are not supported in the Basic pricing tier.
- If you selected the Premium pricing tier, follow these steps:
- Specify the number of messaging units. The premium tier provides resource isolation at the CPU and memory level so that each workload runs in isolation. This resource container is called a messaging unit. A premium namespace has least one messaging unit. You can select 1, 2, or 4 messaging units for each Service Bus Premium namespace. For more information, see Service Bus Premium Messaging.
- Specify whether you want to make the namespace zone redundant. The zone redundancy provides enhanced availability by spreading replicas across availability zones within one region at no additional cost. For more information, see Availability zones in Azure.
- For Subscription, choose an Azure subscription in which to create the namespace.
- For Resource group, choose an existing resource group in which the namespace will live, or create a new one.
- For Location, choose the region in which your namespace should be hosted.
- Select Create. The system now creates your namespace and enables it. You might have to wait several minutes as the system provisions resources for your account.
- Confirm that the service bus namespace is deployed successfully. To see the notifications, select the bell icon (Alerts) on the toolbar. Select the name of the resource group in the notification as shown in the image. You see the resource group that contains the service bus namespace.
- On the Resource group page for your resource group, select your service bus namespace.
- You see the home page for your service bus namespace.
Get the connection string
Creating a new namespace automatically generates an initial Shared Access Signature (SAS) rule with an associated pair of primary and secondary keys that each grant full control over all aspects of the namespace. See Service Bus authentication and authorization for information about how to create rules with more constrained rights for regular senders and receivers. To copy the primary and secondary keys for your namespace, follow these steps:
- Click All resources, then click the newly created namespace name.
- In the namespace window, click Shared access policies.
- In the Shared access policies screen, click RootManageSharedAccessKey.
- In the Policy: RootManageSharedAccessKey window, click the copy button next to Primary Connection String, to copy the connection string to your clipboard for later use. Paste this value into Notepad or some other temporary location.
- Repeat the previous step, copying and pasting the value of Primary key to a temporary location for later use.
Create a queue in the Azure portal
- On the Service Bus Namespace page, select Queues in the left navigational menu.
- On the Queues page, select + Queue on the toolbar.
- Enter a name for the queue, and leave the other values with their defaults.
- Now, select Create.
Send messages to the queue
To send messages to the queue, write a C# console application using Visual Studio.
Create a console application
Launch Visual Studio and create a new Console App (.NET Core) project.
Add the Service Bus NuGet package
- Right-click the newly created project and select Manage NuGet Packages.
- Click the Browse tab, search for Microsoft.Azure.ServiceBus, and then select the Microsoft.Azure.ServiceBus item. Click Install to complete the installation, then close this dialog box.
Write code to send messages to the queue
- In Program.cs, add the following using statements at the top of the namespace definition, before the class declaration:
using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.ServiceBus;
- Within the Program class, declare the following variables. Set the ServiceBusConnectionString variable to the connection string that you obtained when creating the namespace, and set QueueName to the name that you used when creating the queue:
const string ServiceBusConnectionString = ""; const string QueueName = ""; static IQueueClient queueClient;
- Replace the default contents of Main() with the following line of code:
MainAsync().GetAwaiter().GetResult();
- Directly after Main(), add the following asynchronous MainAsync() method that calls the send messages method:
static async Task MainAsync() { const int numberOfMessages = 10; queueClient = new QueueClient(ServiceBusConnectionString, QueueName); Console.WriteLine("======================================================"); Console.WriteLine("Press ENTER key to exit after sending all the messages."); Console.WriteLine("======================================================"); // Send messages. await SendMessagesAsync(numberOfMessages); Console.ReadKey(); await queueClient.CloseAsync(); }
- Directly after the MainAsync() method, add the following SendMessagesAsync() method that performs the work of sending the number of messages specified by numberOfMessagesToSend (currently set to 10):
static async Task SendMessagesAsync(int numberOfMessagesToSend) { try { for (var i = 0; i < numberOfMessagesToSend; i++) { // Create a new message to send to the queue. string messageBody = $"Message {i}"; var message = new Message(Encoding.UTF8.GetBytes(messageBody)); // Write the body of the message to the console. Console.WriteLine($"Sending message: {messageBody}"); // Send the message to the queue. await queueClient.SendAsync(message); } } catch (Exception exception) { Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}"); } }
- Here is what your Program.cs file should look like.
namespace CoreSenderApp { using System; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; class Program { // Connection String for the namespace can be obtained from the Azure portal under the // 'Shared Access policies' section. const string ServiceBusConnectionString = ""; const string QueueName = ""; static IQueueClient queueClient; static void Main(string[] args) { MainAsync().GetAwaiter().GetResult(); } static async Task MainAsync() { const int numberOfMessages = 10; queueClient = new QueueClient(ServiceBusConnectionString, QueueName); Console.WriteLine("======================================================"); Console.WriteLine("Press ENTER key to exit after sending all the messages."); Console.WriteLine("======================================================"); // Send Messages await SendMessagesAsync(numberOfMessages); Console.ReadKey(); await queueClient.CloseAsync(); } static async Task SendMessagesAsync(int numberOfMessagesToSend) { try { for (var i = 0; i < numberOfMessagesToSend; i++) { // Create a new message to send to the queue string messageBody = $"Message {i}"; var message = new Message(Encoding.UTF8.GetBytes(messageBody)); // Write the body of the message to the console Console.WriteLine($"Sending message: {messageBody}"); // Send the message to the queue await queueClient.SendAsync(message); } } catch (Exception exception) { Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}"); } } } }
- Run the program, and check the Azure portal: click the name of your queue in the namespace Overview window. The queue Essentials screen is displayed. Notice that the Active Message Count value for the queue is now 10. Each time you run the sender application without retrieving the messages (as described in the next section), this value increases by 10. Also, note that the current size of the queue increments the Current value in the Essentials window each time the app adds messages to the queue.
Receive messages from the queue
To receive the messages you just sent, create another .NET Core console application and install the Microsoft.Azure.ServiceBusNuGet package, similar to the previous sender application.
Write code to receive messages from the queue
- In Program.cs, add the following using statements at the top of the namespace definition, before the class declaration:
using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.ServiceBus;
- Within the Program class, declare the following variables. Set the ServiceBusConnectionString variable to the connection string that you obtained when creating the namespace, and set QueueName to the name that you used when creating the queue:
const string ServiceBusConnectionString = ""; const string QueueName = ""; static IQueueClient queueClient;
- Replace the default contents of Main() with the following line of code:
MainAsync().GetAwaiter().GetResult();
- Directly after Main(), add the following asynchronous MainAsync() method that calls the RegisterOnMessageHandlerAndReceiveMessages() method:
static async Task MainAsync() { queueClient = new QueueClient(ServiceBusConnectionString, QueueName); Console.WriteLine("======================================================"); Console.WriteLine("Press ENTER key to exit after receiving all the messages."); Console.WriteLine("======================================================"); // Register the queue message handler and receive messages in a loop RegisterOnMessageHandlerAndReceiveMessages(); Console.ReadKey(); await queueClient.CloseAsync(); }
- Directly after the MainAsync() method, add the following method that registers the message handler and receives the messages sent by the sender application:
static void RegisterOnMessageHandlerAndReceiveMessages() { // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc. var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) { // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity. // Set it according to how many messages the application wants to process in parallel. MaxConcurrentCalls = 1, // Indicates whether the message pump should automatically complete the messages after returning from user callback. // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync(). AutoComplete = false }; // Register the function that processes messages. queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions); }
- Directly after the previous method, add the following ProcessMessagesAsync() method to process the received messages:
static async Task ProcessMessagesAsync(Message message, CancellationToken token) { // Process the message. Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}"); // Complete the message so that it is not received again. // This can be done only if the queue Client is created in ReceiveMode.PeekLock mode (which is the default). await queueClient.CompleteAsync(message.SystemProperties.LockToken); // Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed. // If queueClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc. // to avoid unnecessary exceptions. }
- Finally, add the following method to handle any exceptions that might occur:
// Use this handler to examine the exceptions received on the message pump. static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}."); var context = exceptionReceivedEventArgs.ExceptionReceivedContext; Console.WriteLine("Exception context for troubleshooting:"); Console.WriteLine($"- Endpoint: {context.Endpoint}"); Console.WriteLine($"- Entity Path: {context.EntityPath}"); Console.WriteLine($"- Executing Action: {context.Action}"); return Task.CompletedTask; }
- Here is what your Program.cs file should look like:
namespace CoreReceiverApp { using System; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; class Program { // Connection String for the namespace can be obtained from the Azure portal under the // 'Shared Access policies' section. const string ServiceBusConnectionString = ""; const string QueueName = ""; static IQueueClient queueClient; static void Main(string[] args) { MainAsync().GetAwaiter().GetResult(); } static async Task MainAsync() { queueClient = new QueueClient(ServiceBusConnectionString, QueueName); Console.WriteLine("======================================================"); Console.WriteLine("Press ENTER key to exit after receiving all the messages."); Console.WriteLine("======================================================"); // Register QueueClient's MessageHandler and receive messages in a loop RegisterOnMessageHandlerAndReceiveMessages(); Console.ReadKey(); await queueClient.CloseAsync(); } static void RegisterOnMessageHandlerAndReceiveMessages() { // Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc. var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) { // Maximum number of Concurrent calls to the callback `ProcessMessagesAsync`, set to 1 for simplicity. // Set it according to how many messages the application wants to process in parallel. MaxConcurrentCalls = 1, // Indicates whether MessagePump should automatically complete the messages after returning from User Callback. // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below. AutoComplete = false }; // Register the function that will process messages queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions); } static async Task ProcessMessagesAsync(Message message, CancellationToken token) { // Process the message Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}"); // Complete the message so that it is not received again. // This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default). await queueClient.CompleteAsync(message.SystemProperties.LockToken); // Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed. // If queueClient has already been Closed, you may chose to not call CompleteAsync() or AbandonAsync() etc. calls // to avoid unnecessary exceptions. } static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}."); var context = exceptionReceivedEventArgs.ExceptionReceivedContext; Console.WriteLine("Exception context for troubleshooting:"); Console.WriteLine($"- Endpoint: {context.Endpoint}"); Console.WriteLine($"- Entity Path: {context.EntityPath}"); Console.WriteLine($"- Executing Action: {context.Action}"); return Task.CompletedTask; } } }
- Run the program, and check the portal again. Notice that the Active Message Count and Current values are now 0.
Congratulations! You have now created a queue, sent a set of messages to that queue, and received those messages from the same queue.