Честно говоря, я не думаю, что имеет смысл принимать и обрабатывать сообщения в одном процессе, поэтому я бы рекомендовал использовать внешнюю систему обмена сообщениями, такую как RabbitMQ или Kafka или любую другую. другая существующая система вашего предпочтения, где вы можете поместить свои сообщения, и другой процесс будет использовать его. Это довольно большая тема, вы можете начать с этого урока
Если вы все еще хотите иметь его в одном процессе, это также возможно, вы можете создать очередь фоновых задач, поместить туда свои сообщения и создать фоновую задачу , которая будет использовать их из этой очереди.
public interface IBackgroundTaskQueue
{
void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
new ConcurrentQueue<Func<CancellationToken, Task>>();
private SemaphoreSlim _signal = new SemaphoreSlim(0);
public void QueueBackgroundWorkItem(
Func<CancellationToken, Task> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
_workItems.Enqueue(workItem);
_signal.Release();
}
public async Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
return workItem;
}
}
Фоновое задание:
public class QueuedHostedService : BackgroundService
{
private readonly ILogger _logger;
public QueuedHostedService(IBackgroundTaskQueue taskQueue,
ILoggerFactory loggerFactory)
{
TaskQueue = taskQueue;
_logger = loggerFactory.CreateLogger<QueuedHostedService>();
}
public IBackgroundTaskQueue TaskQueue { get; }
protected async override Task ExecuteAsync(
CancellationToken cancellationToken)
{
_logger.LogInformation("Queued Hosted Service is starting.");
while (!cancellationToken.IsCancellationRequested)
{
var workItem = await TaskQueue.DequeueAsync(cancellationToken);
try
{
await workItem(cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
$"Error occurred executing {nameof(workItem)}.");
}
}
_logger.LogInformation("Queued Hosted Service is stopping.");
}
}
Регистрация:
public void ConfigureServices(IServiceCollection services)
{
services.AddHostedService<QueuedHostedService>();
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
}
Ввести в контроллер:
public class ApiController
{
private IBackgroundTaskQueue queue;
public ApiController(IBackgroundTaskQueue queue)
{
this.queue = queue;
}
public IActionResult StartProcessing()
{
queue.QueueBackgroundWorkItem(async token =>
{
// put processing code here
}
return Ok();
}
}
Вы можете изменить BackgroundTaskQueue в соответствии с вашими требованиями, но я надеюсь, что вы понимаете идею, лежащую в основе этого.