Как обернуть генератор фильтром? - PullRequest
0 голосов
/ 26 апреля 2020

У меня есть серия подключенных генераторов, и я хочу создать фильтр, который можно использовать для обертывания одного из генераторов. Эта оболочка фильтра должна принимать генератор и функцию в качестве параметров. Если элемент данных во входящем потоке не соответствует требованиям фильтра, он должен быть передан следующему генератору без прохождения через обернутый генератор. Здесь я привел рабочий пример, который должен прояснить, к чему я стремлюсь:

import functools

is_less_than_three = lambda x : True if x < 3 else False

def add_one(numbers):
    print("new generator created")
    for number in numbers:
        yield number + 1

def wrapper(generator1, filter_):
    @functools.wraps(generator1)
    def wrapped(generator2):
        for data in generator2:
            if filter_(data):
                yield from generator1([data])
            else:
                yield data
    return wrapped

add_one_to_numbers_less_than_three = wrapper(add_one, is_less_than_three)
answers = add_one_to_numbers_less_than_three(range(6))
for answer in answers:
    print(answer)

#new generator created
#1
#new generator created
#2
#new generator created
#3
#3
#4
#5

Проблема в том, что для этого требуется создание нового генератора для каждого элемента данных. Должен быть лучший способ? Я также попытался использовать itertools.tee и разбить генератор, но это вызывает проблемы с памятью, когда генераторы выдают значения с разной скоростью (они это делают). Как я могу выполнить sh, что делает приведенный выше код без воссоздания генераторов и без проблем с памятью?

отредактировано для добавления справочной информации ниже

В качестве ввода I получит большие видеопотоки. Видеопотоки могут заканчиваться или не заканчиваться (это может быть веб-камера). Пользователи могут выбирать, какие этапы обработки изображения будут выполняться для видеокадров, поэтому порядок и количество функций будут меняться. Впоследствии функции должны иметь возможность принимать выходы друг друга в качестве входных данных.

Я достиг этого, используя серию генераторов. Отношение ввода: вывода для генераторов / функций является переменным - оно может быть 1: n, 1: 1 или n: 1 (например, для извлечения нескольких объектов (подизображений) из изображения, которое будет обрабатываться отдельно).

В настоящее время эти генераторы принимают несколько параметров, которые повторяются среди них (не DRY), и я пытаюсь уменьшить количество параметров путем рефакторинга их в отдельные генераторы или оболочки. Одним из наиболее сложных из них является фильтр потока данных, чтобы определить, следует ли применять функцию к кадру (функция может быть ресурсоемкой и не нужна для всех кадров).

Число параметров делает использование функции более сложным для понимания пользователем. Это также усложняет мне задачу, поскольку всякий раз, когда я хочу внести изменение в один из общих параметров, мне приходится редактировать его для всех функций.

edit2 переименовывает функцию в генератор в примере кода чтобы сделать его более понятным

edit3 решение Спасибо @Blckknght. Это можно решить путем создания бесконечного итератора, который передает значение локальной переменной в генератор. Я немного изменил свой пример, изменив add_one на генератор 1: n вместо генератора 1: 1, чтобы показать, как это решение может работать и для генераторов 1: n.

import functools

is_less_than_three = lambda x : True if x < 3 else False

def add_one(numbers):
    print("new generator created")
    for number in numbers:
        if number == 0:
            yield number - 1
            yield number
        else:
            yield number

def wrapper(generator1, filter_):
    @functools.wraps(generator1)
    def wrapped(generator2):
        local_variable_passer = generator1(iter(lambda: data, object()))
        for data in generator2:
            if filter_(data):
                next_data = next(local_variable_passer)
                if data == 0:
                    yield next_data
                    next_data = next(local_variable_passer)
                    yield next_data
                else:
                    yield next_data
            else:
                yield data
    return wrapped

add_one_to_numbers_less_than_three = wrapper(add_one, is_less_than_three)
answers = add_one_to_numbers_less_than_three(range(6))
for answer in answers:
    print(answer)

#new generator created
#-1
#0
#1
#2
#3
#3
#4
#5

Ответы [ 2 ]

1 голос
/ 28 апреля 2020

Насколько я понимаю, у вас есть поток видеокадров, и вы пытаетесь создать конвейер функций обработки, которые изменяют поток. Различные функции обработки могут изменять количество кадров, поэтому один входной кадр может привести к нескольким выходным кадрам, или несколько входных кадров могут быть использованы до создания одного выходного кадра. Некоторые функции могут быть 1: 1, но это не то, на что вы можете рассчитывать.

Ваша текущая реализация использует функции генератора для всей обработки. Выходная функция выполняет итерации по цепочке, и каждый шаг обработки в конвейере запрашивает кадры из предыдущего, используя итерацию.

Функция, которую вы пытаетесь написать прямо сейчас, является своего рода избирательным обходом. Вы хотите, чтобы некоторые кадры (удовлетворяющие некоторому условию) передавались какой-то уже существующей функции генератора, а другие кадры пропускали обработку и просто go прямо в вывод. К сожалению, это, вероятно, невозможно сделать с генераторами Python. Протокол итерации просто недостаточно сложен для его поддержки.

