Цепные вложенные урожаи - PullRequest
1 голос
/ 01 мая 2019

Я хочу построить конвейер данных, который выполняет серию операций со строками ряда данных.

Большинство функций будут работать по принципу «один ряд - один ряд», но некоторые из этих операций будут «расширять» ряды - я имею в виду, что одна строка войдет в функцию и болеечем одна строка может быть сгенерирована в результате этой функции.

Я хочу настроить цепочку функций, которые достаточно надежны, чтобы самим справляться с этим поведением без необходимости писать кучу кода надзора.

Использование yield происходило как предоставление возможности - если каждая функция потребляла доход от предыдущей функции и действовала как генератор, тогда я могу произвольно связать воедино кучу этих правильно сформированных функций, которыебыло бы неплохо с точки зрения элегантности.

Вот мой установочный код: func_x действует как простая функция 1-1, а func_y выполняет расширение.

from collections import OrderedDict
data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
            OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
            OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
            OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
            OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
            OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
            OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
            OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]
def row_getter(source):
    for content in source:
        yield content.copy()

def func_x(row):
    try:
        q=next(row)
        if q['name']=="Tom":
            q['name']="Richard"
        yield q.copy()
    except StopIteration:
        print ("Stop x")


def func_y(row):
    try:
        q=next(row)
        for thingy in range(0,2):
            q['thingy']=thingy
            yield q.copy()
    except StopIteration:
        print ("Stop y")

rg = row_getter(data_source)
iter_func = func_y(func_x(rg))

Теперь я могу получить первый набор данныхитерируя по объекту iter_func:

print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 0)])

И снова:

print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 1)])

И снова, но на этот раз вместо просмотра записи для Стива (т.е. следующей записи в потокеТеперь расширение func_y для первой записи завершено) Я получаю ошибку StopIteration.

print (next(iter_func))
>> StopIteration                             Traceback (most recent call last)
<ipython-input-15-0fd1ed48c61b> in <module>()
----> 1 print (next(iter_func))

StopIteration: 

Так что я не понимаю, откуда это происходит, так как я пытался перехватить этив обоих func_x и func_y.

Ответы [ 2 ]

1 голос
/ 01 мая 2019

Встроенные инструменты (в частности, map и itertools.chain) могут сделать это за вас.

from collections import OrderedDict
from itertools import chain


data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
            OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
            OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
            OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
            OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
            OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
            OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
            OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]


def rename(d):
    if d['name'] == "Tom":
        d['name'] = "Richard"
    return d


def add_thingy(d):
    for y in range(2):
        yield {'thingy': y, **d}

for x in chain.from_iterable(add_thingy(d) 
                             for d in map(rename,
                                          data_source)):
    print(x)

map на самом деле не нужно;мы можем применить rename к каждому dict, прежде чем передать его в add_thingy в выражении генератора.

for x in chain.from_iterable(add_thingy(rename(d)) for d in data_source):
    print(x)

или пойти другим путем и дважды использовать map:

for x in chain.from_iterable(map(add_thingy, map(rename, data_source))):
    print(x)
1 голос
/ 01 мая 2019

Ваша функция func_x возвращает только один элемент, поэтому она завершается после того, как этот предмет будет израсходован. Попробуйте что-то вроде этого:

def func_x(row):
    try:
        for q in row:
            if q['name']=="Tom":
                q['name']="Richard"
            yield q
    except StopIteration:
        print ("Stop x")

Кстати, обратите внимание, что каждый урожай дает , а не делает копию объекта. Это может быть хорошо во многих случаях, но обратите внимание, что в func_y вы получаете один и тот же объект дважды, устанавливая для 'thingy' другое значение. Это означает, что если, например, вы делаете (после кода, который вы опубликовали):

d1 = next(iter_func)
d2 = next(iter_func)

d1 и d2 будут одним и тем же объектом, и, в частности, оба будут иметь 'thingy', установленный на 1.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...