Если прямое заполнение вашей попытки выполняется только в конце каждого столбца, я бы предложил использовать объединитель, чтобы найти последнее значение в каждом столбце, которое было заполнено, на основе временной отметки строки.
ALL_MY_COLUMNS = ['foo', 'bar', ...]
class FindLastValue(core.CombineFn):
def create_accumulator(self, *args, **kwargs):
# first dict stores timestamps for columns while second dict stores last value seen
return ({}, {})
def add_input(self, mutable_accumulator, element, *args, **kwargs):
for column in ALL_MY_COLUMNS:
# if the column is populated and we haven't captured the value before or the timestamp of the element is greater then the value we have seen in the past then we will record this as the last known value.
if element[column] is not None and (mutable_accumulator[0][column] is None or mutable_accumulator[0][column] < element['timestamp']):
mutable_accumulator[0][column] = element['timestamp']
mutable_accumulator[1][column] = element[column]
def merge_accumulators(self, accumulators, *args, **kwargs):
# merge the accumulators based upon which has the smallest timestamp per column
merged = ({}, {})
for accum in accumulators:
if element[column] is not None:
if merged[0][column] is None or merged[0][column] > accum[0][column]:
merged[0][column] = accum[0][column]
merged[1][column] = accum[1][column]
return merged
def extract_output(self, accumulator, *args, **kwargs):
# return a dict of column to last known value
return accumulator[1]
def update_to_last_value(value, side_input):
for column in ALL_MY_COLUMNS:
if value[column] is None:
if side_input[column] is None:
# What do you want to do if the column is empty for all values?
else:
value[column] = side_input[column]
p = ... create pipeline ...
data = 'Read' >> p | beam.io.Read(beam.io.BigQuerySource(table_path))
side_input = 'Last Value' | CombineGlobally(sum).as_singleton_view()
# take the data that you computed as the 'last' value for each column and provide it to a function which updates any columns that are unset.
output = 'Output' >> data | Map(lambda main, s: update_to_last_value(main, side_input), side_input)
... any additional transforms that you want.
Вышеупомянутый конвейер будет хорошо распараллеливаться, потому что вы будете вычислять последнее значение параллельно (это мощность сумматора). После этого вы сможете обновлять все записи параллельно, так как было вычислено последнее значение.
Обратите внимание, что это не решит произвольные разреженные секции в столбцах. Эти показания происходят с регулярной частотой, так что вы можете гарантировать, что в каждой строке Y будет определенное значение?