Создание точек сохранения и запуск с определенного узла в Apache Beam - PullRequest
0 голосов
/ 25 июня 2018

Создание точек сохранения

В настоящее время точки сохранения создаются путем сохранения конвейера в файл csv после каждого преобразования.Настройка в целом выглядит следующим образом:

savepoint pipeline

Я делаю это, имея объект-преобразователь-обертку, для которого я могу предоставить функцию преобразования и дополнительныйпараметр точки сохранения.Объект преобразования может создавать точки сохранения, если они предоставлены:

class SavePointNode(beam.PTransform):
    def __init__(self, transform, savepoint=None, **kwargs):
        super(beam.PTransform, self).__init__(**kwargs)
        self.transform = transform
        self.savepoint = savepoint

    def expand(self, p):

        # transform data
        result = p | beam.ParDo(self.transform)

        # load to savepoint if available
        if self.savepoint:
            return result | beam.io.WriteToText(self.savepoint)

        return result

Мой первый вопрос заключается в следующем:

  • Существует ли уже используемый по умолчанию способ создания точек сохранения в Apache Beam?

Загрузка точек сохранения

Создавая точки сохранения, я хочу иметь возможность перезапустить конвейер на определенном узле, чтобы сэкономить время, не возвращаясь к каждому шагу до точки сохранения.Например: если на третьем шаге преобразования произошла ошибка:

error in third transform step

Я хочу иметь возможность перезапустить конвейер с точки сохранения, созданной вторымшаг преобразования:

restart at second savepoint

Мои вопросы по этому поводу следующие:

  • Существует ли в настоящее время способ сделать это по умолчаниюв Apache Beam?
  • Если нет других хороших способов добиться такого поведения?
  • Я поступил неправильно и могли бы быть какие-то другие решения, чтобы сократить время перезагрузки, если произошла ошибкапредоставлено Apache Beam?
...