Я запускаю следующий успешный Тестовый конвейер Apache Beam в Google Dataflow.Он использует Datastore в качестве источника и приемника.Многие объекты в нашей базе данных назначены пространствам имен.Этот конвейер предназначен для выполнения _do_work()
на всех объектах определенного вида в заданных пространствах имен.Обратите внимание, что аналогичный тестовый конвейер, который делает то же самое с сущностями без пространства имен, также успешно выполняется:
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper
from .pipelines.dataflow_settings import (
PROJECT, NAMESPACES_PER_PIPELINE
)
class NamespacedDatastoreMigration(_DatastoreMigrationBase):
"""
Map a do-function over a query multiplexed across several namespaces.
The inheritor must implement the following:
- a PROJECT class attribute
- a do-function (_do_work())
- a method to get the namespaces across which to shard the query (
get_namespaces())
"""
_NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25
def __init__(self, argv, migration_history_obj, model):
super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
self._namespaces = self.get_namespaces()
self._pipelines = self._create_pipelines(argv)
def get_namespaces(self):
query_pb = query_pb2.Query()
helper.set_kind(query_pb, "__namespace__")
client = apache_helper.get_datastore(PROJECT)
namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)
namespaces = []
for n in namespace_entities:
# Get namespace name or id
key_path = n.key.path[-1]
if key_path.HasField('id'):
name_or_id = key_path.id
else:
name_or_id = key_path.name
# Avoid duplicates and test namespaces
if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
namespaces.append(name_or_id)
return namespaces
def run(self):
for pipeline in self._pipelines:
pipeline.run()
def _create_pipelines(self, argv):
pipelines = []
for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
p = beam.Pipeline(argv=argv)
(
(
p | 'ReadNamespace_{}'.format(
ns
) >> datastoreio.ReadFromDatastore(
project=self.PROJECT,
query=self.query(),
namespace=ns
)
for ns in namespaces
)
| 'JoinNamespaceEntities' >> beam.Flatten()
| self.__class__.__name__ >> beam.FlatMap(self._do_work)
| self._get_sink()
)
pipelines.append(p)
return pipelines
model = "App"
NamespacedDatastoreMigration(
argv,
kwargs.get('migration_history_obj'), # Irrelevant here
model # Entity kind
).run()
, где argv:
argv = [
'--project={0}'.format(PROJECT),
'--job_name=' + name, # A human readable descriptor that's been cleaned
'--staging_location=gs://{0}/migrations/'.format(BUCKET),
'--temp_location=gs://{0}/migrations/'.format(BUCKET),
'--setup_file=./setup.py',
'--runner=DataflowRunner'
]
Это основано на подклассе:
class _DatastoreMigrationBase(object):
PROJECT = PROJECT
def __init__(self, argv, migration_history_obj, model):
self.migration_history_obj = migration_history_obj
if not model:
raise Exception('This operation requires a model class name.')
self.model = model
def query(self):
# Instantiate a filter protobuf
filter_pb = query_pb2.Filter()
# Get all non-deleted model instances
helper.set_property_filter(
filter_pb,
'deleted',
query_pb2.PropertyFilter.EQUAL,
False
)
# Instantiate a query protobuf
query_pb = query_pb2.Query(
filter=filter_pb
)
helper.set_kind(query_pb, self.model)
return query_pb
def _get_source(self):
return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
self.PROJECT,
self.query(),
)
@staticmethod
def _do_work(entity):
return entity
def _get_sink(self):
return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
self.PROJECT
)
Однако, когда я подкласс NamespacedDatastoreMigration
, например, так:
from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration
class CampaignActionField(NamespacedDatastoreMigration):
@staticmethod
def _do_work(entity):
target_url = entity.properties.get('target_url').string_value
message = entity.properties.get('message').string_value
path = entity.properties.get('path').string_value
if target_url and not message and not path:
entity.properties.get('action').string_value = 'webhook'
return entity
model = "Campaign" # Entity kind
CampaignActionField(
create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
kwargs.get('migration_history_obj'), # Irrelevant here
model
).run()
, и этот новый конвейер работает в потоке данных, он выходит из строя.Сначала все начинается хорошо.Под этим я подразумеваю, что вижу следующие журналы INFO:
2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.
2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.
# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:
2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create
2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...
2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close
И затем я получаю эту трассировку:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
def start(self):
File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.shuffle_source.reader() as reader:
File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
self.output(wvalue.with_value((k, wvalue.value)))
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']
Я предполагаю, что это как-то связано с разницей между двумя _do_work()
функционирует в NamespacedDatastoreMigration
и CampaignActionField
, поскольку первое является успешным, а второе - неудачным, и _do_work()
является единственным отличием между ними (кроме преобразуемого вида сущности).Но я не могу думать о том, что именно идет не так и как обойти это.У кого-нибудь есть мысли?