Является ли результат itertools.tee () поточно-безопасным (Python) - PullRequest
11 голосов
/ 15 июля 2011

Предположим, у меня есть этот код Python:

from itertools import count, tee
original = count()     # just an example, can be another iterable
a, b = tee(original)

Вопрос в том, возникнут ли какие-либо проблемы, если я начну выполнять итерацию «a» в одном потоке и одновременно выполнять итерацию «b» в другом потоке? Ясно, что a и b совместно используют некоторые данные (исходные итерируемые, + некоторые дополнительные вещи, внутренние буферы или что-то). Итак, a.next () и b.next () будут выполнять соответствующую блокировку при доступе к этим общим данным?

Ответы [ 3 ]

10 голосов
/ 19 июня 2017

tl; dr

В CPython itertools.tee является потокобезопасным тогда и только тогда, когда оригинальный итератор реализован на C / C ++, т.е. не использует python.

Если оригинальный итератор it был написан на python, как экземпляр класса или генератор, то itertools.tee(it) является не поточно-ориентированным.В лучшем случае вы получите только исключение (что и будет), а в худшем случае произойдет сбой питона.

Вместо использования tee, здесь есть класс-обертка и функция, которые являютсяsafe:

class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.teeobj)
    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)

def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))

Теперь я расширю (много) информацию о том, когда tee не является поточно-ориентированным, и почему.

Пример, где все нормально

Давайте запустим некоторый код (это код на Python 3, для Python 2 используйте itertools.izip вместо zip, чтобы иметь такое же поведение):

>>> from itertools import tee, count
>>> from threading import Thread

>>> def limited_sum(it):
...     s = 0
...     for elem, _ in zip(it, range(1000000)):
...         s += elem
...     print(elem)

>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999

itertools.count полностью написан на C ++ вфайл Modules/itertoolsmodule.c проекта CPython, поэтому он работает просто отлично.

То же самое верно для: списков, кортежей, наборов, диапазона, словарей (ключей, значений и элементов), collections.defaultdict (ключей, значения и предметы) и некоторые другие.

Пример, где это не работает - Генераторы

Очень короткий пример использования генератора:

>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
ValueError: generator already executing

Да, tee написано на C ++, и это правда, что этот GIL выполняет по одному байтовому коду за раз.Но приведенный выше пример показывает, что этого недостаточно для обеспечения безопасности потоков.Где-то вдоль линии это то, что произошло:

  1. Два потока вызвали next на своих экземплярах tee_object столько раз,
  2. Поток 1 вызывает next(a),
  3. Требуется получить новый элемент, поэтому поток 1 теперь вызывает next(gen),
  4. gen написан на python.Например, первый байт-код gen.__next__ CPython решает переключить потоки,
  5. Поток 2 возобновляет и вызывает next(b),
  6. Требуется получить новый элемент, поэтому он вызываетnext(gen)
  7. Поскольку gen.__next__ уже запущен в потоке 1, мы получаем исключение.

Пример, где это не работает - объект итератора

Хорошо, может быть, это просто не потокобезопасно использовать генераторы внутри teeЗатем мы запускаем вариант вышеупомянутого кода, который использует объект итератора:

>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...     def __iter__(self):
...         return self
...     def __next__(self):
...         self.i -= 1
...         if self.i < 0:
...             raise StopIteration
...         return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

Вышеупомянутый код вылетает в python 2.7.13 и 3.6 (и, вероятно, во всех версиях cpython), в Ubuntu, Windows 7 и OSX,Я пока не хочу раскрывать причину, еще один шаг до этого.

Что если я использую блокировки внутри моего итератора?

Может быть, приведенный выше код вылетает, потому что сам наш итератор не был потоком-безопасный.Давайте добавим блокировку и посмотрим, что произойдет:

>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...         self.lock = Lock()
...     def __iter__(self):
...         return self
...     def __next__(self):
...         with self.lock:
...             self.i -= 1
...             if self.i < 0:
...                 raise StopIteration
...             return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

Добавление блокировки внутри нашего итератора недостаточно, чтобы сделать tee поточно-безопасным.

Почему ти не является поточно-безопасным

