Долгосрочная задача, не имеющая доступа к консоли? - PullRequest
0 голосов
/ 18 января 2019

Я пишу продюсер-> Очередь -> Потребитель -> Очередь2 -> Приложение Потребитель2

У меня потребитель2 ждет, пока список достигнет порогового значения, а затем запускает другую задачу, имитирующую долгосрочную задачу (например, SQL Multi = insert).

Однако, когда я запускаю приложение, «долго выполняющаяся задача» (LongRunningTaskInsert ()), кажется, ждет, пока все очереди не дадут сигнал о завершении, прежде чем писать в консоль.

Когда я отлаживаю, переменная List Lists показывает, что некоторые задачи завершаются в середине приложения. Я делаю что-то неправильно / непослушное с заданиями?

Код:

class Program
{
    static  void Main(string[] args)
    {
        BlockingCollection<string> bag1 = new BlockingCollection<string>();
        BlockingCollection<string> bag2 = new BlockingCollection<string>();
        var Tasks = new List<Task>();
        List<string> Container = new List<string>();

        Task consumer2 = Task.Factory.StartNew(() =>
        {
            foreach (var item in bag2.GetConsumingEnumerable())
            {
                Container.Add(item);
                if (bag2.IsCompleted || Container.Count > 5)
                {
                    Console.WriteLine("Container:");
                    Container.ForEach(y =>
                    {
                        Console.Write($"Value: {y}, ");
                    });
                    Console.Write("\n");

                    var newTask = Task.Factory.StartNew(() => {
                        Thread.Sleep(2000);
                        LongRunningTaskInsert();

                    }
                    );
                    Tasks.Add(newTask);

                    Container.Clear();
                }
            }

            Task.WhenAll(Tasks);
        });

        //this is a task that evaluates all available elements on separate threads.
        Task consumer1 = Task.Factory.StartNew(() =>
        {
            //do something with the consumer
            Parallel.ForEach(
                bag1.GetConsumingEnumerable(),
                (x) =>
                {
                    Console.WriteLine($"Consumer {x} => bag2, thread {Thread.CurrentThread.ManagedThreadId}");
                    bag2.Add(x);
                });
            bag2.CompleteAdding();

        });

        Task producer = Task.Factory.StartNew(() =>
        {
            //do something to put records into the bad
            for (int i = 0; i < 10; i++)
            {


                System.Threading.Thread.Sleep(500);

                bag1.Add(i.ToString());
                bag1.Add((i * 10).ToString());
                bag1.Add((i + 10).ToString());
                Console.WriteLine($"Producer: {i} & { i * 10} & {i + 10}");
            }

            bag1.CompleteAdding();
        });

        producer.Wait();
        consumer1.Wait();
        consumer2.Wait();
        Console.Read();
    }

    private static bool LongRunningTaskInsert()
    {
        //Thread.Sleep(1000);
        Console.WriteLine("Long Running Task Complete");
        return true;
    }
}

редактирование: Вывод, который я получаю:

Производитель: 0 & 0 & 10
Потребитель 0 => bag2, нить 4
Потребитель 0 => bag2, нить 6
Потребитель 10 => bag2, нить 5
Производитель: 1 & 10 & 11
Потребитель 10 => bag2, нить 8
Потребитель 11 => bag2, нить 10
Потребитель 1 => bag2, нить 9
Контейнер:
Значение: 0, Значение: 0, Значение: 10, Значение: 10, Значение: 11, Значение: 1,
Производитель: 2 & 20 & 12
Потребитель 20 => bag2, нить 4
Потребитель 2 => мешок2, нить 6
Потребитель 12 => bag2, нить 5
Производитель: 3 & 30 & 13
Потребитель 3 => bag2, нить 10
Потребитель 30 => bag2, нить 9
Потребитель 13 => bag2, нить 8
Контейнер:
Значение: 20, Значение: 2, Значение: 12, Значение: 3, Значение: 30, Значение: 13,
Производитель: 4 & 40 & 14
Потребитель 4 => bag2, нить 4
Потребитель 40 => bag2, нить 6
Потребитель 14 => bag2, нить 5
Производитель: 5 & 50 & 15
Потребитель 5 => bag2, нить 10
Потребитель 15 => bag2, нить 8
Потребитель 50 => bag2, нить 9
Контейнер:
Значение: 4, Значение: 40, Значение: 14, Значение: 5, Значение: 15, Значение: 50,
Производитель: 6 & 60 & 16
Потребитель 6 => bag2, нить 6
Потребитель 60 => bag2, нить 6
Производитель: 7 & 70 & 17
Потребитель 16 => bag2, нить 4
Потребитель 70 => bag2, нить 5
Потребитель 17 => bag2, нить 5
Потребитель 7 => bag2, нить 4
Контейнер:
Значение: 6, Значение: 60, Значение: 16, Значение: 70, Значение: 17, Значение: 7,
Производитель: 8 & 80 & 18
Потребитель 8 => bag2, нить 6
Потребитель 80 => bag2, нить 6
Производитель: 9 & 90 & 19
Потребитель 90 => bag2, нить 4
Потребитель 19 => bag2, нить 4
Потребитель 18 => bag2, нить 8
Потребитель 9 => bag2, нить 8
Контейнер:
Значение: 8, значение: 80, значение: 90, значение: 19, значение: 18, значение: 9,
Длительное задание выполнено
Длительное задание выполнено
Длительное задание выполнено
Длительное задание выполнено
Длительное задание выполнено

Я ожидаю, что «Длительная задача выполнена» будет смешана, а не все кластеризованы в конце.

1 Ответ

0 голосов
/ 14 февраля 2019

Оператор Parallel.Foreach порождает кучу потоков, и мой метод LongRunningTaskInsert () не получает никакого времени. Если я изменяю это на синхронный цикл foreach, мое количество потоков уменьшается с 8 до 4, и яполучить ожидаемые результаты (где смешиваются длительные вызовы консоли задач).

...