Преобразуйте дорогой вызов в асинхронный, сохраняя систему событий нетронутой - PullRequest
0 голосов
/ 15 декабря 2018

Я столкнулся с некоторыми проблемами, пытаясь оптимизировать старый код.Общая картина такова: существует «механизм экспорта», который раскручивает некоторый объект «записи» в зависимости от желаемого результата.Запись раскручивает объект DataReader и подписывается на его события, чтобы он мог обрабатывать считываемые данные.Затем он запускает длительный метод «GetData» в читателе.Это извлекает данные из устаревшей базы данных, что занимает много времени (!).Устройство чтения данных обрабатывает возвращенные значения и запускает несколько событий, позволяющих записывающему устройству обрабатывать данные.

Ниже приведен очень упрощенный пример псевдокода DataReader.

class DataReader
{
    // delegates
    internal delegate void DataRowReadHandler(object sender, DataRowReadArgs e);
    internal delegate void DataProgressChangedHandler(object sender, DataProgressChangedArgs e);
    internal delegate void DataReadCompleteHandler(object sender, DataReadCompleteArgs e);
    // events
    internal event DataProgressChangedHandler DataProgressChanged;
    internal event DataReadCompleteHandler DataReadCompleted;
    internal event DataRowReadHandler DataRowRead;

    // this methods chomps on and on and raises an event when the database read returns something
    internal void GetData()
    {
        for (int totalrows = 0; totalrows < _cursor.RowCount; totalrows += _maxrows)
        {
            // I want to keep GetRawData running while the data it fetched is being processed
            string[][] rawdata = _cursor.GetRawData(_maxrows);

            // -- a ton of post-processing I want to do while database is being read--

            // and then report progress
            foreach (row in rawdata)
            {
                DataRowReadArgs args = new DataRowReadArgs(row.Index)
                OnDataRowRead(args); // raise event after each row
            }
            DataProgressChangedArgs args = new DataProgressChangedArgs(batch, counter);
            OnDataProgressChanged(args); // raise event after each batch of rows
        }
        // report we're done
        DataReadCompleteArgs e = new DataReadCompleteArgs(counter);
        OnDataReadCompleted(e); // done with reading data
    }

    protected virtual void OnDataProgressChanged(DataProgressChangedArgs e)
    {
        DataProgressChangedHandler handler = DataProgressChanged;
        if (handler != null)
            handler(this, e);
    }

    protected virtual void OnDataReadCompleted(DataReadCompleteArgs e)
    {
        DataReadCompleteHandler handler = DataReadCompleted;
        if (handler != null)
            handler(this, e);
    }

    protected virtual void OnDataRowRead(DataRowReadArgs e)
    {
        DataRowReadHandler handler = DataRowReadRead;
        if (handler != null)
            handler(this, e);
    }
}

ЧтоЯ хочу: поддерживать чтение базы данных (которое будет самым медленным) и обрабатывать возвращаемые данные всякий раз, когда результат запроса становится доступным.То есть: постобработка данных в считывателе, запуск событий, и обработчики в обработчиках обрабатывают их , пока чтение базы данных продолжается .В идеале я также хочу, чтобы какой-нибудь токен отмены останавливал чтение, когда что-то идет не так, но обо всем по порядку.Я НЕ хочу касаться системы, основанной на событиях, на которую полагаются многие классы, я только хочу, чтобы чтение базы данных выполнялось параллельно и чтобы остальная часть кода отвечала всякий раз, когда есть результат.

Iбаловался с await / async и TaskCompletionSource и еще чем-то еще вот уже неделю, но, похоже, все еще не в состоянии обдумать это.Я подошел близко, мне действительно удалось составить список задач, передать его в промежуточный метод, который будет обрабатывать каждую задачу по мере ее завершения, и ждать этого.

internal async Task GetDataAsync()
{
    IList<Task<string[][]>> tasks = CreateCursorReadTasks();
    var processingTasks = tasks.Select(AwaitAndProcessAsync).ToList();
    await Task.WhenAll(processingTasks);
    // this isn't 'awaited' in the sense I expected
    // also, what order are they performed in? The database is single-threaded, no queues, nothing
    // I need to fire my 'done' event only after all tasks have finished
}

