Элемент очереди процессов net основной API - PullRequest
0 голосов
/ 08 мая 2020

Я использую https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1&tabs=visual-studio в качестве ссылки для создания фоновой службы и очередей обработки.

У меня есть класс ImportService.cs, куда приходит csvfile из HTTP-запроса, затем я хочу добавить в очередь, которая обрабатывает этот CSV-файл и записывает результаты в базу данных. Это мой класс обслуживания, где у меня есть IBackgroundTaskQueue instance

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CsvHelper;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

namespace Services.Services
{
public class ImportService : BaseService, IImportService
{
    private readonly IUploadDataRepository _uploadDataRepository;
    private readonly ConfigurationSettings _configurationSettings;
    public IBackgroundTaskQueue Queue { get; }
    private const string AZURE_BLOB_CONTAINER = "blobcontainer";

    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services)
    {
        _uploadDataRepository = services.GetUploadDataRepository();
        _configurationSettings = services.GetService<ConfigurationSettings>();
        Queue = queue;
    }

    public async Task UploadToBlobStorage(IFormFile file, int userId, Type type)
    {        
        var fileFormat = GetFileFormat(file);
        var tempFilePath = await GetTemporaryPath(file);
        var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
        string storageConnectionString = _configurationSettings.ConnectionStrings.BlobStorageConnection;
        CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
        var blobClient = account.CreateCloudBlobClient();

        // Make sure container is there
        var blobContainer = blobClient.GetContainerReference(AZURE_BLOB_CONTAINER);
        await blobContainer.CreateIfNotExistsAsync();

        // set the permission to blob type
        await blobContainer.SetPermissionsAsync(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });
        CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(fileName);
        using (var fileStream = File.OpenRead(tempFilePath))
        {
          await blockBlob.UploadFromStreamAsync(fileStream);
        }
            // ADD FILE TO QUEUE AND PROCESS IT
        Queue.QueueBackgroundWorkItem(async token =>
        {
            Console.WriteLine("ITEM QUEUED PROCESS IT??");

        });
        await _uploadDataRepository.Add(uploadData);
    }
}

}

Ниже я добавлю классы, созданные из примера Microsoft:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Services.Services.Contracts {
  public interface IBackgroundTaskQueue {
    void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem);
    Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken);
  }
}

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Services.Services {
  /// <summary>
  /// Queued Hosted Service class
  /// </summary>
  public abstract class QueuedHostedService: BackgroundService {
    private readonly ILogger _logger;
    private IBackgroundTaskQueue TaskQueue {
      get;
    }

    protected QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
      TaskQueue = taskQueue;
      _logger = loggerFactory.CreateLogger < QueuedHostedService > ();
      Console.WriteLine("QueuedHostedService initialized");
    }
    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
      _logger.LogInformation("Queued Hosted Service is starting.");

      while (!cancellationToken.IsCancellationRequested) {
        var workItem = await TaskQueue.DequeueAsync(cancellationToken);

        try {
          await workItem(cancellationToken);
        } catch (Exception ex) {
          _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
        }
      }
    }

    private void DoWork(object state) {
      Console.WriteLine("PROCCESS FILEE???");
    }
  }
}

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Services.Services {
  public class BackgroundTaskQueue: IBackgroundTaskQueue {
    private ConcurrentQueue < Func < CancellationToken, Task >> _workItems =
      new ConcurrentQueue < Func < CancellationToken, Task >> ();

    private SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem) {
      if (workItem == null) {
        throw new ArgumentNullException(nameof(workItem));
      }

      _workItems.Enqueue(workItem);
      _signal.Release();
    }

    public async Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken) {
      await _signal.WaitAsync(cancellationToken);
      _workItems.TryDequeue(out
        var workItem);

      return workItem;
    }
  }
}

Мой вопрос: где следует обрабатывать этот файл? В ImportService? Или в QueuedHostedService? Если в QueuedHostedService как мне передать и получить доступ к этому файлу? Что было бы лучше всего для этого? Я хотел создать функцию DoWork() в QueuedHostedService, которая обрабатывает этот файл, но я не уверен, как

...