# Connection settings for Beam's Hadoop filesystem, passed as pipeline options.
hdfs_options = {
    "hdfs_host": "...",
    "hdfs_port": 50070,  # same HTTP port the WebHDFS curl call below talks to
    "hdfs_user": "...",
}
opts = PipelineOptions(**hdfs_options)

# Request a delegation token over WebHDFS with SPNEGO (`--negotiate -u :`),
# which needs a valid Kerberos ticket in the calling environment.
# NOTE(review): per the WebHDFS REST API the response body is a JSON object
# ({"Token": {"urlString": ...}}), not a bare token string — confirm before
# interpolating it anywhere.
token = run_shell_cmd(
    'curl -s --negotiate -u : '
    '"http://nn:50070/webhdfs/v1/?op=GETDELEGATIONTOKEN"'
)

p = beam.Pipeline(options=opts)
# NOTE: appending "?delegation=<token>" to an hdfs:// path does NOT
# authenticate. Beam treats the whole string as a file pattern ("?" is a glob
# wildcard). Validation therefore lists the non-glob prefix "hdfs:///my_path"
# without any credentials, which fails with
# "HdfsError('Authentication failure. Check your credentials.')".
lines = p | beam.io.ReadFromText(f"hdfs:///my_path/*.md?delegation={token}")
У меня есть токен и файл токена делегирования, но я не могу пройти аутентификацию с их помощью.
Match operation failed with exceptions {'hdfs:///my_path/*.md?delegation=...': BeamIOError("List operation failed with exceptions {'hdfs:///my_path': HdfsError('Authentication failure. Check your credentials.')}")}
Stacktrace
---------------------------------------------------------------------------
BeamIOError Traceback (most recent call last)
<ipython-input-251-127e501adfaa> in <module>()
2
3 p.apply(
----> 4 beam.io.ReadFromText(f"hdfs:///my_path/*.md?delegation={token}")
5 );
/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate, skip_header_lines, **kwargs)
540 file_pattern, min_bundle_size, compression_type,
541 strip_trailing_newlines, coder, validate=validate,
--> 542 skip_header_lines=skip_header_lines)
543
544 def expand(self, pvalue):
/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, buffer_size, validate, skip_header_lines, header_processor_fns)
124 super(_TextSource, self).__init__(file_pattern, min_bundle_size,
125 compression_type=compression_type,
--> 126 validate=validate)
127
128 self._strip_trailing_newlines = strip_trailing_newlines
/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py in __init__(self, file_pattern, min_bundle_size, compression_type, splittable, validate)
123 self._splittable = splittable
124 if validate and file_pattern.is_accessible():
--> 125 self._validate()
126
127 def display_data(self):
/root/miniconda3/lib/python3.7/site-packages/apache_beam/options/value_provider.py in _f(self, *args, **kwargs)
138 if not obj.is_accessible():
139 raise error.RuntimeValueProviderError('%s not accessible' % obj)
--> 140 return fnc(self, *args, **kwargs)
141 return _f
142 return _check_accessible
/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py in _validate(self)
181
182 # Limit the responses as we only want to check if something exists
--> 183 match_result = FileSystems.match([pattern], limits=[1])[0]
184 if len(match_result.metadata_list) <= 0:
185 raise IOError(
/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/filesystems.py in match(patterns, limits)
198 return []
199 filesystem = FileSystems.get_filesystem(patterns[0])
--> 200 return filesystem.match(patterns, limits)
201
202 @staticmethod
/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/filesystem.py in match(self, patterns, limits)
718
719 if exceptions:
--> 720 raise BeamIOError("Match operation failed", exceptions)
721 return result
722