Во-первых, можно сделать это за 1: 1 с генераторами, но вы не можете легко обобщить до n: 1 или 1: n случаев. Вот как это может выглядеть для 1: 1:

def selective_processing_1to1(processing_func, condition, input_iterable):
    processing_iterator = processing_func(iter(lambda: input_value, object()))
    for input_value in input_iterator:
        if condition(input_value):
            yield next(processing_iterator)
        else:
            yield input_value

На этапе создания processing_iterator проделана большая работа. Используя форму с двумя аргументами iter с функцией lambda и сторожевым объектом (который никогда не будет получен), я создаю бесконечный итератор, который всегда возвращает текущее значение локальной переменной input_value. Затем я передаю этот итератор в функцию processing_func. Я могу выборочно вызвать next для объекта генератора, если я хочу применить обработку, которую фильтр представляет к текущему значению, или я могу просто получить значение самостоятельно, не обрабатывая его.

Но поскольку это работает только на один кадр за раз, это не подходит для фильтров n: 1 или 1: n (и я даже не хочу думать о m: n видах сценариев ios).

«Выглядимый» итератор, который позволяет вам видеть, каким будет следующее значение, перед тем, как вы его итерируете, может позволить вам поддерживать ограниченную форму выборочной фильтрации для n: 1 процессов (то есть где возможно переменная n входные кадры go в один выходной кадр). Ограничение состоит в том, что вы можете выполнять выборочную фильтрацию только для первого из n кадров, которые будут использованы обработкой, остальные будут приняты без того, чтобы у вас была возможность проверить их в первую очередь. Может быть, этого достаточно?

Во всяком случае, вот как это выглядит:

_sentinel = object()
class PeekableIterator:
    def __init__(self, input_iterable):
        self.iterator = iter(input_iterable)
        self.next_value = next(self.iterator, _sentinel)

    def __iter__(self):
        return self

    def __next__(self):
        if self.next_value != _sentinel:
            return_value = self.next_value
            self.next_value = next(self.iterator, _sentinel)
            return return_value
        raise StopIteration

    def peek(self):                 # this is not part of the iteration protocol!
        if self.next_value != _sentinel:
            return self.next_value
        raise ValueError("input exhausted")

def selective_processing_Nto1(processing_func, condition, input_iterable):
    peekable = PeekableIterator(input_iterable)
    processing_iter = processing_func(peekable)
    while True:
        try:
            value = peekable.peek()
            print(value, condition(value))
        except ValueError:
            return
        try:
            yield next(processing_iter) if condition(value) else next(peekable)
        except StopIteration:
            return

Это так же хорошо, как мы можем практически сделать, когда функция обработки является генератором. Если бы мы хотели сделать больше, например, поддерживать обработку 1: n, нам понадобился бы какой-то способ узнать, насколько большим будет n, чтобы мы могли получить столько значений, прежде чем решить, будем ли мы передавать следующее входное значение или нет. Хотя вы можете написать собственный класс для обработки, который бы сообщал об этом, это, вероятно, менее удобно, чем простой повторный вызов функции обработки, как в вопросе.

1 голос
/ 26 апреля 2020

Архитектура является условной картой - поэтому каждый элемент должен отображаться индивидуально. Это означает, что function должен получать один номер, а не много номеров.


Пока существует соединение 1: 1 без сохранения состояния, используйте функция вместо генератора.

def add_one(number):  # takes one number
    return number + 1  # provides one number

def conditional_map(function, condition):
    @functools.wraps(function)
    def wrapped(generator):
        return (
            function(item) if condition(item)
            else item for item in generator
        )
    return wrapped

for answer in conditional_map(add_one, lambda x: x < 3)(range(6)):
    print(answer)

Если данные должны быть переданы в «генератор» с состоянием, это сопрограмма и должна быть разработана как таковая. Это означает, что yield используется как для получения , так и для предоставления данных.

from itertools import count

def add_increment(start=0):
    # initially receive data
    number = yield
    for increment in count(start):
        # provide and receive data
        number = yield number + increment

Поскольку это все еще соединение 1: 1, его можно использовать с предыдущим conditional_map .

mapper = add_increment()
next(mapper)  # prime the coroutine - this could be done with a decorator

for answer in conditional_map(mapper.send, lambda x: x < 3)(range(6)):
    print(answer)

Если необходимо соединение 1: n, ожидать получения генератора для каждого входа.

def add_some(number):  # takes one number
    yield number - 1
    yield number
    yield number + 1

def conditional_map(function, condition):
    @functools.wraps(function)
    def wrapped(generator):
        for data in generator:
            if filter_(data):
                yield from function(data)  # passes one *one* item
            else:
                yield data
    return wrapped

Если с состоянием 1: n требуется соединение, можно использовать сопрограмму, которая создает генератор / итерацию.

def add_increments(start=0):
    # initially receive data
    number = yield
    for increment in count(start):
        # provide and receive data
        number = yield (number + increment + i for i in (-1, 0, 1))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...