# MultiBus
pronounced mool-tee-buss
MassTransit is designed so that most applications only need a single bus, and that is the recommended approach. Using a single bus, with however many receive endpoints are needed, minimizes complexity and ensures efficient broker resource utilization. Consistent with this guidance, container configuration using the AddMassTransit
method registers the appropriate types so that they are available to other components, as well as consumers, sagas, and activities.
However, with broader use of cloud-based platforms comes a greater variety of messaging transports, not to mention HTTP as a transfer protocol. As application sophistication increases, connecting to multiple message transports and/or brokers is becoming more common. Therefore, rather than force developers to create their own solutions, MassTransit has the ability to configure additional bus instances within specific dependency injection containers.
And by specific, right now it is very specific: Microsoft.Extensions.DependencyInjection. Though technically, any container that supports
IServiceCollection
for configuration might work.
WARNING
This capability should be considered early-access. MultiBus has undergone limited testing, and while it has good test coverage there are likely edge cases related to container behavior or MassTransit use that complicate matters.
# Standard Configuration
To review the standard configuration, configuring MassTransit is done using the AddMassTransit method as shown.
using MassTransit;
using MassTransit.AspNetCore;
public void ConfigureServices(IServiceCollection services)
{
services.AddMassTransit(x =>
{
x.AddConsumer<SubmitOrderConsumer>();
x.AddRequestClient<SubmitOrder>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("localhost");
cfg.UseHealthCheck(context);
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransitHostedService();
}
This configures the container so that there is a bus, using RabbitMQ, with a single consumer SubmitOrderConsumer, using automatic endpoint configuration. The MassTransit hosted service, which configures the bus health checks and starts/stop the bus via IHostedService
, is also added to the container.
There are several interfaces added to the container using this configuration:
Interface | Lifestyle | Notes |
---|---|---|
IBusControl | Singleton | Used to start/stop the bus |
IBus | Singleton | Publish/Send on this bus, starting a new conversation |
ISendEndpointProvider | Scoped | Send messages from consumer dependencies, ASP.NET Controllers |
IPublishEndpoint | Scoped | Publish messages from consumer dependencies, ASP.NET Controllers |
IClientFactory | Singleton | Used to create request clients (singleton, or within scoped consumers) |
IRequestClient<SubmitOrder> | Scoped | Used to send requests |
ConsumeContext | Scoped | Available in any message scope, such as a consumer, saga, or activity |
When a consumer, a saga, or an activity is consuming a message the ConsumeContext is available in the container scope. When the consumer is created using the container, the consumer and any dependencies are created within that scope. If a dependency includes ISendEndpontProvider, IPublishEndpoint, or even ConsumeContext (should not be the first choice, but totally okay) on the constructor, all three of those interfaces result in the same reference – which is great because it ensures that messages sent and/or published by the consumer or its dependencies includes the proper correlation identifiers and monitoring activity headers.
# MultiBus Configuration
To support multiple bus instances in a single container, the interface behaviors described above had to be considered carefully. There are expectations as to how these interfaces behave, and it was important to ensure consistent behavior whether an application has one, two, or a dozen bus instances (please, not a dozen – think of the children). A way to differentiate between different bus instances ensuring that sent or published messages end up on the right queues or topics is needed. The ability to configure each bus instance separately, yet leverage the power of a single shared container is also a must.
To configure additional bus instances, create a new interface that includes IBus.
public interface ISecondBus :
IBus
{
}
Using that interface, configure the additional bus using the AddMassTransit<T>
method, which is included in the MassTransit.MultiBus namespace.
using MassTransit;
using MassTransit.MultiBus;
public void ConfigureServices(IServiceCollection services)
{
// previous AddMassTransit call omitted to focus
services.AddMassTransit<ISecondBus>(x =>
{
x.AddConsumer<AllocateInventoryConsumer>();
x.AddRequestClient<AllocateInventory>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("remote-host");
cfg.UseHealthCheck(context);
cfg.ConfigureEndpoints(context);;
}));
});
// call to AddMassTransitHostedService ommitted to focus
}
This configures the container so that there is an additional bus, using RabbitMQ, with a single consumer AllocateInventoryConsumer, using automatic endpoint configuration. Only a single hosted service is required that will start all bus instances so there is no need to add it twice.
Notable differences in the new method:
- The generic argument, ISecondBus, is the type that will be added to the container instead of IBus. This ensures that access to the additional bus is directly available without confusion.
- The AddBus argument,
context
, is now a registration context instead of the container type (which is IServiceProvider for Microsoft Dependency Injection). It should be used for the ConfigureConsumer, ConfigureSaga, ConfigureEndpoints, etc. methods. If access to the container is neeced, the Container property can be used.
The registered interfaces are slightly different for additional bus instances.
Interface | Lifestyle | Notes |
---|---|---|
IBusControl | N/A | Not registered, but automatically started/stopped by the hosted service |
IBus | N/A | Not registered, the new bus interface is registered instead |
ISecondBus | Singleton | Publish/Send on this bus, starting a new conversation |
ISendEndpointProvider | Scoped | Send messages from consumer dependencies only |
IPublishEndpoint | Scoped | Publish messages from consumer dependencies only |
IClientFactory | N/A | Registered as an instance-specific client factory |
IRequestClient<SubmitOrder> | Scoped | Created using the specific bus instance |
ConsumeContext | Scoped | Available in any message scope, such as a consumer, saga, or activity |
For consumers or dependencies that need to send or publish messages to a different bus instance, a dependency on that specific bus interface (such as IBus, or ISecondBus) would be added.
WARNING
Soem things do not work across bus instances. As stated above, calling Send or Publish on an IBus (or other bus instance interface) starts a new conversation. Middleware components such as the InMemoryOutbox currently do not buffer messages across bus instances.
# Bus Interface Types
In the example above, which should be the most common of this hopefully uncommon use, the ISecondBus interface is all that is required. MassTransit creates a dynamic class to delegate the IBus
methods to the bus instance. However, it is possible to specify a class that implements ISecondBus instead.
To specify a class, as well as take advantage of the container to bring additional properties along with it, take a look at the following types and configuration.
public interface IThirdBus :
IBus
{
ISomeService SomeService { get; }
}
public class ThirdBus :
BusInstance<IThirdBus>,
IThirdBus
{
public ThirdBus(IBusControl busControl, ISomeService someService)
: base(busControl)
{
SomeService = someService;
}
public ISomeService SomeService { get; }
}
Then, to specify the class in the configuration.
using MassTransit;
using MassTransit.MultiBus;
public void ConfigureServices(IServiceCollection services)
{
// previous AddMassTransit call omitted to focus
services.AddMassTransit<IThirdBus, ThirdBus>(x =>
{
x.AddConsumer<DestroyAllHumansConsumer>();
x.AddRequestClient<DestroyAllHumans>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("another-planet");
cfg.UseHealthCheck(context);
cfg.ConfigureEndpoints(context);;
}));
});
// call to AddMassTransitHostedService ommitted to focus
}
This would add a third bus instance, the same as the second, but using the instance class specified. The class is resolved from the container and given IBusControl
, which must be passed to the base class ensuring that it is properly configured.
← Mediator Message Data →