Beam - функции для запуска только один раз в начале и в конце трубопровода Beam. - PullRequest
1 голос
/ 09 июля 2020

У меня есть конвейер Beam, который запрашивает BigQuery, а затем загружает результаты в BigTable. Я хотел бы масштабировать свой экземпляр BigTable (с 1 до 10 узлов) перед запуском конвейера, а затем снова масштабировать (с 10 до 1 узла) после загрузки результатов в BigTable. Есть ли какой-либо механизм для этого с помощью Beam?

По сути, я бы хотел иметь два отдельных преобразования: одно в начале конвейера и одно в конце, которые увеличивают и уменьшают узлы соответственно. Или используйте DoFn, который запускает только setup() и teardown() для одного рабочего.

Я попытался использовать setup() и teardown() из функций жизненного цикла DoFn. Но эти функции выполняются один раз для каждого рабочего (а я использую сотни рабочих), поэтому он будет пытаться увеличивать и уменьшать BigTable несколько раз (и достигать дневных квот записи экземпляра и кластера). Так что это не работает с моим вариантом использования. В любом случае вот фрагмент BigTableWriteFn, с которым я экспериментировал:

class _BigTableWriteFn(beam.DoFn):

    def __init__(self, project_id, instance_id, table_id, cluster_id, node_count):
        beam.DoFn.__init__(self)
        self.beam_options = {
            'project_id': project_id,
            'instance_id': instance_id,
            'table_id': table_id,
            'cluster_id': cluster_id,
            'node_count': node_count
        }
        self.table = None
        self.initial_node_count = None
        self.batcher = None
        self.written = Metrics.counter(self.__class__, 'Written Row')

    def setup(self):
        client = Client(project=self.beam_options['project_id'].get(), admin=True)
        instance = client.instance(self.beam_options['instance_id'].get())
        node_count = self.beam_options['node_count'].get()
        cluster = instance.cluster(self.beam_options['cluster_id'].get())
        self.initial_node_count = cluster.serve_nodes
        if node_count != self.initial_node_count:  # I realize this logic is flawed since the cluster.serve_nodes will change after the first setup() call, but I first thought setup() and teardown() was run once for the whole transform...
            cluster.serve_nodes = node_count
            cluster.update()

    ## other life cycle methods in between but aren't important to the question

    def teardown(self):
        client = Client(project=self.beam_options['project_id'].get(), admin=True)
        instance = client.instance(self.beam_options['instance_id'].get())
        cluster = instance.cluster(self.beam_options['cluster_id'].get())
        if cluster.serve_nodes != self.initial_node_count: # I realize this logic is flawed since the cluster.serve_nodes will change after the first setup() call, but I first thought setup() and teardown() was run once for the whole transform...
            cluster.serve_nodes = self.initial_node_count
            cluster.update()

Я также использую параметры RuntimeValueProvider для идентификаторов bigtable (project_id, instance_id, cluster_id, et c), поэтому Я чувствую, что какой бы тип преобразования я ни делал для масштабирования, мне нужно использовать DoFn.

Любая помощь будет принята с благодарностью!

Ответы [ 2 ]

1 голос
/ 09 июля 2020

Итак, я придумал хакерский подход, но он работает.

Во время setup() моего WriteFn я получаю количество кластеров .serve_nodes (это, очевидно, изменится после того, как первый рабочий вызовет setup() ) и масштабируйте кластер, если это не желаемое количество. И в функции process() я получаю это количество. Затем я делаю beam.CombineGlobally и нахожу Smallest(1) этих подсчетов. Затем я передаю это другому DoFn, который масштабирует кластер до этого минимального количества.

Вот несколько фрагментов кода того, что я делаю.

