Apache Beam: how to read from HDFS with a delegation token
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

hdfs_options = {
    "hdfs_host": "...",
    "hdfs_port": 50070,
    "hdfs_user": "..."
}
opts = PipelineOptions(**hdfs_options)

# run_shell_cmd is my helper that returns the command's stdout
token = run_shell_cmd('curl -s --negotiate -u : "http://nn:50070/webhdfs/v1/?op=GETDELEGATIONTOKEN"')
p = beam.Pipeline(options=opts)

p.apply(
    beam.io.ReadFromText(f"hdfs:///my_path/*.md?delegation={token}")  # does not work
);
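
For context, the raw curl output is a JSON document and the actual token is its Token.urlString field. A minimal sketch of how the token string can be pulled out (using requests/requests_kerberos as a stand-in for my run_shell_cmd helper; the namenode address is a placeholder):

import requests
from requests_kerberos import HTTPKerberosAuth, OPTIONAL

# WebHDFS answers GETDELEGATIONTOKEN with {"Token": {"urlString": "<opaque token>"}};
# urlString is the value that goes into the ?delegation= query parameter.
resp = requests.get(
    "http://nn:50070/webhdfs/v1/?op=GETDELEGATIONTOKEN",
    auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL),  # equivalent of curl --negotiate -u :
)
resp.raise_for_status()
token = resp.json()["Token"]["urlString"]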

I have the token and a delegation token file, but I cannot authenticate with it. The read fails with:

Match operation failed with exceptions {'hdfs:///my_path/*.md?delegation=...': BeamIOError("List operation failed with exceptions {'hdfs:///my_path': HdfsError('Authentication failure. Check your credentials.')}")}

Stacktrace

---------------------------------------------------------------------------
BeamIOError                               Traceback (most recent call last)
<ipython-input-251-127e501adfaa> in <module>()
      2 
      3 p.apply(
----> 4     beam.io.ReadFromText(f"hdfs:///my_path/*.md?delegation={token}")
      5 );

/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate, skip_header_lines, **kwargs)
    540         file_pattern, min_bundle_size, compression_type,
    541         strip_trailing_newlines, coder, validate=validate,
--> 542         skip_header_lines=skip_header_lines)
    543 
    544   def expand(self, pvalue):

/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, buffer_size, validate, skip_header_lines, header_processor_fns)
    124     super(_TextSource, self).__init__(file_pattern, min_bundle_size,
    125                                       compression_type=compression_type,
--> 126                                       validate=validate)
    127 
    128     self._strip_trailing_newlines = strip_trailing_newlines

/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py in __init__(self, file_pattern, min_bundle_size, compression_type, splittable, validate)
    123     self._splittable = splittable
    124     if validate and file_pattern.is_accessible():
--> 125       self._validate()
    126 
    127   def display_data(self):

/root/miniconda3/lib/python3.7/site-packages/apache_beam/options/value_provider.py in _f(self, *args, **kwargs)
    138         if not obj.is_accessible():
    139           raise error.RuntimeValueProviderError('%s not accessible' % obj)
--> 140       return fnc(self, *args, **kwargs)
    141     return _f
    142   return _check_accessible

/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py in _validate(self)
    181 
    182     # Limit the responses as we only want to check if something exists
--> 183     match_result = FileSystems.match([pattern], limits=[1])[0]
    184     if len(match_result.metadata_list) <= 0:
    185       raise IOError(

/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/filesystems.py in match(patterns, limits)
    198       return []
    199     filesystem = FileSystems.get_filesystem(patterns[0])
--> 200     return filesystem.match(patterns, limits)
    201 
    202   @staticmethod

/root/miniconda3/lib/python3.7/site-packages/apache_beam/io/filesystem.py in match(self, patterns, limits)
    718 
    719     if exceptions:
--> 720       raise BeamIOError("Match operation failed", exceptions)
    721     return result
    722 
...
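
To narrow things down: Beam's Python HadoopFileSystem is built on top of the hdfs (hdfscli) library, which also ships a TokenClient for delegation-token authentication. A sketch of how I would check the token outside of Beam (namenode host/port are placeholders; this assumes the same hdfs package Beam uses):

from hdfs import TokenClient

# TokenClient appends ?delegation=<token> to every WebHDFS call instead of using
# SPNEGO/Kerberos, which is what I am trying to get Beam to do for ReadFromText.
client = TokenClient("http://nn:50070", token)
print(client.list("/my_path"))  # should list the *.md files if the token is valid

If this listing works, the token itself is fine and the problem is presumably in how Beam's HadoopFileSystem handles (or ignores) the ?delegation= part of the pattern.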