
.NET Cloud-Native Architect Training Camp (Module 2: Fundamentals, RabbitMQ MassTransit in Detail) -- Study Notes

2.6.7 RabbitMQ -- MassTransit in Detail

  • Consumer
  • Producer
  • Request-Response

Consumer

In MassTransit, a consumer can consume one or more message types.

Consumer types include regular consumers, sagas, saga state machines, routing slip activities (distributed tracking), handlers, and job consumers.

  • Consumer
  • Instance
  • Handler
  • Others

Consumer

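using System;
using System.Threading.Tasks;
using MassTransit;

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            // Messages arriving on the "order-service" queue are delivered to SubmitOrderConsumer.
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Consumer<SubmitOrderConsumer>();
            });
        });

        // Nothing is received until the bus is started.
        await busControl.StartAsync();
        Console.ReadLine();
        await busControl.StopAsync();
    }
}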

Implement IConsumer and its Consume method:

class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}

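Three principles:

  • Embrace the Hollywood Principle, which states, "Don't call us, we'll call you."
  • The Consume method is awaited. While it is executing, no other consumer can receive the message; when it completes, the message is acknowledged (ack) and removed from the queue.
  • An exception thrown from the Consume method (a faulted Task) causes the message to be retried. If no retry is configured, the message is moved to the error queue.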
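The third principle can be illustrated with a retry policy on the receive endpoint. This is a minimal sketch rather than the course's own code: it assumes MassTransit 7.x with the MassTransit.RabbitMQ package and a broker on localhost, the retry count and interval are arbitrary values, and the SubmitOrder class and the validation rule are stand-ins invented for the example.

```csharp
using System;
using System.Threading.Tasks;
using GreenPipes; // MassTransit 7.x: retry policies live here. In 8.x this namespace is gone; remove the line.
using MassTransit;

// Stand-in message contract, assumed for this sketch.
public class SubmitOrder
{
    public string OrderId { get; set; }
}

public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        // Hypothetical validation: any exception thrown here triggers the retry policy.
        if (string.IsNullOrEmpty(context.Message.OrderId))
            throw new InvalidOperationException("OrderId is required");

        return Task.CompletedTask;
    }
}

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                // Retry up to 3 times, 5 seconds apart; configured before the consumer.
                e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
                e.Consumer<SubmitOrderConsumer>();
            });
        });

        await busControl.StartAsync();
        Console.ReadLine();
        await busControl.StopAsync();
    }
}
```

Once the retries are exhausted, the message should end up in the endpoint's error queue, which by MassTransit convention is named order-service_error here.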

Instance

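public class Program
{
    public static async Task Main()
    {
        // One consumer object, created up front and shared by every message.
        var submitOrderConsumer = new SubmitOrderConsumer();

        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Instance(submitOrderConsumer);
            });
        });

        // Nothing is received until the bus is started.
        await busControl.StartAsync();
        Console.ReadLine();
        await busControl.StopAsync();
    }
}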

With Instance, every received message is handled by the same consumer instance (make sure the consumer class is thread-safe).

With Consumer, by contrast, a new instance is created for each message received.
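To make the thread-safety requirement concrete, here is a small sketch of a consumer that is safe to register with Instance. The counter and the CountingSubmitOrderConsumer name are hypothetical, and the SubmitOrder class is a stand-in for the contract used above; the sketch assumes a RabbitMQ broker on localhost.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Stand-in message contract, assumed for this sketch.
public class SubmitOrder
{
    public string OrderId { get; set; }
}

// A single instance may handle several messages concurrently,
// so shared state is only touched through Interlocked/Volatile.
public class CountingSubmitOrderConsumer : IConsumer<SubmitOrder>
{
    private int _received;

    public int Received => Volatile.Read(ref _received);

    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        Interlocked.Increment(ref _received);
        return Task.CompletedTask;
    }
}

public class Program
{
    public static async Task Main()
    {
        var counter = new CountingSubmitOrderConsumer();

        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                // e.Instance reuses `counter` for every message;
                // e.Consumer<T>() would construct a new consumer per message instead.
                e.Instance(counter);
            });
        });

        await busControl.StartAsync();
        Console.ReadLine();
        Console.WriteLine($"Messages received: {counter.Received}");
        await busControl.StopAsync();
    }
}
```

A plain `_received++` would be a data race here, which is the kind of bug the thread-safety note warns about.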

Handler

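public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                // The lambda itself is the consumer; no consumer class is needed.
                e.Handler<SubmitOrder>(async context =>
                {
                    await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                });
            });
        });

        // Nothing is received until the bus is started.
        await busControl.StartAsync();
        Console.ReadLine();
        await busControl.StopAsync();
    }
}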

Here the message is consumed by a delegate (a lambda method).

Others

  • Saga<>
  • StateMachineSaga<>

Producer

Messages can be produced in two ways: send and publish.

Sending requires a specific DestinationAddress. Publishing broadcasts the message to every consumer that has subscribed to that message type.

Based on these two behaviors, messages are classified as commands and events.
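The distinction can be sketched side by side. The two interfaces below are stand-ins for the SubmitOrder and OrderSubmitted contracts used in these notes (their members are assumptions), and the verb-noun versus past-tense naming follows the usual MassTransit convention. The sketch assumes a RabbitMQ broker on localhost.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Command: asks one specific service to do something (verb-noun name).
public interface SubmitOrder
{
    string OrderId { get; }
}

// Event: announces that something has happened (past-tense name).
public interface OrderSubmitted
{
    string OrderId { get; }
    DateTime OrderDate { get; }
}

public class Program
{
    public static async Task Main()
    {
        // No Host(...) call: relies on the default broker on localhost.
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { });

        await busControl.StartAsync();
        try
        {
            // Send: the destination address has to be known.
            var endpoint = await busControl.GetSendEndpoint(
                new Uri("rabbitmq://localhost/order-service"));
            await endpoint.Send<SubmitOrder>(new { OrderId = "123" });

            // Publish: no address; every subscriber of OrderSubmitted gets a copy.
            await busControl.Publish<OrderSubmitted>(new
            {
                OrderId = "123",
                OrderDate = DateTime.UtcNow
            });
        }
        finally
        {
            await busControl.StopAsync();
        }
    }
}
```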

  • send
  • publish

send

A command can be sent by calling the Send method on any of the following objects:

  • ConsumeContext (passed as the parameter of the consumer's Consume method)
  • ISendEndpointProvider (can be resolved from DI)
  • IBusControl (the top-level control object, used to start and stop the MassTransit bus)

ConsumeContext

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;

    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);

        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

ISendEndpointProvider

public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);

    await endpoint.Send(new SubmitOrder { OrderId = "123" });
}

publish

  • Send address
  • Short address
  • Convention Map

Send address

  • rabbitmq://localhost/input-queue
  • rabbitmq://localhost/input-queue?durable=false

Short address

  • GetSendEndpoint(new Uri("queue:input-queue"))
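As a rough illustration of the short address form, the class below resolves a send endpoint from "queue:input-queue" instead of a full rabbitmq:// URI. OrderSender is a hypothetical helper and SubmitOrder is a stand-in contract; the short address is resolved against whichever host the bus is connected to.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Stand-in message contract, assumed for this sketch.
public class SubmitOrder
{
    public string OrderId { get; set; }
}

// Hypothetical helper; ISendEndpointProvider would normally come from DI.
public class OrderSender
{
    private readonly ISendEndpointProvider _sendEndpointProvider;

    public OrderSender(ISendEndpointProvider sendEndpointProvider)
        => _sendEndpointProvider = sendEndpointProvider;

    public async Task SendOrder(string orderId)
    {
        // Short form: only the queue name, no scheme://host prefix.
        var endpoint = await _sendEndpointProvider.GetSendEndpoint(
            new Uri("queue:input-queue"));

        await endpoint.Send(new SubmitOrder { OrderId = orderId });
    }
}
```

The short form keeps broker host names out of application code, which helps when the same code runs against different environments.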

Convention Map

Specify the mapping rule in the configuration file:

EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

Send directly

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;

    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);

        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

An event can be published by calling the Publish method on any of the following objects:

  • ConsumeContext (passed as the parameter of the consumer's Consume method)
  • IPublishEndpoint (can be resolved from DI)
  • IBusControl (the top-level control object, used to start and stop the MassTransit bus)

IPublishEndpoint

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.Publish<OrderSubmitted>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
    });
}
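To show what broadcasting means in practice, the sketch below subscribes two receive endpoints to the same event and publishes it once. The consumer classes, the queue names, and the members of OrderSubmitted are assumptions made for illustration, and a RabbitMQ broker on localhost is assumed.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Stand-in event contract, assumed for this sketch.
public interface OrderSubmitted
{
    string OrderId { get; }
    DateTime OrderDate { get; }
}

public class BillingConsumer : IConsumer<OrderSubmitted>
{
    public Task Consume(ConsumeContext<OrderSubmitted> context)
        => Console.Out.WriteLineAsync($"Billing saw order {context.Message.OrderId}");
}

public class ShippingConsumer : IConsumer<OrderSubmitted>
{
    public Task Consume(ConsumeContext<OrderSubmitted> context)
        => Console.Out.WriteLineAsync($"Shipping saw order {context.Message.OrderId}");
}

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            // Each endpoint has its own queue, so each receives its own copy of the event.
            cfg.ReceiveEndpoint("billing-service", e => e.Consumer<BillingConsumer>());
            cfg.ReceiveEndpoint("shipping-service", e => e.Consumer<ShippingConsumer>());
        });

        await busControl.StartAsync();

        // Published once, without any destination address.
        await busControl.Publish<OrderSubmitted>(new
        {
            OrderId = "27",
            OrderDate = DateTime.UtcNow
        });

        Console.ReadLine();
        await busControl.StopAsync();
    }
}
```

Had the message been sent to "billing-service" instead, only BillingConsumer would have received it.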

Request-Response

The request-response pattern lets applications stay decoupled while still interacting in a synchronous style.

  • Consumer
  • IClientFactory
  • IRequestClient
  • Send a request

Consumer

public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
    var order = await _orderRepository.Get(context.Message.OrderId);
    if (order == null)
        throw new InvalidOperationException("Order not found");
    
    await context.RespondAsync<OrderStatusResult>(new 
    {
        OrderId = order.Id,
        order.Timestamp,
        order.StatusCode,
        order.StatusText
    });
}

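The requester has to handle the response type OrderStatusResult. This is an asynchronous mechanism that simulates a synchronous call: underneath there is still a message queue, and a consumer still processes the request.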

IClientFactory

public interface IClientFactory 
{
    IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);

    IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);

    RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);

    RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}

A ClientFactory can be obtained through the CreateClientFactory method of IBusControl.
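A minimal sketch of that path, from bus to client factory to request client, might look as follows. The two message interfaces are trimmed-down stand-ins for the contracts in these notes, the 10-second timeout is an arbitrary choice, and a broker on localhost with a consumer listening on check-order-status is assumed.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Stand-in contracts, assumed for this sketch.
public interface CheckOrderStatus
{
    string OrderId { get; }
}

public interface OrderStatusResult
{
    string OrderId { get; }
    string StatusText { get; }
}

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { });

        // The bus must be running so that responses can be received.
        await busControl.StartAsync();
        try
        {
            IClientFactory clientFactory = busControl.CreateClientFactory();

            var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
            IRequestClient<CheckOrderStatus> client =
                clientFactory.CreateRequestClient<CheckOrderStatus>(
                    serviceAddress, RequestTimeout.After(s: 10));

            Response<OrderStatusResult> response =
                await client.GetResponse<OrderStatusResult>(new { OrderId = "123" });

            Console.WriteLine(response.Message.StatusText);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }
}
```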

IRequestClient

public interface IRequestClient<TRequest>
    where TRequest : class
{
    RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);

    Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}

A RequestClient can either create a request or get the response directly.
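The two styles can be compared in a short sketch. OrderStatusClient and its method names are hypothetical, the message interfaces are stand-ins, and the client is assumed to have been created elsewhere (for example from a client factory).

```csharp
using System.Threading.Tasks;
using MassTransit;

// Stand-in contracts, assumed for this sketch.
public interface CheckOrderStatus
{
    string OrderId { get; }
}

public interface OrderStatusResult
{
    string OrderId { get; }
    string StatusText { get; }
}

public static class OrderStatusClient
{
    // Style 1: send the request and await the response in one call.
    public static async Task<string> GetDirectly(
        IRequestClient<CheckOrderStatus> client, string orderId)
    {
        Response<OrderStatusResult> response =
            await client.GetResponse<OrderStatusResult>(new { OrderId = orderId });

        return response.Message.StatusText;
    }

    // Style 2: create a request handle first; the request is sent
    // when GetResponse is called on the handle.
    public static async Task<string> GetViaHandle(
        IRequestClient<CheckOrderStatus> client, string orderId)
    {
        using (RequestHandle<CheckOrderStatus> request =
            client.Create(new { OrderId = orderId }))
        {
            Response<OrderStatusResult> response =
                await request.GetResponse<OrderStatusResult>();

            return response.Message.StatusText;
        }
    }
}
```

The handle form is mainly useful when the request needs to be customized before it goes out; otherwise GetResponse on the client is shorter.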

Send a request

var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);

var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id });
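Because the call only looks synchronous, the caller should be ready for a timeout or a fault coming back from the consumer, such as the "Order not found" exception thrown in the consumer above. The following is a hedged sketch: OrderStatusQuery and TryGetStatus are hypothetical names, the message interfaces are stand-ins, and the 10-second timeout is arbitrary.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Stand-in contracts, assumed for this sketch.
public interface CheckOrderStatus
{
    string OrderId { get; }
}

public interface OrderStatusResult
{
    string OrderId { get; }
    string StatusText { get; }
}

public static class OrderStatusQuery
{
    public static async Task<string> TryGetStatus(IBus bus, string orderId)
    {
        var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
        var client = bus.CreateRequestClient<CheckOrderStatus>(
            serviceAddress, RequestTimeout.After(s: 10));

        try
        {
            var response = await client.GetResponse<OrderStatusResult>(
                new { OrderId = orderId });

            return response.Message.StatusText;
        }
        catch (RequestTimeoutException)
        {
            // No response arrived within the timeout.
            return "timed out";
        }
        catch (RequestFaultException ex)
        {
            // The consumer threw while handling the request.
            return $"faulted: {ex.Message}";
        }
    }
}
```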

Creative Commons License

This work is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.

You are welcome to repost, use, and republish this article, provided that you keep the attribution to 郑子铭 (including the link: http://www.cnblogs.com/MingsonZheng/ ), do not use it for commercial purposes, and release any work derived from this article under the same license.

If you have any questions, please contact me (MingsonZheng@outlook.com).

posted @ 2021-01-13 22:52  MingsonZheng