c# - How to Use MassTransit Response - Stack Overflow

project

Version

<PackageReference Include="MassTransit" Version="8.3.4" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.3.1" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" />

Provider

/// Program
builder.Services.AddMassTransit(x =>
{
    //x.AddConsumer<CheckOrderStatusConsumer>();
    // Send the request to the specified address
    x.AddRequestClient<CheckOrderStatus>();

    x.UsingInMemory((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

/// Controller
[Route("api/[controller]")]
[ApiController]
public class RequestController : ControllerBase
{
        IRequestClient<CheckOrderStatus> _client;

        public RequestController(IRequestClient<CheckOrderStatus> client)
        {
            _client = client;
        }

        [HttpGet("{orderId}")]
        public async Task<IActionResult> Get(string orderId, CancellationToken cancellationToken)
        {
            var response = await _client.GetResponse<OrderStatusResult>(new { orderId }, cancellationToken);
            return Ok(response.Message);
        }
}

Consumer

/// Program

builder.Services.AddMassTransit(x =>
{
    x.AddRequestClient<CheckOrderStatus>();

    x.UsingInMemory((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

/// Consumer
public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        await context.RespondAsync<OrderStatusResult>(new
            {
                OrderId = "11111",
            });
    }
}

Objective: I want to exchange objects (a request and its response) between two Web API projects.

//x.AddConsumer<CheckOrderStatusConsumer>(); 

With this line commented out, the request times out. With it uncommented, the request runs normally.

I'm sorry, I don't speak English. I wrote this question with a translator.

I am self-taught, so some of my wording may be unclear and I may be misunderstanding parts of the process. Please forgive me.

Asked Jan 18 at 21:25 by Space_Time_, edited Jan 19 at 13:44 by marc_s.

1 Answer

You have commented out the line that adds CheckOrderStatusConsumer to the MassTransit configuration:

//x.AddConsumer<CheckOrderStatusConsumer>();

Therefore no consumer is registered for CheckOrderStatus messages, nothing answers the request, and the request client times out. Uncomment that line first.
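For reference, here is a minimal sketch of what the corrected registration might look like when the controller and the consumer live in the same project and use the in-memory transport. It assumes CheckOrderStatus, OrderStatusResult and CheckOrderStatusConsumer are defined in that project as shown in the question; the rest of Program.cs is a hypothetical minimal host, not the asker's actual file.

```csharp
// Program.cs - a sketch under the assumptions above, not the asker's exact file
using MassTransit;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();

builder.Services.AddMassTransit(x =>
{
    // Register the consumer so that ConfigureEndpoints creates a receive endpoint for it.
    x.AddConsumer<CheckOrderStatusConsumer>();

    // Register the request client that the controller injects.
    x.AddRequestClient<CheckOrderStatus>();

    x.UsingInMemory((context, cfg) =>
    {
        // Builds endpoints for every consumer registered above.
        cfg.ConfigureEndpoints(context);
    });
});

var app = builder.Build();
app.MapControllers();
app.Run();
```

With the consumer registered, ConfigureEndpoints has something to build an endpoint for, so the request sent by the controller has a destination that replies.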

Once that is done, handle the request in the consumer like this:

public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        await context.RespondAsync<OrderStatusResult>(new
        {
            OrderId = context.Message.OrderId // Respond using the incoming order ID
        });
    }
}

Then you can fetch the response like this:

var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = orderId });

If the default timeout is too short, pass a longer one through the optional timeout parameter:

var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = orderId }, timeout: TimeSpan.FromSeconds(30));
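If no consumer answers within the timeout, GetResponse throws a RequestTimeoutException. Below is a rough sketch of how the controller action could catch it and return an error response instead of an unhandled exception. It assumes the CheckOrderStatus and OrderStatusResult types from the question; the 504 status code and the error text are illustrative choices, not something the question or MassTransit prescribes.

```csharp
using MassTransit;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;

[Route("api/[controller]")]
[ApiController]
public class RequestController : ControllerBase
{
    private readonly IRequestClient<CheckOrderStatus> _client;

    public RequestController(IRequestClient<CheckOrderStatus> client)
    {
        _client = client;
    }

    [HttpGet("{orderId}")]
    public async Task<IActionResult> Get(string orderId, CancellationToken cancellationToken)
    {
        try
        {
            // Send the request and wait up to 30 seconds for a reply.
            var response = await _client.GetResponse<OrderStatusResult>(
                new { OrderId = orderId }, cancellationToken, TimeSpan.FromSeconds(30));

            return Ok(response.Message);
        }
        catch (RequestTimeoutException)
        {
            // Nothing replied in time, for example because no consumer is registered.
            // Returning 504 here is an assumption made for this sketch.
            return StatusCode(StatusCodes.Status504GatewayTimeout,
                "No response was received for the order status request.");
        }
    }
}
```

Catching the exception makes the failure described in the question (a missing consumer) show up as a clear HTTP error rather than a generic server error.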

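Updated answer:

The in-memory transport only delivers messages within a single process, so it cannot carry requests between two separate Web API projects. For that you need a message broker such as RabbitMQ. Create a shared library project that holds the message contracts: the CheckOrderStatus class as the request and the OrderStatusResult class as the response. Both projects reference it.

In the shared library project: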

public class CheckOrderStatus
{
    public string UserId { get; set; }
}

public class OrderStatusResult
{
    public string UserId { get; set; }
    public string UserName { get; set; }
}

In the Requestor project:

builder.Services.AddMassTransit(x =>
{
    
    x.AddRequestClient<CheckOrderStatus>(); // Add a request client for CheckOrderStatus     
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq://localhost"); // RabbitMQ broker
    });
});

In the Requestor project's controller:

[Route("api/[controller]")]
[ApiController]
public class RequestController : ControllerBase
{
    private readonly IRequestClient<CheckOrderStatus> _client;

    public RequestController(IRequestClient<CheckOrderStatus> client)
    {
        _client = client;
    }

    [HttpGet("{userId}")]
    public async Task<IActionResult> GetUser(string userId, CancellationToken cancellationToken)
    {
        var response = await _client.GetResponse<OrderStatusResult>(
            new CheckOrderStatus { UserId = userId }, cancellationToken);

        return Ok(response.Message);
    }
}

In the Responder project:

builder.Services.AddMassTransit(x =>
{
    
    x.AddConsumer<CheckOrderStatusConsumer>(); // Register the consumer
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq://localhost");
        cfg.ConfigureEndpoints(context); // Auto-configure consumer endpoints
    });
});

In the Responder project, add the consumer logic:

public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        // Simulate fetching the user from a database; real business logic would go here
        var user = new OrderStatusResult
        {
            UserId = context.Message.UserId,
            UserName = "John Doe"
        };

        await context.RespondAsync(user);
    }
}