Непонятная ошибка во время Apache Beam `process_outputs` на работнике Google Dataflow - PullRequest
0 голосов
/ 21 ноября 2018

Я запускаю следующий успешный Тестовый конвейер Apache Beam в Google Dataflow.Он использует Datastore в качестве источника и приемника.Многие объекты в нашей базе данных назначены пространствам имен.Этот конвейер предназначен для выполнения _do_work() на всех объектах определенного вида в заданных пространствах имен.Обратите внимание, что аналогичный тестовый конвейер, который делает то же самое с сущностями без пространства имен, также успешно выполняется:

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper

from .pipelines.dataflow_settings import (
    PROJECT, NAMESPACES_PER_PIPELINE
)

class NamespacedDatastoreMigration(_DatastoreMigrationBase):
    """
    Map a do-function over a query multiplexed across several namespaces.

    The inheritor must implement the following:
     - a PROJECT class attribute
     - a do-function (_do_work())
     - a method to get the namespaces across which to shard the query (
       get_namespaces())
    """
    _NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE  # 25

    def __init__(self, argv, migration_history_obj, model):
        super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
        self._namespaces = self.get_namespaces()
        self._pipelines = self._create_pipelines(argv)

    def get_namespaces(self):
        query_pb = query_pb2.Query()
        helper.set_kind(query_pb, "__namespace__")
        client = apache_helper.get_datastore(PROJECT)
        namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)

        namespaces = []
        for n in namespace_entities:
            # Get namespace name or id
            key_path = n.key.path[-1]
            if key_path.HasField('id'):
                name_or_id = key_path.id
            else:
                name_or_id = key_path.name

            # Avoid duplicates and test namespaces
            if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
                namespaces.append(name_or_id)

        return namespaces

    def run(self):
        for pipeline in self._pipelines:
            pipeline.run()

    def _create_pipelines(self, argv):
        pipelines = []
        for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
            p = beam.Pipeline(argv=argv)
            (
                (
                    p | 'ReadNamespace_{}'.format(
                        ns
                    ) >> datastoreio.ReadFromDatastore(
                        project=self.PROJECT,
                        query=self.query(),
                        namespace=ns
                    )
                    for ns in namespaces
                )
                | 'JoinNamespaceEntities' >> beam.Flatten()
                | self.__class__.__name__ >> beam.FlatMap(self._do_work)
                | self._get_sink()
            )
            pipelines.append(p)

        return pipelines

model = "App"
NamespacedDatastoreMigration(
    argv,
    kwargs.get('migration_history_obj'),  # Irrelevant here
    model  # Entity kind
).run()

, где argv:

argv = [
    '--project={0}'.format(PROJECT),
    '--job_name=' + name,  # A human readable descriptor that's been cleaned
    '--staging_location=gs://{0}/migrations/'.format(BUCKET),
    '--temp_location=gs://{0}/migrations/'.format(BUCKET),
    '--setup_file=./setup.py',
    '--runner=DataflowRunner'
]

Это основано на подклассе:

class _DatastoreMigrationBase(object):
    PROJECT = PROJECT

    def __init__(self, argv, migration_history_obj, model):
        self.migration_history_obj = migration_history_obj

        if not model:
            raise Exception('This operation requires a model class name.')
        self.model = model

    def query(self):
        # Instantiate a filter protobuf
        filter_pb = query_pb2.Filter()

        # Get all non-deleted model instances
        helper.set_property_filter(
            filter_pb,
            'deleted',
            query_pb2.PropertyFilter.EQUAL,
            False
        )

        # Instantiate a query protobuf
        query_pb = query_pb2.Query(
            filter=filter_pb
        )
        helper.set_kind(query_pb, self.model)

        return query_pb

    def _get_source(self):
        return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
            self.PROJECT,
            self.query(),
        )

    @staticmethod
    def _do_work(entity):
        return entity

    def _get_sink(self):
        return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
            self.PROJECT
        )

Однако, когда я подкласс NamespacedDatastoreMigration, например, так:

from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration


class CampaignActionField(NamespacedDatastoreMigration):
    @staticmethod
    def _do_work(entity):
        target_url = entity.properties.get('target_url').string_value
        message = entity.properties.get('message').string_value
        path = entity.properties.get('path').string_value
        if target_url and not message and not path:
            entity.properties.get('action').string_value = 'webhook'

        return entity


model = "Campaign"  # Entity kind
CampaignActionField(
    create_argv(kwargs.get('name')),  # "ED-2275 Campaign action field"
    kwargs.get('migration_history_obj'),  # Irrelevant here
    model
).run()

, и этот новый конвейер работает в потоке данных, он выходит из строя.Сначала все начинается хорошо.Под этим я подразумеваю, что вижу следующие журналы INFO:

2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.

2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.


# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:

2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create

2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...

2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close

И затем я получаю эту трассировку:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
  File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_process_state:
  File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    self.output(wvalue.with_value((k, wvalue.value)))
  File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
    for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']

Я предполагаю, что это как-то связано с разницей между двумя _do_work() функционирует в NamespacedDatastoreMigration и CampaignActionField, поскольку первое является успешным, а второе - неудачным, и _do_work() является единственным отличием между ними (кроме преобразуемого вида сущности).Но я не могу думать о том, что именно идет не так и как обойти это.У кого-нибудь есть мысли?

1 Ответ

0 голосов
/ 21 ноября 2018

Оказывается, изменение метода FlatMap на Map в NamespacedDatastoreMigration _create_pipelines исправило это для меня.Я также глупо вызывал NamespacedDatastoreMigration с моделью без пространства имен, поэтому она была успешной, в то время как CamapaignActionField (которая использует модель с пространством имен) не была.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...