Я использую Google Cloud Dataflow для Python SDK, чтобы прочитать 200k + сущностей из хранилища данных, используя функцию ReadFromDatastore()
для запроса без каких-либо фильтров.
def make_example_entity_query():
"""
make an unfiltered query on the `ExampleEntity` entity
"""
query = query_pb2.Query()
query.kind.add().name = "ExampleEntity"
return query
Затем я выполняю некоторую работу вконвейер с этим запросом
p = beam.Pipeline(options=PipelineOptions.from_dictionary(pipeline_options))
(
p
| 'read in the new donations from Datastore'
>> ReadFromDatastore(project, query, None)
|'protobuf2entity transformation'
>> beam.Map(entity_from_protobuf)
| 'do some work or something'
>> beam.Map(lambda item: item[0] + item[1])
)
return p.run()
работает локально, используя данные тестирования порядка нескольких тысяч записей, но когда я развертываю его в облаке и запускаю в нашей производственной базе данных с более чем 200 тысячами элементов, это просточерез час или около того без какого-либо прогресса.Кажется, что он полностью застрял в считываемой части.
также показывает, что было прочитано ноль элементов
и похоже, что только один рабочий когда-либо раскручивался
Так что я не совсемуверен, что здесь происходит.Мои вопросы:
- Существует ли какое-то разумное ограничение на объем данных, которые могут быть считаны из хранилища данных в качестве входных данных для конвейера?
- Почему, по-видимому, нет данных, делающих егов трубопровод вообще?Если я запускаю это локально, я вижу, как данные передаются через него, хотя и довольно медленно.
- Почему вращается только один работник?Я знаю, что если у вас есть фильтры для операции чтения, это приводит к тому, что чтение выполняется из одного узла, но это делается без фильтров неравенства для чтения из хранилища данных.