class _BigTableWriteFn(beam.DoFn):
    """ Creates the connector can call and add_row to the batcher using each
    row in beam pipe line
    """
    def __init__(self, project_id, instance_id, table_id, cluster_id, node_count):
        """ Constructor of the Write connector of Bigtable
        Args:
        project_id(str): GCP Project of to write the Rows
        instance_id(str): GCP Instance to write the Rows
        table_id(str): GCP Table to write the `DirectRows`
        cluster_id(str): GCP Cluster to write the scale
        node_count(int): Number of nodes to scale to before writing
        """
        beam.DoFn.__init__(self)
        self.beam_options = {
            'project_id': project_id,
            'instance_id': instance_id,
            'table_id': table_id,
            'cluster_id': cluster_id,
            'node_count': node_count
        }
        self.table = None
        self.current_node_count = None
        self.batcher = None
        self.written = Metrics.counter(self.__class__, 'Written Row')

    def __getstate__(self):
        return self.beam_options

    def __setstate__(self, options):
        self.beam_options = options
        self.table = None
        self.current_node_count = None
        self.batcher = None
        self.written = Metrics.counter(self.__class__, 'Written Row')

    def setup(self):
        client = Client(project=self.beam_options['project_id'].get(), admin=True)
        instance = client.instance(self.beam_options['instance_id'].get())
        cluster = instance.cluster(self.beam_options['cluster_id'].get())
        cluster.reload()
        desired_node_count = self.beam_options['node_count'].get()
        self.current_node_count = cluster.serve_nodes
        if desired_node_count != self.current_node_count:
            cluster.serve_nodes = desired_node_count
            cluster.update()

    def start_bundle(self):
        if self.table is None:
            client = Client(project=self.beam_options['project_id'].get())
            instance = client.instance(self.beam_options['instance_id'].get())
            self.table = instance.table(self.beam_options['table_id'].get())

        self.batcher = self.table.mutations_batcher()

    def process(self, row):
        self.written.inc()
        # You need to set the timestamp in the cells in this row object,
        # when we do a retry we will mutating the same object, but, with this
        # we are going to set our cell with new values.
        # Example:
        # direct_row.set_cell('cf1',
        #                     'field1',
        #                     'value1',
        #                     timestamp=datetime.datetime.now())
        self.batcher.mutate(row)
        # return the initial node count so we can find the minimum value and scale down BigTable latter
        if self.current_node_count:
            yield self.current_node_count

    def finish_bundle(self):
        self.batcher.flush()
        self.batcher = None


class _BigTableScaleNodes(beam.DoFn):

    def __init__(self, project_id, instance_id, cluster_id):
        """ Constructor of the Scale connector of Bigtable
        Args:
        project_id(str): GCP Project of to write the Rows
        instance_id(str): GCP Instance to write the Rows
        cluster_id(str): GCP Cluster to write the scale
        """
        beam.DoFn.__init__(self)
        self.beam_options = {
            'project_id': project_id,
            'instance_id': instance_id,
            'cluster_id': cluster_id,
        }
        self.cluster = None

    def setup(self):
        if self.cluster is None:
            client = Client(project=self.beam_options['project_id'].get(), admin=True)
            instance = client.instance(self.beam_options['instance_id'].get())
            self.cluster = instance.cluster(self.beam_options['cluster_id'].get())


    def process(self, min_node_counts):
        if len(min_node_counts) > 0 and self.cluster.serve_nodes != min_node_counts[0]:
            self.cluster.serve_nodes = min_node_counts[0]
            self.cluster.update()

def run():
    custom_options = PipelineOptions().view_as(CustomOptions)
    
    pipeline_options = PipelineOptions()

    p = beam.Pipeline(options=pipeline_options)
    (p
    | 'Query BigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
    | 'Map Query Results to BigTable Rows' >> beam.Map(to_direct_rows)
    | 'Write BigTable Rows' >> beam.ParDo(_BigTableWriteFn(
        custom_options.bigtable_project_id, 
        custom_options.bigtable_instance_id, 
        custom_options.bigtable_table_id,
        custom_options.bigtable_cluster_id,
        custom_options.bigtable_node_count))
    | 'Find Global Min Node Count' >> beam.CombineGlobally(beam.combiners.Smallest(1))
    | 'Scale Down BigTable' >> beam.ParDo(_BigTableScaleNodes(
        custom_options.bigtable_project_id, 
        custom_options.bigtable_instance_id, 
        custom_options.bigtable_cluster_id))
    )

    result = p.run()
    result.wait_until_finish()

1 голос
/ 09 июля 2020

Если вы запускаете задание потока данных не как шаблон, а как jar в виртуальной машине или модуле, то вы можете сделать это до и после запуска конвейера, выполнив команды bash из java. Обратитесь к этому - { ссылка }

Команда для выполнения -

gcloud bigtable clusters update CLUSTER_ID --instance=INSTANCE_ID --num-nodes=NUM_NODES

Но если вы работаете как шаблон, тогда файл шаблона не будет учитывать ничего, кроме что между началом и концом конвейера

...