Суть дела заключается в getitem методе teedataobject в файле Modules/itertoolsmodule.c CPython.Реализация tee действительно классная, с оптимизацией, которая экономит вызовы ОЗУ: tee возвращает «объекты-тройники», каждый из которых сохраняет ссылку на заголовок teedataobject.Они, в свою очередь, похожи на ссылки в связанном списке, но вместо того, чтобы содержать один элемент - они содержат 57. Это не очень важно для наших целей, но это то, что есть.Вот getitem функция teedataobject:

static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;

    assert(i < LINKCELLS);
    if (i < tdo->numread)
        value = tdo->values[i];
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}

Когда запрашивается элемент, teedataobject проверяет, подготовлен ли он.Если это так, то возвращает.Если это не так, он вызывает next на исходном итераторе.Это где, если итератор написан на python, код может зависнуть.Итак, вот проблема:

  1. Два потока вызвали next одинаковое количество раз,
  2. Поток 1 вызывает next(a), и код C попадает в PyIter_Nextзвоните выше.Например, на первом байт-коде next(gen) CPython решает переключить потоки.
  3. Поток 2 вызывает next(b), и, поскольку ему все еще нужен новый элемент, код C попадает в PyIter_NextЗвоните,

В этот момент оба потока находятся в одном месте с одинаковыми значениями для i и tdo->numread.Обратите внимание, что tdo->numread - это просто переменная, которая отслеживает, где в ссылке из 57 ячеек teedataobject должно записывать следующее.

Поток 2 завершает свой вызов PyIter_Next и возвращает элемент.В какой-то момент CPython решает снова переключить потоки,

Поток 1 возобновляет работу, завершает свой вызов PyIter_Next, а затем запускает две строки:

    tdo->numread++;
    tdo->values[i] = value;

Но поток 2 уже установил tdo->values[i]!

Этого уже достаточно, чтобы показать, что tee не является потокобезопасным, так как мы теряем значение, введенное потоком 2tdo->values[i].Но это не объясняет сбой.

Скажем, i было 56. Поскольку оба потока вызывают tdo->numread++, теперь он достигает 58 - выше 57, выделенный размер tdo->values.После продолжения потока 1 объект tdo не имеет больше ссылок и готов к удалению.Это функция очистки для teedataobject:

static int
teedataobject_clear(teedataobject *tdo)
{
    int i;
    PyObject *tmp;

    Py_CLEAR(tdo->it);
    for (i=0 ; i<tdo->numread ; i++)
        Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
    tmp = tdo->nextlink;
    tdo->nextlink = NULL;
    teedataobject_safe_decref(tmp);
    return 0;
}

В строке, отмеченной «ПРОБЛЕМА», CPython попытается очистить tdo->values[57].Вот где происходит сбой.Ну, иногда.Существует более одного места для сбоя, я просто хотел показать одно.

Теперь вы знаете - itertools.tee не является поточно-ориентированным.

Одно решение - внешняя блокировка

Вместо блокировки внутри нашего итератора __next__, мы можем поставить блокировку вокруг tee.__next__.Это означает, что весь метод teedataobject.__getitem__ будет вызываться одним потоком каждый раз.Я дал краткую реализацию в начале этого ответа.Это вставная замена для tee, которая является поточно-ориентированной.Единственное, что он не реализует, что делает tee - это травление.Так как замки не могут быть взломаны, добавить это не тривиально.Но, конечно, это можно сделать.

2 голосов
/ 15 июля 2011

Если эквивалентный код указан в документации, здесь:

правильно, то нет, это не будет потокобезопасным.

Обратите внимание, что, хотя deque задокументировано, что оно поддерживает потоковое добавление и удаление, оно не дает никаких гарантий для кода, который его использует.

Поскольку основной код может в конечном итоге запрашивать базовый итератор для элементов в нескольких потоках, вам необходимо иметь потокобезопасную коллекцию и итератор в качестве входных данных, чтобы обеспечить безопасность.

0 голосов
/ 15 июля 2011

В C-Python itertools.tee() и итератор, который он возвращает, реализованы с использованием C-кода. Это означает, что GIL должен защищать его от одновременного вызова несколькими потоками. Вероятно, он будет работать правильно, и это не приведет к сбою интерпретатора, , но не гарантируется, что будет потокобезопасным.

Проще говоря, не рискуйте.

...