Пакетная обработка всех элементов в ConcurrentBag - PullRequest
0 голосов
/ 03 июня 2018

У меня есть следующий вариант использования.Несколько потоков создают точки данных, которые собираются в ConcurrentBag.Каждые x ms отдельный потребительский поток просматривает точки данных, поступившие с момента последнего времени, и обрабатывает их (например, подсчитывает их + вычисляет среднее).

Следующий код более или менее представляет решение, которое я нашелс:

private static ConcurrentBag<long> _bag = new ConcurrentBag<long>();

static void Main()
{
    Task.Run(() => Consume());
    var producerTasks = Enumerable.Range(0, 8).Select(i => Task.Run(() => Produce()));
    Task.WaitAll(producerTasks.ToArray());
}

private static void Produce()
{
    for (int i = 0; i < 100000000; i++)
    {
        _bag.Add(i);
    }
}

private static void Consume()
{
    while (true)
    {
        var oldBag = _bag;
        _bag = new ConcurrentBag<long>();
        var average = oldBag.DefaultIfEmpty().Average();
        var count = oldBag.Count;
        Console.WriteLine($"Avg = {average}, Count = {count}");
        // Wait x ms
    }
}
  • Является ли ConcurrentBag подходящим инструментом для работы здесь?
  • Является ли переключение пакетов правильным способом для очистки списка для новых точек данных и последующей обработки старых?
  • Безопасно ли работать со старым пакетом или у меня могут возникнуть проблемы, когда яперебирать oldBag и поток все еще добавляет элемент?
  • Должен ли я использовать Interlocked.Exchange () для переключения переменных?

РЕДАКТИРОВАТЬ

Полагаю, приведенный выше код не очень хорошо представлял то, чего я пытаюсь достичь.Итак, вот еще немного кода, чтобы показать проблему:

public class LogCollectorTarget : TargetWithLayout, ILogCollector
{
    private readonly List<string> _logMessageBuffer;

    public LogCollectorTarget()
    {
        _logMessageBuffer = new List<string>();
    }

    protected override void Write(LogEventInfo logEvent)
    {
        var logMessage = Layout.Render(logEvent);
        lock (_logMessageBuffer)
        {
            _logMessageBuffer.Add(logMessage);
        }
    }

    public string GetBuffer()
    {
        lock (_logMessageBuffer)
        {
            var messages =  string.Join(Environment.NewLine, _logMessageBuffer);
            _logMessageBuffer.Clear();
            return messages;
        }
    }
}

Цель класса - собирать журналы, чтобы их можно было отправлять на сервер партиями.Каждые x секунд вызывается GetBuffer.Это должно получить текущие сообщения журнала и очистить буфер для новых сообщений.Он работает с блокировками, но, поскольку они довольно дорогие, я не хочу блокировать каждую операцию регистрации в моей программе.Вот почему я хотел использовать ConcurrentBag в качестве буфера.Но тогда мне все еще нужно переключить или очистить его, когда я вызываю GetBuffer без потери каких-либо сообщений журнала, которые происходят во время переключения.

Ответы [ 2 ]

0 голосов
/ 03 июня 2018

Поскольку у вас есть один потребитель, вы можете работать с простым ConcurrentQueue без обмена коллекциями:

public class LogCollectorTarget : TargetWithLayout, ILogCollector
{
    private readonly ConcurrentQueue<string> _logMessageBuffer;

    public LogCollectorTarget()
    {
        _logMessageBuffer = new ConcurrentQueue<string>();
    }

    protected override void Write(LogEventInfo logEvent)
    {
        var logMessage = Layout.Render(logEvent);
        _logMessageBuffer.Enqueue(logMessage);
    }

    public string GetBuffer()
    {
        // How many messages should we dequeue?
        var count = _logMessageBuffer.Count;

        var messages = new StringBuilder();

        while (count > 0 && _logMessageBuffer.TryDequeue(out var message))
        {   
            messages.AppendLine(message);   
            count--;
        }       

        return messages.ToString();
    }
}

Если выделения памяти становятся проблемой, вы можете вместо этого удалить их из очереди до фиксированного размерамассив и вызов string.Join на нем.Таким образом, вы гарантированно сделаете только два выделения (тогда как StringBuilder может сделать гораздо больше, если исходный буфер не имеет правильного размера):

public string GetBuffer()
{
    // How many messages should we dequeue?
    var count = _logMessageBuffer.Count;
    var buffer = new string[count];

    for (int i = 0; i < count; i++)
    {   
        _logMessageBuffer.TryDequeue(out var message);
        buffer[i] = message;   
    }       

    return string.Join(Environment.NewLine, buffer);
}
0 голосов
/ 03 июня 2018

Является ли ConcurrentBag подходящим инструментом для работы здесь?

Это правильный инструмент для работы, это действительно зависит от того, что вы пытаетесь сделать, и почему.Пример, который вы привели, очень упрощен без контекста, поэтому трудно сказать.

Является ли переключение пакетов правильным способом для очистки списка для новых точек данных и последующей обработки старых?

Ответ - нет, возможно, по многим причинам.Что произойдет, если поток записывает в него, пока вы его переключаете?

Безопасно ли работать со oldBag или у меня могут возникнуть проблемы, когда я перебираю oldBag, а поток все еще добавляет элемент?

Нет, вы только что скопировали ссылку, это ничего не даст.

Должен ли я использовать Interlocked.Exchange () для переключения переменных?

Методы блокировки - замечательные вещи, однако это не поможет вам в вашей текущей проблеме, они предназначены для поточно-безопасного доступа к значениям целочисленных типов.Вы действительно сбиты с толку, и вам нужно поискать больше примеров, ориентированных на многопоточность.


Однако, давайте направим вас в правильном направлении.забудьте о ConcurrentBag и тех модных классах.Мой совет: начните с простого и используйте блокировку, чтобы вы понимали природу проблемы.

Если вы хотите, чтобы несколько задач / потоков имели доступ к списку, вы можете легко использовать оператор lock и защитить доступ к списку./ массив, поэтому другие неприятные потоки не изменяют его.

Очевидно, что написанный вами код является бессмысленным примером, я имею в виду, что вы просто добавляете последовательные числа в список и получаете другой поток для их усреднения.Вряд ли это вообще должен быть потребительский производитель, и имело бы больше смысла просто быть синхронным.

На этом этапе я хотел бы указать вам на лучшие архитектуры, которые позволили бы вам реализовать этот шаблон, например, Tpl Dataflow, но я боюсь, что это всего лишь учебный акциз, и, к сожалению, вам действительно нужно больше читать по многопоточности ипопробуйте больше примеров, прежде чем мы действительно сможем помочь вам с проблемой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...