How to specify multiple input files for Apache Beam ReadFromText
May 24, 2019

I am using the apache-beam Python SDK 2.12.0. I am having trouble using the wildcard character * in the path passed to beam.io.ReadFromText:

gs://mybucket/learning/pack_operation/20190524_1_0_/extracted-*.json

This is the input for my Beam job:

with beam.Pipeline(args.runner, pipeline_options) as pipeline:
    outputs = (
        pipeline
        | 'ReadFromFile' >> beam.io.ReadFromText(options['input_filebase'])
        | 'DecodeLine' >> beam.Map(Utils.decode_input(ids))
        | 'Batch' >> beam.ParDo(BatchDoFn(options['batch_size']))
        | 'Predict' >> beam.ParDo(PredictDoFn(model_file, fields))
        | 'Unbatch' >> beam.ParDo(UnBatchDoFn())
        | 'FormatOutput' >> beam.Map(Utils.format_output)
    )
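To make explicit what the * in the input path is expected to do, here is a minimal sketch using Python's standard fnmatch module. The shard names are hypothetical, made up for illustration, and fnmatch only approximates the matching that Beam performs through its filesystem layer, so treat it as an illustration of the intent rather than of Beam's actual behavior.

```python
import fnmatch

# Hypothetical shard names, invented for this illustration only.
shard_names = [
    "extracted-000000000000.json",
    "extracted-000000000001.json",
    "summary.json",
]


def matching_shards(names, pattern):
    """Return the names that match a shell-style glob pattern."""
    # fnmatchcase is case-sensitive on every platform.
    return [name for name in names if fnmatch.fnmatchcase(name, pattern)]


# The pattern mirrors the file-name part of the input path used above.
print(matching_shards(shard_names, "extracted-*.json"))
```

Only the two extracted-… names match the pattern; summary.json does not.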

The thing is, I get the same exception even when I hard-code the input_file path:

gs://mybucket/learning/pack_operation/20190524_1_0_/extracted-000000000000.json.json

Here is the exception:

Output:
[2019-05-24 15:02:59,997] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,996] {bash_operator.py:101} INFO - /usr/local/lib/python2.7/site-packages/oauth2client/contrib/gce.py:99: UserWarning: You have requested explicit scopes to be used with a GCE service account.
[2019-05-24 15:02:59,999] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,997] {bash_operator.py:101} INFO - Using this argument will have no effect on the actual scopes for tokens
[2019-05-24 15:02:59,999] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,997] {bash_operator.py:101} INFO - requested. These scopes are set at VM instance creation time and
[2019-05-24 15:03:00,000] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,997] {bash_operator.py:101} INFO - can't be overridden in the request.
[2019-05-24 15:03:00,000] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,997] {bash_operator.py:101} INFO - 
[2019-05-24 15:03:00,000] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,997] {bash_operator.py:101} INFO - warnings.warn(_SCOPES_WARNING)
[2019-05-24 15:03:00,001] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,997] {bash_operator.py:101} INFO - Traceback (most recent call last):
[2019-05-24 15:03:00,001] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,998] {bash_operator.py:101} INFO - File "/home/airflow/gcs/dags/data_learning_tools/inference/model_predict/sklearn_api/predictor.py", line 87, in <module>
[2019-05-24 15:03:00,001] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,998] {bash_operator.py:101} INFO - main()
[2019-05-24 15:03:00,002] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,998] {bash_operator.py:101} INFO - File "/home/airflow/gcs/dags/data_learning_tools/inference/model_predict/sklearn_api/predictor.py", line 73, in main
[2019-05-24 15:03:00,002] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,998] {bash_operator.py:101} INFO - | 'FormatOutput' >> beam.Map(Utils.format_output)
[2019-05-24 15:03:00,002] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,998] {bash_operator.py:101} INFO - File "/usr/local/lib/python2.7/site-packages/apache_beam/io/textio.py", line 536, in __init__
[2019-05-24 15:03:00,003] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,998] {bash_operator.py:101} INFO - skip_header_lines=skip_header_lines)
[2019-05-24 15:03:00,003] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,999] {bash_operator.py:101} INFO - File "/usr/local/lib/python2.7/site-packages/apache_beam/io/textio.py", line 120, in __init__
[2019-05-24 15:03:00,003] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,999] {bash_operator.py:101} INFO - validate=validate)
[2019-05-24 15:03:00,004] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,999] {bash_operator.py:101} INFO - File "/usr/local/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 121, in __init__
[2019-05-24 15:03:00,004] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,999] {bash_operator.py:101} INFO - self._validate()
[2019-05-24 15:03:00,004] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,999] {bash_operator.py:101} INFO - File "/usr/local/lib/python2.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
[2019-05-24 15:03:00,005] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,999] {bash_operator.py:101} INFO - return fnc(self, *args, **kwargs)
[2019-05-24 15:03:00,005] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:02:59,999] {bash_operator.py:101} INFO - File "/usr/local/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 178, in _validate
[2019-05-24 15:03:00,005] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:03:00,000] {bash_operator.py:101} INFO - match_result = FileSystems.match([pattern], limits=[1])[0]
[2019-05-24 15:03:00,006] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:03:00,000] {bash_operator.py:101} INFO - File "/usr/local/lib/python2.7/site-packages/apache_beam/io/filesystems.py", line 187, in match
[2019-05-24 15:03:00,006] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:03:00,000] {bash_operator.py:101} INFO - return filesystem.match(patterns, limits)
[2019-05-24 15:03:00,006] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:03:00,000] {bash_operator.py:101} INFO - File "/usr/local/lib/python2.7/site-packages/apache_beam/io/filesystem.py", line 723, in match
[2019-05-24 15:03:00,006] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:03:00,000] {bash_operator.py:101} INFO - raise BeamIOError("Match operation failed", exceptions)
[2019-05-24 15:03:00,007] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:03:00,000] {bash_operator.py:101} INFO - apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'gs://mybucket/learning/pack_operation/20190524_1_0_0/extracted-000000000000.json': TypeError("__init__() got an unexpected keyword argument 'response_encoding'",)}
[2019-05-24 15:03:00,365] {base_task_runner.py:98} INFO - Subtask: [2019-05-24 15:03:00,363] {bash_operator.py:105} INFO - Command exited with return code 1

I get a BeamIOError:

apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'gs://mybucket/learning/pack_operation/20190524_1_0_0/extracted-000000000000.json': TypeError("__init__() got an unexpected keyword argument 'response_encoding'",)}
Command exited with return code 1

Any help would be much appreciated. All of this code worked fine before I upgraded to Apache Beam 2.12.0 (I was previously using Apache Beam 2.5.0). The most interesting part is:

TypeError("__init__() got an unexpected keyword argument 'response_encoding'",)

I read the code of the first function that raises the exception, but I could not find any information about the problem.
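Since the failure appeared only after the upgrade, one thing that may be worth checking is which versions of Beam and its dependencies are actually installed in the environment that runs the job. This is an assumption, not something confirmed by the traceback: an unexpected keyword argument can be a symptom of a dependency that was left at an older version. The sketch below uses importlib.metadata, which needs Python 3.8 or later; the package names other than apache-beam are guesses at what might be relevant. On Python 2.7, as in the traceback above, pip freeze gives the same information.

```python
from importlib import metadata


def installed_versions(package_names):
    """Map each package name to its installed version, or None if absent."""
    versions = {}
    for name in package_names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Package list is an assumption: apache-beam plus two libraries that
# the GCS client code path may depend on.
packages = ["apache-beam", "google-apitools", "httplib2"]
for name, version in installed_versions(packages).items():
    print(name, version or "not installed")
```

Comparing this output between the environment where 2.5.0 worked and the one running 2.12.0 should show whether any dependency stayed behind during the upgrade.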

...