обработка колонок в apache beam? в основном прямая заливка - PullRequest
0 голосов
/ 20 мая 2019

Я пытаюсь интерполировать (вперед-заполнить) значения таблицы. input: таблица BigQuery с n + 1 столбцами, где n - набор показаний, а +1 - столбец Time (время, когда было выполнено чтение). Большинство из этих столбцов пусты. вывод: таблица BigQuery с теми же n + 1 столбцами, что пустые значения заменяются последними известными значениями. (пустые значения в начале времени игнорируются).

Это эквивалентно пандам df.fillna (method = 'pad').

Я хотел бы запустить эту проблему на огромных таблицах, используя службу потока данных googles через apache-beam.

Кажется, Beam хорош в обработке строк, но я не могу найти способ обработки столбцов. Очевидно, что когда у меня есть столбец, я могу легко перебирать его и интерполировать значения по ходу работы.

Хотя я не уверен, как работает память в потоке данных. Мы должны убедиться, что он может обрабатывать необходимый объем данных.

beam.io.Read(beam.io.BigQuerySource(table_path))

При чтении таблицы из большого запроса вы получаете коллекцию строк как мне получить столбец? Даже запрос возвращает то же самое ....

Ответы [ 2 ]

1 голос
/ 21 мая 2019

Если прямое заполнение вашей попытки выполняется только в конце каждого столбца, я бы предложил использовать объединитель, чтобы найти последнее значение в каждом столбце, которое было заполнено, на основе временной отметки строки.

ALL_MY_COLUMNS = ['foo', 'bar', ...]


class FindLastValue(core.CombineFn):
  def create_accumulator(self, *args, **kwargs):
    # first dict stores timestamps for columns while second dict stores last value seen
    return ({}, {})

  def add_input(self, mutable_accumulator, element, *args, **kwargs):
    for column in ALL_MY_COLUMNS:
      # if the column is populated and we haven't captured the value before or the timestamp of the element is greater then the value we have seen in the past then we will record this as the last known value. 
      if element[column] is not None and (mutable_accumulator[0][column] is None or mutable_accumulator[0][column] < element['timestamp']):
            mutable_accumulator[0][column] = element['timestamp']
            mutable_accumulator[1][column] = element[column]

  def merge_accumulators(self, accumulators, *args, **kwargs):
    # merge the accumulators based upon which has the smallest timestamp per column
    merged = ({}, {})
    for accum in accumulators:
      if element[column] is not None:
         if merged[0][column] is None or merged[0][column] > accum[0][column]:
            merged[0][column] = accum[0][column]
            merged[1][column] = accum[1][column]
    return merged

  def extract_output(self, accumulator, *args, **kwargs):
    # return a dict of column to last known value
    return accumulator[1]


def update_to_last_value(value, side_input):
  for column in ALL_MY_COLUMNS:
    if value[column] is None:
      if side_input[column] is None:
        # What do you want to do if the column is empty for all values?
      else:
        value[column] = side_input[column]


p = ... create pipeline ...
data = 'Read' >> p | beam.io.Read(beam.io.BigQuerySource(table_path))
side_input = 'Last Value' | CombineGlobally(sum).as_singleton_view()
# take the data that you computed as the 'last' value for each column and provide it to a function which updates any columns that are unset.
output = 'Output' >> data | Map(lambda main, s: update_to_last_value(main, side_input), side_input)
... any additional transforms that you want.

Вышеупомянутый конвейер будет хорошо распараллеливаться, потому что вы будете вычислять последнее значение параллельно (это мощность сумматора). После этого вы сможете обновлять все записи параллельно, так как было вычислено последнее значение.

Обратите внимание, что это не решит произвольные разреженные секции в столбцах. Эти показания происходят с регулярной частотой, так что вы можете гарантировать, что в каждой строке Y будет определенное значение?

0 голосов
/ 20 мая 2019

Боюсь, если вы используете луч, вам придется написать свой собственный DoFn, чтобы справиться с этим.Что-то вроде (псевдокод):

DoFn(input_element):
  for all the field_to_fill repeat:
    input_element.field_to_fill = NEW_VALUE;
  emit input_element

И применить это ко всему набору данных (то есть, от beam.io.read ()).

Мой ответ ограничен лучом,Там может быть функция в BigQuery может сделать доступ к столбцу легко.

...