private IList<Task<string[][]>> CreateCursorReadTasks()
{
    IList<Task<string[][]>> retval = new List<Task<string[][]>>();
    for (int totalrows = 0; totalrows < this._cursor.RowCount; totalrows += _maxrows)
    {
        retval.Add(Task.Run(() => _cursor.GetRawData(_maxrows)));
    }
    return retval;
}

internal async Task AwaitAndProcessAsync(Task<string[][]> task)
{
    string[][] rawdata = await task;
    // Do all the post-processing and fire the events like in the GetData method of DataReader
}

Помимо всего этого, кажущегося чрезмерно сложным, я сталкиваюсь с двумя проблемами: а) все мои обработчики событий кажутся пустыми, даже если я подписан на них, и б) я не знаю, где и как повышатьзавершенное событие.

Мой вопрос: когда вы посмотрите на мой метод GetData в классе DataReader, как бы вы предложили мне сделать так, чтобы очень дорогие вызовы базы данных выполнялись асинхронно?

Ответы [ 2 ]

0 голосов
/ 15 декабря 2018

Давайте использовать современные возможности: модель производителя / потребителя через конвейеры с BlockingCollection класс.

Внутри вашего GetData метода запустите две задачи: одну для получения данныхсекунда для обработки данных.

Вы все еще можете использовать свою систему событий.Просто одновременно добавьте данные в коллекцию, что не займет много времени.

Во втором задании данные извлекаются из коллекции и обрабатываются.Ожидание по методу GetConsumingEnumerable выполняется очень эффективно.

class DataReader
{
    public CancellationTokenSource CTS { get; } = new CancellationTokenSource();

    internal void GetData()
    {
        // Use the desired data type instead of string
        var values = new BlockingCollection<string>();

        var readTask = Task.Factory.StartNew(() =>
        {
            try
            {
                // here your code
                for (...)
                {
                    if (CTS.Token.IsCancellationRequested)
                        break;

                    foreach (var row in rawdata)
                    {
                        DataRowReadArgs args = new DataRowReadArgs(row.Index);
                        //...
                        values.Add(args); // put value to blocking collection
                    }
                }
            }
            catch (Exception e) { /* process possible exception */}
            finally { values.CompleteAdding(); }

        }, TaskCreationOptions.LongRunning);

        var processTask = Task.Factory.StartNew(() =>
        {
            foreach (var value in values.GetConsumingEnumerable())
            {
                if (CTS.Token.IsCancellationRequested)
                    break;

                // process value
            }
        }, TaskCreationOptions.LongRunning);

        Task.WaitAll(readTask, processTask);            
    }
}

Вы можете отменить задачи в любое время:

var dataReader = new DataReader();
dataReader.GetData();
dataReader.CTS.Cancel();

Вместо Task.WaitAll вы можете использовать await Task.WhenAll(readTask, processTask);
В этом случае подпись метода должна быть следующей: async Task GetDataAsync()

0 голосов
/ 15 декабря 2018

Ваш псевдокод выглядит нормально, я попытался проверить его с помощью программы, в которой я имитировал вызовы к базе данных с Task.Delay(5000) и разрешал доступ только для одной задачи в любой момент (чтобы учесть тот факт, что база данных одиночная)с резьбой).

class Program
{
    public static async Task Main(string[] args)
    {
        var dataReader = new DataReader();
        dataReader.DataProgressChanged += (s, e) => Log.D($"*** Event - Processed {e.TaskId}");
        dataReader.DataReadCompleted += (s, e) => Log.D("*** Event - Data read complete");

        await dataReader.GetDataAsync();

        Console.ReadKey();
    }
}

public class DataReader
{
    internal delegate void DataProgressChangedHandler(object sender, DataProgressChangedArgs e);
    internal delegate void DataReadCompleteHandler(object sender, DataReadCompleteArgs e);

