Apache Beam TextIO.read, а затем объединить в партии - PullRequest
0 голосов
/ 29 мая 2018

После использования TextIO.read для получения PCollection<String> отдельных строк, возможно ли затем использовать какое-либо комбинированное преобразование в пакеты (например, группы по 25)?Таким образом, возвращаемый тип будет выглядеть примерно так: PCollection<String, List<String>>.Похоже, что это возможно при использовании какого-то CombineFn, но API все еще немного загадочен для меня.

В данном контексте я читаю файлы CSV (потенциально очень очень большие), анализирую+ обработка строк и превращение их в JSON, а затем вызов REST API ... но я не хочу использовать REST API для каждой строки в отдельности, поскольку REST API поддерживает несколько элементов одновременно (до 1000, поэтомуне вся партия).

1 Ответ

0 голосов
/ 30 мая 2018

Полагаю, вы можете выполнить несколько простых операций, как показано ниже (используя stateful API ).Состояние , которое вы хотите сохранить в BatchingFn, является текущим буфером строк или self._lines.Извините, я сделал это в python (не знаком с Java API)

from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo

MY_BATCH_SIZE = 512

class BatchingFn(DoFn):
  def __init__(self, batch_size=100):
    self._batch_size = batch_size

  def start_bundle(self):
    # buffer for string of lines
    self._lines = []

  def process(self, element):
    # Input element is a string (representing a CSV line)
    self._lines.append(element)
    if len(_lines) >= self._batch_size:
      self._flush_batch()

  def finish_bundle(self):
    # takes care of the unflushed buffer before finishing
    if self._lines:
      self._flush_batch()

  def _flush_batch(self):
    #### Do your REST API call here with self._lines
    # .....
    # Clear the buffer.
    self._lines = []

# pcoll is your PCollection of lines.
(pcoll | 'Call Rest API with batch data' >> ParDo(BatchingFn(MY_BATCH_SIZE)))

Что касается использования Управляемых данными триггеров , вы можете сослаться на Batch PCollection в Beam / Dataflow.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...