У меня есть конвейер, который создает граф потока данных (сериализованное представление JSON), который превышает допустимый предел для API и, следовательно, не может быть запущен через обработчик потока данных для луча apache, как это обычно делается. И запуск обработчика потока данных с указанным параметром --experiments=upload_graph
не работает и завершается ошибкой, говоря, что не указано никаких шагов.
При получении уведомления об этой проблеме размера через ошибку, предоставляется следующая информация:
the size of the serialized JSON representation of the pipeline exceeds the allowable limit for the API.
Use experiment 'upload_graph' (--experiments=upload_graph)
to direct the runner to upload the JSON to your
GCS staging bucket instead of embedding in the API request.
Теперь использование этого параметра действительно приводит к тому, что обработчик потока данных загружает дополнительный файл dataflow_graph.pb
в промежуточную папку рядом с обычным файлом pipe.pb. То, что я проверял, действительно существует в gcp хранилище.
Однако задание в потоке данных gcp сразу же завершается ошибкой после запуска со следующей ошибкой:
Runnable workflow has no steps specified.
Я пробовал этот флаг с различными конвейерами, даже с примерами конвейеров лучей Apache, и вижу то же поведение.
Это можно воспроизвести, используя пример подсчета слов:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.11.0 \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
cd word-count-beam/
Запуск без параметра experiments=upload_graph
работает:
(не забудьте указать свой проект и корзины, если вы хотите его запустить)
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> \
--gcpTempLocation=gs://<your-gcs-bucket>/tmp \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
-Pdataflow-runner
Запуск с experiments=upload_graph
приводит к сбою канала с сообщением workflow has no steps specified
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> \
--gcpTempLocation=gs://<your-gcs-bucket>/tmp \
--experiments=upload_graph \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
-Pdataflow-runner
Теперь я ожидаю, что обработчик потока данных будет направлять поток данных gcp для чтения шагов из корзины, указанной, как видно из исходного кода:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L881
Однако, похоже, это не так. Кто-нибудь заставил это сработать, или нашел какую-то документацию относительно этой функции, которая может указать мне правильное направление?