Как отменить GetConsumingEnumerable () для BlockingCollection - PullRequest
8 голосов
/ 02 февраля 2012

В следующем коде я использую CancellationToken, чтобы разбудить GetConsumingEnumerable (), когда производитель не производит, и я хочу выйти из foreach и выйти из Задачи.Но я не вижу, чтобы IsCancellationRequested регистрировался, и мой Task.Wait (timeOut) ждет полного периода timeOut.Что я делаю не так?

userToken.Task = Task.Factory.StartNew(state =>
{
    userToken.CancelToken = new CancellationTokenSource();

    foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
    {
        if (userToken.CancelToken.IsCancellationRequested)
        {
            Log.Write("BroadcastQueue IsCancellationRequested");
            break;
            ...
        }
    }

    return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);

позже ...

UserToken.CancelToken.Cancel();          
try
{
    task.Wait(timeOut);
}
catch (AggregateException ar)
{
    Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
    Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}

Ответы [ 2 ]

10 голосов
/ 27 сентября 2012

Вы можете использовать CompleteAdding (), который означает, что больше элементов не будет добавлено в коллекцию.Если используется GetConsumingEnumerable, то foreach будет корректно завершаться, так как он будет знать, что нет смысла ждать больше элементов.

В основном, как только вы закончите добавлять элементы в BlockingCollection, просто сделайте: myBlockingCollection.CompleteAdding ()

Любые потоки, которые выполняют цикл foreach с GetConsumingEnumerable, прекратят цикл.

5 голосов
/ 04 февраля 2012

Я создал быстрый прототип, и мне кажется, что он работает.

Примечание Thread.Sleep (1000) прямо перед запросом отмены токена. Возможно, вы создаете условие гонки для переменной Token, поскольку вы создаете и получаете доступ к переменной item.CancelToken в разных потоках.

например. код, предназначенный для отмены задачи, может вызывать отмену при неправильном (предыдущем или нулевом) токене отмены.

static void Main(string[] args)
{
    CancellationTokenSource token = null;
    BlockingCollection<string> coll = new BlockingCollection<string>();
    var t = Task.Factory.StartNew(state =>
    {
        token = new CancellationTokenSource();
        try
        {
            foreach (var broadcast in coll.GetConsumingEnumerable(token.Token))
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Cancel");
            return;
        }
    }, "TaskSubscribe", TaskCreationOptions.LongRunning);
    Thread.Sleep(1000);
    token.Cancel();
    t.Wait();
}
...