Я использую https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1&tabs=visual-studio в качестве ссылки для создания фоновой службы и очередей обработки.
У меня есть класс ImportService.cs, куда приходит csvfile из HTTP-запроса, затем я хочу добавить в очередь, которая обрабатывает этот CSV-файл и записывает результаты в базу данных. Это мой класс обслуживания, где у меня есть IBackgroundTaskQueue
instance
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CsvHelper;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Services.Services
{
public class ImportService : BaseService, IImportService
{
private readonly IUploadDataRepository _uploadDataRepository;
private readonly ConfigurationSettings _configurationSettings;
public IBackgroundTaskQueue Queue { get; }
private const string AZURE_BLOB_CONTAINER = "blobcontainer";
public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services)
{
_uploadDataRepository = services.GetUploadDataRepository();
_configurationSettings = services.GetService<ConfigurationSettings>();
Queue = queue;
}
public async Task UploadToBlobStorage(IFormFile file, int userId, Type type)
{
var fileFormat = GetFileFormat(file);
var tempFilePath = await GetTemporaryPath(file);
var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
string storageConnectionString = _configurationSettings.ConnectionStrings.BlobStorageConnection;
CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
var blobClient = account.CreateCloudBlobClient();
// Make sure container is there
var blobContainer = blobClient.GetContainerReference(AZURE_BLOB_CONTAINER);
await blobContainer.CreateIfNotExistsAsync();
// set the permission to blob type
await blobContainer.SetPermissionsAsync(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });
CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(fileName);
using (var fileStream = File.OpenRead(tempFilePath))
{
await blockBlob.UploadFromStreamAsync(fileStream);
}
// ADD FILE TO QUEUE AND PROCESS IT
Queue.QueueBackgroundWorkItem(async token =>
{
Console.WriteLine("ITEM QUEUED PROCESS IT??");
});
await _uploadDataRepository.Add(uploadData);
}
}
}
Ниже я добавлю классы, созданные из примера Microsoft:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Services.Services.Contracts {
public interface IBackgroundTaskQueue {
void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem);
Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken);
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Services.Services {
/// <summary>
/// Queued Hosted Service class
/// </summary>
public abstract class QueuedHostedService: BackgroundService {
private readonly ILogger _logger;
private IBackgroundTaskQueue TaskQueue {
get;
}
protected QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
TaskQueue = taskQueue;
_logger = loggerFactory.CreateLogger < QueuedHostedService > ();
Console.WriteLine("QueuedHostedService initialized");
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
_logger.LogInformation("Queued Hosted Service is starting.");
while (!cancellationToken.IsCancellationRequested) {
var workItem = await TaskQueue.DequeueAsync(cancellationToken);
try {
await workItem(cancellationToken);
} catch (Exception ex) {
_logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}
private void DoWork(object state) {
Console.WriteLine("PROCCESS FILEE???");
}
}
}
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Services.Services {
public class BackgroundTaskQueue: IBackgroundTaskQueue {
private ConcurrentQueue < Func < CancellationToken, Task >> _workItems =
new ConcurrentQueue < Func < CancellationToken, Task >> ();
private SemaphoreSlim _signal = new SemaphoreSlim(0);
public void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem) {
if (workItem == null) {
throw new ArgumentNullException(nameof(workItem));
}
_workItems.Enqueue(workItem);
_signal.Release();
}
public async Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken) {
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out
var workItem);
return workItem;
}
}
}
Мой вопрос: где следует обрабатывать этот файл? В ImportService
? Или в QueuedHostedService
? Если в QueuedHostedService
как мне передать и получить доступ к этому файлу? Что было бы лучше всего для этого? Я хотел создать функцию DoWork()
в QueuedHostedService
, которая обрабатывает этот файл, но я не уверен, как