Can't read/write S3 with pandas / smart_open from PySpark on AWS EMR
January 31, 2019

I want to read and write some pandas data to S3 from the pyspark interpreter.

I tried:

  1. read

    pd.read_csv('s3://xxx/userdevice/part-00000-3332c494-9b5b-4781-8482-1c96f3efda21-c000.csv')
    

    Got the error:

    Fail to execute line 2: pd.read_csv('s3://xxx/userdevice/part-00000-3332c494-9b5b-4781-8482-1c96f3efda21-c000.csv')
    Traceback (most recent call last):
      File "/tmp/zeppelin_pyspark-3312558659230129318.py", line 380, in <module>
        exec(code, _zcUserQueryNameSpace)
      File "<stdin>", line 2, in <module>
      File "/usr/local/lib64/python2.7/site-packages/pandas/io/parsers.py", line 678, in parser_f
        return _read(filepath_or_buffer, kwds)
      File "/usr/local/lib64/python2.7/site-packages/pandas/io/parsers.py", line 424, in _read
        filepath_or_buffer, encoding, compression)
      File "/usr/local/lib64/python2.7/site-packages/pandas/io/common.py", line 209, in get_filepath_or_buffer
        mode=mode)
      File "/usr/local/lib64/python2.7/site-packages/pandas/io/s3.py", line 38, in get_filepath_or_buffer
        filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)
      File "/usr/local/lib/python2.7/site-packages/s3fs/core.py", line 335, in open
        s3_additional_kwargs=kw)
      File "/usr/local/lib/python2.7/site-packages/s3fs/core.py", line 1143, in __init__
        info = self.info()
      File "/usr/local/lib/python2.7/site-packages/s3fs/core.py", line 1161, in info
        refresh=refresh, **kwargs)
      File "/usr/local/lib/python2.7/site-packages/s3fs/core.py", line 478, in info
        raise FileNotFoundError(path)
    FileNotFoundError: xxx/userdevice/part-00000-3332c494-9b5b-4781-8482-1c96f3efda21-c000.csv
    
  2. I suspect that some Spark nodes are not configured for S3, so I create the session manually

    def s3_open(path, *args, **kwargs):
        import boto3
        session = boto3.Session(
            aws_access_key_id='xxxx',
            aws_secret_access_key='xxxxxx',
        )
        f = smart_open.smart_open(path, *args, s3_session=session, **kwargs)
        return f
    
    with s3_open('s3://xxx/balance/%s.xlsx' % name, 'w') as f:
        r.to_excel(f)
    

    Got the error:

    Fail to execute line 10: with s3_open('s3://xxx/balance/%s.xlsx' % name, 'w') as f:
    Traceback (most recent call last):
      File "/tmp/zeppelin_pyspark-3312558659230129318.py", line 380, in <module>
        exec(code, _zcUserQueryNameSpace)
      File "<stdin>", line 10, in <module>
      File "<stdin>", line 7, in s3_open
      File "/usr/local/lib/python2.7/site-packages/smart_open/smart_open_lib.py", line 231, in smart_open
        binary, filename = _open_binary_stream(uri, binary_mode, **kw)
      File "/usr/local/lib/python2.7/site-packages/smart_open/smart_open_lib.py", line 338, in _open_binary_stream
        return _s3_open_uri(parsed_uri, mode, **kw), filename
      File "/usr/local/lib/python2.7/site-packages/smart_open/smart_open_lib.py", line 400, in _s3_open_uri
        return smart_open_s3.open(parsed_uri.bucket_id, parsed_uri.key_id, mode, **kwargs)
      File "/usr/local/lib/python2.7/site-packages/smart_open/s3.py", line 74, in open
        fileobj = BufferedOutputBase(bucket_id, key_id, min_part_size=s3_min_part_size, **kwargs)
      File "/usr/local/lib/python2.7/site-packages/smart_open/s3.py", line 369, in __init__
        raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket)
    ValueError: the bucket u'xxx' does not exist, or is forbidden for access
    

The pandas code above works on this same machine; it only fails when run inside Spark.
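Since the same code behaves differently inside and outside Spark, one way to narrow this down is to compare which AWS identity each Python process resolves and whether it can see the object at all. The following is only a diagnostic sketch: the helper names `split_s3_uri` and `describe_s3_access` are hypothetical (not part of pandas, s3fs, or smart_open), and it assumes `boto3` is importable in the interpreter and picks up credentials from its default chain.

```python
def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    prefix = 's3://'
    if not uri.startswith(prefix):
        raise ValueError('not an s3:// URI: %r' % uri)
    bucket, _, key = uri[len(prefix):].partition('/')
    return bucket, key


def describe_s3_access(uri):
    """Report the identity this process uses and whether it can see the object.

    Hypothetical helper: it makes real AWS calls, so it needs network access
    and whatever credentials the current process resolves by default.
    """
    import boto3  # imported lazily so split_s3_uri works without boto3
    from botocore.exceptions import ClientError

    bucket, key = split_s3_uri(uri)
    report = {'bucket': bucket, 'key': key}
    # Identity taken from the default credential chain
    # (environment variables, config files, or the instance role).
    report['caller_arn'] = boto3.client('sts').get_caller_identity()['Arn']
    try:
        boto3.client('s3').head_object(Bucket=bucket, Key=key)
        report['head_object'] = 'ok'
    except ClientError as exc:
        # Typically '404' for a missing key and '403' for denied access.
        report['head_object'] = exc.response['Error']['Code']
    return report
```

Running `describe_s3_access('s3://xxx/userdevice/...')` once in a Zeppelin pyspark paragraph and once in a plain Python shell on the same machine, then comparing `caller_arn` and `head_object`, may show whether the two processes use different credentials.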


But pyspark itself works fine, as shown below:

    df1.coalesce(1).write.csv('s3://xxx/storeincomelogs', header=True, mode="overwrite")
    df2.coalesce(1).write.csv('s3://xxx/storeproductincomelogs', header=True, mode="overwrite")
    ...
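Because Spark's own S3 reads and writes succeed, a possible workaround is to let Spark do the S3 I/O and convert to and from pandas on the driver. This is a sketch under assumptions, not a fix for the pandas/smart_open errors themselves: the helper names are hypothetical, `spark` is assumed to be the SparkSession that Zeppelin provides, the data is assumed to fit in driver memory, and it covers CSV only (not the `.xlsx` case).

```python
def read_csv_via_spark(spark, path):
    """Read a CSV from S3 with Spark, then collect it into a pandas DataFrame."""
    # inferSchema asks Spark to guess column types instead of reading all strings.
    return spark.read.csv(path, header=True, inferSchema=True).toPandas()


def write_csv_via_spark(spark, pandas_df, path):
    """Turn a pandas DataFrame into a Spark DataFrame and write it to S3 as CSV."""
    spark_df = spark.createDataFrame(pandas_df)
    # coalesce(1) yields a single part file, matching the writes shown above.
    spark_df.coalesce(1).write.csv(path, header=True, mode="overwrite")
```

For example, `pdf = read_csv_via_spark(spark, 's3://xxx/userdevice/')` would load the directory's CSV parts into pandas. Note that `path` in the write helper names an output directory, not a single file.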