Я хочу построить конвейер данных, который выполняет серию операций со строками ряда данных.
Большинство функций будут работать по принципу «один ряд - один ряд», но некоторые из этих операций будут «расширять» ряды - я имею в виду, что одна строка войдет в функцию и болеечем одна строка может быть сгенерирована в результате этой функции.
Я хочу настроить цепочку функций, которые достаточно надежны, чтобы самим справляться с этим поведением без необходимости писать кучу кода надзора.
Использование yield
происходило как предоставление возможности - если каждая функция потребляла доход от предыдущей функции и действовала как генератор, тогда я могу произвольно связать воедино кучу этих правильно сформированных функций, которыебыло бы неплохо с точки зрения элегантности.
Вот мой установочный код: func_x
действует как простая функция 1-1, а func_y
выполняет расширение.
from collections import OrderedDict
data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]
def row_getter(source):
for content in source:
yield content.copy()
def func_x(row):
try:
q=next(row)
if q['name']=="Tom":
q['name']="Richard"
yield q.copy()
except StopIteration:
print ("Stop x")
def func_y(row):
try:
q=next(row)
for thingy in range(0,2):
q['thingy']=thingy
yield q.copy()
except StopIteration:
print ("Stop y")
rg = row_getter(data_source)
iter_func = func_y(func_x(rg))
Теперь я могу получить первый набор данныхитерируя по объекту iter_func:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 0)])
И снова:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 1)])
И снова, но на этот раз вместо просмотра записи для Стива (т.е. следующей записи в потокеТеперь расширение func_y для первой записи завершено) Я получаю ошибку StopIteration
.
print (next(iter_func))
>> StopIteration Traceback (most recent call last)
<ipython-input-15-0fd1ed48c61b> in <module>()
----> 1 print (next(iter_func))
StopIteration:
Так что я не понимаю, откуда это происходит, так как я пытался перехватить этив обоих func_x
и func_y
.