Создание точек сохранения
В настоящее время точки сохранения создаются путем сохранения конвейера в файл csv после каждого преобразования.Настройка в целом выглядит следующим образом:
Я делаю это, имея объект-преобразователь-обертку, для которого я могу предоставить функцию преобразования и дополнительныйпараметр точки сохранения.Объект преобразования может создавать точки сохранения, если они предоставлены:
class SavePointNode(beam.PTransform):
def __init__(self, transform, savepoint=None, **kwargs):
super(beam.PTransform, self).__init__(**kwargs)
self.transform = transform
self.savepoint = savepoint
def expand(self, p):
# transform data
result = p | beam.ParDo(self.transform)
# load to savepoint if available
if self.savepoint:
return result | beam.io.WriteToText(self.savepoint)
return result
Мой первый вопрос заключается в следующем:
- Существует ли уже используемый по умолчанию способ создания точек сохранения в Apache Beam?
Загрузка точек сохранения
Создавая точки сохранения, я хочу иметь возможность перезапустить конвейер на определенном узле, чтобы сэкономить время, не возвращаясь к каждому шагу до точки сохранения.Например: если на третьем шаге преобразования произошла ошибка:
Я хочу иметь возможность перезапустить конвейер с точки сохранения, созданной вторымшаг преобразования:
Мои вопросы по этому поводу следующие:
- Существует ли в настоящее время способ сделать это по умолчаниюв Apache Beam?
- Если нет других хороших способов добиться такого поведения?
- Я поступил неправильно и могли бы быть какие-то другие решения, чтобы сократить время перезагрузки, если произошла ошибкапредоставлено Apache Beam?