.NET ÔÆÔÉú¼Ü¹¹Ê¦ÑµÁ·Óª£¨Ä£¿é¶þ »ù´¡¹®¹Ì RabbitMQ Masstransit Ïê½â£©--ѧϰ±Ê¼Ç
2.6.7 RabbitMQ -- Masstransit Ïê½â
- Consumer Ïû·ÑÕß
- Producer Éú²úÕß
- Request-Response ÇëÇó-ÏìÓ¦
Consumer Ïû·ÑÕß
ÔÚ MassTransit ÖУ¬Ò»¸öÏû·ÑÕß¿ÉÒÔÏû·Ñ365ÌåÓýͶע»ò¶àÖÖÏûÏ¢
Ïû·ÑÕßµÄÀàÐͰüÀ¨£ºÆÕͨÏû·ÑÕߣ¬saga£¬saga ״̬»ú£¬Â·Óɻ£¨·Ö²¼Ê½×·×Ù£©£¬´¦ÀíÆ÷ handlers£¬¹¤×÷Ïû·ÑÕß job comsumers
- Consumer
- Instance
- Handler
- Others
Consumer
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Consumer<SubmitOrderConsumer>();
});
});
}
}
¼Ì³Ð IConsumer£¬ÊµÏÖ Consume ·½·¨
class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
await context.Publish<OrderSubmitted>(new
{
context.Message.OrderId
});
}
}
Èý¸öÔÔò£º
- Óµ±§ The Hollywood Principle, which states, "Dont't call us, we'll call you."
- Consume ·½·¨ÊÇÒ»¸ö±»µÈ´ýµÄ·½·¨£¬ÔÚÖ´ÐÐÖÐʱÆäËûÏû·ÑÕßÎÞ·¨½ÓÊÕµ½Õâ¸öÏûÏ¢£¬µ±Õâ¸ö·½·¨Íê³ÉµÄʱºò£¬ÏûÏ¢±» ack£¬²¢ÇÒ´Ó¶ÓÁÐÖÐÒÆ³ý
- Task ·½·¨Òì³£»áµ¼ÖÂÏûÏ¢´¥·¢ retry£¬Èç¹ûûÓÐÅäÖÃÖØÊÔ£¬ÏûÏ¢½«±»Í¶µÝµ½Ê§°Ü¶ÓÁÐ
Instance
public class Program
{
public static async Task Main()
{
var submitOrderConsumer = new SubmitOrderConsumer();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Instance(submitOrderConsumer);
});
});
}
}
ËùÓнÓÊÕµ½µÄÏûÏ¢¶¼ÓÉÒ»¸öÏû·ÑÕßÀ´ÊµÀýÀ´´¦Àí£¨ÇëÈ·±£Õâ¸öÏû·ÑÕßÀàÊÇḬ̈߳²È«£©
Consumer ÿ´Î½ÓÊÕµ½ÏûÏ¢¶¼»á new Ò»¸öʵÀý
Handler
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Handler<SubmitOrder>(async context =>
{
await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
});
});
});
}
}
ͨ¹ýÒ»¸öίÍÐ Lambda ·½·¨£¬À´Ïû·ÑÏûÏ¢
Others
- Saga<>
- StateMachineSaga<>
Producer Éú²úÕß
ÏûÏ¢µÄÉú²ú¿ÉÒÔͨ¹ýÁ½ÖÖ·½Ê½²úÉú£º·¢Ëͺͷ¢²¼
·¢Ë͵ÄʱºòÐèÒªÖ¸¶¨Ò»¸ö¾ßÌåµÄµØÖ· DestinationAddress£¬·¢²¼µÄʱºòÏûÏ¢»á±»¹ã²¥¸øËùÓж©ÔÄÁËÕâ¸öÏûÏ¢ÀàÐ͵ÄÏû·ÑÕß
»ùÓÚÕâÁ½ÖÖ¹æÔò£¬ÏûÏ¢±»¶¨ÒåΪ£ºÃüÁî command ºÍʼþ event
- send
- publish
send
¿ÉÒÔµ÷ÓÃÒÔ϶ÔÏóµÄ send ·½·¨À´·¢ËÍ command£º
- ConsumeContext £¨ÔÚ Consumer µÄ Consumer ·½·¨²ÎÊýÖд«µÝ£©
- ISendEndpointProvider£¨¿ÉÒÔ´Ó DI ÖлñÈ¡£©
- IBusControl£¨×î¶¥²ãµÄ¿ØÖƶÔÏó£¬ÓÃÀ´Æô¶¯ºÍÍ£Ö¹ masstransit µÄ¿ØÖÆÆ÷£©
ConsumeContext
public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
private readonly IOrderSubmitter _orderSubmitter;
public SubmitOrderConsumer(IOrderSubmitter submitter)
=> _orderSubmitter = submitter;
public async Task Consume(IConsumeContext<SubmitOrder> context)
{
await _orderSubmitter.Process(context.Message);
await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
}
}
ISendEndpointProvider
public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
await endpoint.Send(new SubmitOrder { OrderId = "123" });
}
publish
- ·¢Ë͵ØÖ·
- ¶ÌµØÖ·
- Convention Map
·¢Ë͵ØÖ·
- rabbitmq://localhost/input-queue
- rabbitmq://localhost/input-queue?durable=false
¶ÌµØÖ·
- GetSendEndpoint(new Uri("queue:input-queue"))
Convention Map
ÔÚÅäÖÃÎļþÖÐÖ¸¶¨ map ¹æÔò
EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));
Ö±½Ó·¢ËÍ
public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
private readonly IOrderSubmitter _orderSubmitter;
public SubmitOrderConsumer(IOrderSubmitter submitter)
=> _orderSubmitter = submitter;
public async Task Consume(IConsumeContext<SubmitOrder> context)
{
await _orderSubmitter.Process(context.Message);
await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
}
}
¿ÉÒÔµ÷ÓÃÒÔ϶ÔÏóµÄ publish ·½·¨À´·¢ËÍ event£º
- ConsumeContext £¨ÔÚ Consumer µÄ Consumer ·½·¨²ÎÊýÖд«µÝ£©
- IPublishEndpoint£¨¿ÉÒÔ´Ó DI ÖлñÈ¡£©
- IBusControl£¨×î¶¥²ãµÄ¿ØÖƶÔÏó£¬ÓÃÀ´Æô¶¯ºÍÍ£Ö¹ masstransit µÄ¿ØÖÆÆ÷£©
IPublishEndpoint
public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
await publishEndpoint.Publish<OrderSubmitted>(new
{
OrderId = "27",
OrderDate = DateTime.UtcNow,
});
}
Request-Response ÇëÇó-ÏìÓ¦
Request-Response ģʽÈÃÓ¦ÓóÌÐòÖ®¼ä½âñîÖ®ºó£¬ÒÀÈ»²ÉÓÃͬ²½µÄ·½Ê½
- Consumer
- IClientFactory
- IRequestClient
- Send a request
Consumer
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
var order = await _orderRepository.Get(context.Message.OrderId);
if (order == null)
throw new InvalidOperationException("Order not found");
await context.RespondAsync<OrderStatusResult>(new
{
OrderId = order.Id,
order.Timestamp,
order.StatusCode,
order.StatusText
});
}
ÐèÒª´¦Àí·µ»ØÀàÐÍ OrderStatusResult£¬Òì²½·½Ê½Ä£Äâͬ²½£¬Êµ¼ÊÉÏͬÑùÓÐÏûÏ¢¶ÓÁУ¬Ïû·ÑÕß´¦Àí¹ý³Ì
IClientFactory
public interface IClientFactory
{
IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}
ͨ¹ý IBusControl µÄ CreateClientFactory ·½·¨¿ÉÒԵõ½ ClientFactory
IRequestClient
public interface IRequestClient<TRequest>
where TRequest : class
{
RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}
RequestClient ¿ÉÒÔ´´½¨ÇëÇ󣬻òÕßÖ±½Ó»ñµÃÏìÓ¦
Send a request
var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
±¾×÷Æ·²ÉÓÃ֪ʶ¹²ÏíÊðÃû-·ÇÉÌÒµÐÔʹÓÃ-Ïàͬ·½Ê½¹²Ïí 4.0 ¹ú¼ÊÐí¿ÉÐÒé½øÐÐÐí¿É¡£
»¶Ó×ªÔØ¡¢Ê¹Óá¢ÖØÐ·¢²¼£¬µ«Îñ±Ø±£ÁôÎÄÕÂÊðÃû Ö£×ÓÃú £¨°üº¬Á´½Ó£º http://www.cnblogs.com/MingsonZheng/ £©£¬²»µÃÓÃÓÚÉÌҵĿµÄ£¬»ùÓÚ365betÌåÓýÔÚÏßÐ޸ĺóµÄ×÷Æ·Îñ±ØÒÔÏàͬµÄÐí¿É·¢²¼¡£
ÈçÓÐÈκ365betÌåÓýÔÚÏßÉÎÊ£¬ÇëÓë365betÌåÓýÔÚÏßÁªÏµ (MingsonZheng@outlook.com) ¡£