Передача данных из одного потока в другой (постоянно) - PullRequest
0 голосов
/ 07 апреля 2020

У меня есть два постоянно работающих потока:

  1. Источник - на каждой итерации он генерирует значения и помещает их в массив типа int. Массив передается в поток потребителя. Источник больше не будет использовать сгенерированные значения.

  2. Потребитель - на каждой итерации создается строка из полученного массива и выводится результат на консоль.

Существует порядок сгенерированных значений массива. Он должен быть одинаковым в обеих темах.


public class Producer
{
    private int[] m_outputData;

    private Random m_numberGenerator = null;

    private Thread m_runner = null;

    private Consumer m_consumer = null;

    public Producer()
    {
        m_outputData = new int[10];

        m_numberGenerator = new Random();

        m_runner = new Thread(new ThreadStart(Run));
        m_runner.IsBackground = true;

        m_consumer = new Consumer();
    }

    public void Start()
    {
        m_consumer.Start();

        m_runner.Start();
    }

    private void Run()
    {
        while (true)
        {
            for (int i = 0; i < m_outputData.Length; i++)
            {
                m_outputData[i] = m_numberGenerator.Next(1, 100);
            }

            m_consumer.SetData(m_outputData);

            Thread.Sleep(100);
        }
    }
}

public class Consumer
{
    private int[] m_inputData;

    private Thread m_runner = null;

    private readonly object m_dataLock = new object();

    public Consumer()
    {
        m_inputData = new int[10];

        m_runner = new Thread(new ThreadStart(Run));
        m_runner.IsBackground = true;
    }
    public void Start()
    {
        m_runner.Start();
    }

    public void SetData(int[] arr)
    {
        lock (m_dataLock)
        {
            for (int i = 0; i < m_inputData.Length; i++)
            {
                m_inputData[i] = arr[i];
            }
        }
    }

    private void Run()
    {
        while (true)
        {
            string data = "";

            lock (m_dataLock)
            {
                for (int i = 0; i < m_inputData.Length; i++)
                {
                    data += m_inputData[i].ToString();
                    data += ",";
                }
            }

            Console.WriteLine(data);

            Thread.Sleep(100);
        }
    }
}

В настоящее время я использую простой l oop для копирования значений по одному. Этот l oop защищен замком. Я обратился к методу Array.Copy, однако он выглядит более дорогим при использовании небольших массивов (вызов функции).

Какой самый оптимальный способ передачи массива (с точки зрения производительности) между моими потоками?

1 Ответ

0 голосов
/ 07 апреля 2020

ОБНОВЛЕНИЕ: Использование ConcurrentQueue для поддержки нескольких потребителей

Это очень простой пример производителя / потребителя, использующий Task.Run (), а не потоки.

Здесь очень важно отметить, что реализации производителя и потребителя полностью отделены друг от друга. и использует ConcurrentQueue для обмена информацией между производителем и потребителями.

private static void Producer(ConcurrentQueue<int[]> eventQueue, CancellationToken cancellationToken)
{
    Random m_numberGenerator = new Random();

    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        int[] m_outputData = new int[10];

        for (int i = 0; i < m_outputData.Length; i++)
        {
            m_outputData[i] = m_numberGenerator.Next(1, 100);
        }

        eventQueue.Enqueue(m_outputData);

        Thread.Sleep(m_numberGenerator.Next(100, 201));
    }
}

private static void Consumer(ConcurrentQueue<int[]> eventQueue, CancellationToken cancellationToken)
{
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        int[] m_inputData;
        if (eventQueue.TryDequeue(out m_inputData))
        {
            Console.WriteLine(string.Join(" ", m_inputData));
        }
        else
        {
            Console.WriteLine("No new data is available.");
        }
        Thread.Sleep(100);
    }
}

static void Main(string[] args)
{
    ConcurrentQueue<int[]> eventQueue = new ConcurrentQueue<int[]>();

    CancellationTokenSource consumerTokenSource = new CancellationTokenSource();
    CancellationToken consumerCancellationToken = consumerTokenSource.Token;

    Task.Run(() => Consumer(eventQueue, consumerCancellationToken), consumerCancellationToken);

    CancellationTokenSource producerTokenSource = new CancellationTokenSource();
    CancellationToken producerCancellationToken = consumerTokenSource.Token;

    Task.Run(() => Producer(eventQueue, producerCancellationToken), producerCancellationToken);

    Console.WriteLine("Press any ket to exit.");

    Console.ReadLine();
}

Вывод:

No new data is available.
8 57 70 28 78 58 40 79 39 87
61 5 12 75 21 56 77 15 96 29
No new data is available.
33 89 26 11 51 72 28 41 78 84
No new data is available.
92 74 80 54 78 24 78 68 46 16
64 78 84 88 40 60 46 1 41 81
19 25 80 2 68 7 68 51 98 9
No new data is available.
67 13 38 95 66 78 54 36 29 82
No new data is available.
53 25 48 26 63 32 68 94 78 43
89 56 55 73 76 55 71 1 95 18
20 12 71 51 45 93 68 46 5 52
No new data is available.
45 72 56 54 42 44 36 83 5 31
54 24 82 40 92 2 98 64 29 87
81 6 37 54 13 87 74 26 32 39
No new data is available.
68 11 83 18 22 14 28 64 13 14
18 58 48 69 5 2 67 48 40 24
No new data is available.
89 91 51 18 87 32 48 9 78 98
92 55 71 57 75 79 85 37 64 74
No new data is available.
40 87 79 7 43 82 73 59 84 98
35 21 92 67 5 27 96 16 4 35
55 65 93 97 48 26 87 62 4 59
No new data is available.
47 51 63 39 81 88 66 36 20 15
No new data is available.
55 21 50 65 9 75 59 41 48 71
98 33 14 14 46 35 47 72 22 95
No new data is available.
75 74 42 4 66 45 8 50 67 15
89 10 72 48 82 17 12 4 62 12
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...