    internal event DataProgressChangedHandler DataProgressChanged;
    internal event DataReadCompleteHandler DataReadCompleted;

    private SemaphoreSlim semaphore = new SemaphoreSlim(1);

    internal async Task GetDataAsync()
    {
        Log.D("Start");
        var tasks = CreateCursorReadTasks();
        var processingTasks = tasks.Select(AwaitAndProcessAsync).ToList();
        await Task.WhenAll(processingTasks);
        OnDataReadCompleted(new DataReadCompleteArgs());
    }

    private IList<ReadTaskWrapper> CreateCursorReadTasks()
    {
        var retval = new List<ReadTaskWrapper>();
        for (int totalrows = 0; totalrows < 4; totalrows++)
        {
            int taskId = totalrows;
            retval.Add(new ReadTaskWrapper
            {
                Task = Task.Run(async () => { return await SimulateDbReadAsync(taskId); }),
                Id = taskId
            });
        }
        return retval;
    }

    private async Task<string[][]> SimulateDbReadAsync(int taskId)
    {
        await semaphore.WaitAsync();
        Log.D($"Starting data read task {taskId}");
        await Task.Delay(5000);
        Log.D($"Finished data read task {taskId}");
        semaphore.Release();
        return new string[1][];
    }

    internal async Task AwaitAndProcessAsync(ReadTaskWrapper task)
    {
        string[][] rawdata = await task.Task;
        Log.D($"Start postprocessing of task {task.Id}");
        await Task.Delay(3000);
        Log.D($"Finished prostprocessing of task {task.Id}");

        OnDataProgressChanged(new DataProgressChangedArgs { TaskId = task.Id });
    }

    internal void OnDataProgressChanged(DataProgressChangedArgs args)
    {
        DataProgressChanged?.Invoke(this, args);
    }

    internal void OnDataReadCompleted(DataReadCompleteArgs args)
    {
        DataReadCompleted?.Invoke(this, args);
    }

    internal class DataProgressChangedArgs : EventArgs
    {
        public int TaskId { get; set; }
    }

    internal class DataReadCompleteArgs : EventArgs
    {
    }
}

public class ReadTaskWrapper
{
    public int Id { get; set; }
    public Task<string[][]> Task { get; set; }
}

public static class Log
{
    public static void D(string msg)
    {
        Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {msg}");
    }
}

Вывод предполагает, что он работает правильно.В примере запускаются 4 задачи, и доступ к базе данных занимает 5 секунд каждый раз, а постобработка - 3 секунды (чтобы сделать это очевидным).Общее время выполнения составляет около 23 секунд (4 * 5 + 3), это означает, что постобработка выполняется параллельно чтению из базы данных.События происходят также, как и ожидалось.Порядок, в котором выполняются задачи, отличается при каждом запуске программы.См. Следующий вывод программы:

15:54:20: Start
15:54:20: Starting data read task 2
15:54:25: Finished data read task 2
15:54:25: Starting data read task 0
15:54:25: Start postprocessing of task 2
15:54:28: Finished prostprocessing of task 2
15:54:28: *** Event - Processed 2
15:54:30: Finished data read task 0
15:54:30: Starting data read task 3
15:54:30: Start postprocessing of task 0
15:54:33: Finished prostprocessing of task 0
15:54:33: *** Event - Processed 0
15:54:35: Finished data read task 3
15:54:35: Start postprocessing of task 3
15:54:35: Starting data read task 1
15:54:38: Finished prostprocessing of task 3
15:54:38: *** Event - Processed 3
15:54:40: Finished data read task 1
15:54:40: Start postprocessing of task 1
15:54:43: Finished prostprocessing of task 1
15:54:43: *** Event - Processed 1
15:54:43: *** Event - Data read complete

Для дальнейшего изучения: где вы создаете экземпляр класса DataReader в своей программе и как вы подписываетесь на события?Можете ли вы описать более подробно, что вы подразумеваете под комментарием «этого не« ожидают »в том смысле, в котором я ожидал»?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...