Я пытаюсь написать конвейер потока данных, который читает из pubsub и записывает в GCS.
Но когда я выполняю код, произошла ошибка.
Это
Рабочий процесс не выполнен. Причины. Из-за формы вашего конвейера оптимизатор заданий Cloud Dataflow создал график заданий, который нельзя обновить с помощью параметра конвейера --update. Это известная проблема, над которой мы работаем. См. https://issuetracker.google.com/issues/118375066 для получения информации о том, как изменить форму вашего конвейера, чтобы избежать этой ошибки. Вы можете переопределить эту ошибку и принудительно отправить задание, указав параметр --experiment = allow_non_updatable_job. Преобразование с состоянием с именем 'write to GCS / Write / WriteImpl / DoOnce / Decode Values.out / FromValue / ReadStream' состоит из двух или более вычислений.
Я смотрел Issuetracker, но не могу понять, что мне нужно делать.
Я выполняю код в облачной оболочке Google.
И я использую python2.7 и apache_beam-2.9.
#python2.7.15
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import window
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io import WriteToText
#import apache_beam.io.gcp.gcsio
pipeline_options = {
'project': "my-project",
'staging_location': "staging_location",
'runner': 'DataflowRunner',
'job_name': "job_name",
'temp_location': "temp_location",
'streaming': True}
options = PipelineOptions.from_dictionary(pipeline_options)
def run():
p = beam.Pipeline(options=options)
(p | "Read from PubSub" >> ReadFromPubSub(topic='topic')
| "Windowing" >> beam.WindowInto(window.FixedWindows(10))
| "write to GCS" >> WriteToText("output-dir")
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Пожалуйста, дайте мне помощь ...