Как я могу читать с S3 в pyspark, работающем в локальном режиме? - PullRequest
0 голосов
/ 05 мая 2018

Я использую PyCharm 2018.1, используя Python 3.4 с Spark 2.3, установленным через pip в virtualenv. На локальном хосте нет установки hadoop, поэтому нет установки Spark (таким образом, нет SPARK_HOME, HADOOP_HOME и т. Д.)

Когда я пытаюсь это сделать:

from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")

Я получаю:

py4j.protocol.Py4JJavaError: An error occurred while calling o23.partitions.
: java.io.IOException: No FileSystem for scheme: s3

Как я могу читать с s3 при запуске pyspark в локальном режиме без полной установки Hadoop локально?

FWIW - это прекрасно работает, когда я выполняю его на узле EMR в нелокальном режиме.

Следующее не работает (та же ошибка, хотя она разрешает и загружает зависимости):

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.1.0" pyspark-shell'
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")

Те же (плохие) результаты с:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/path/to/hadoop-aws-3.1.0.jar" pyspark-shell'
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")

Ответы [ 3 ]

0 голосов
/ 09 мая 2018

Так что ответ Гленни был близок, но не то, что сработало бы в вашем случае. Ключевым моментом было выбрать правильную версию зависимостей. Если вы посмотрите на виртуальную среду

Jars

Все указывает на одну версию, которая 2.7.3, которую вы также должны использовать

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'

Вы должны проверить версию, которую использует ваша установка, проверив путь venv/Lib/site-packages/pyspark/jars в виртуальной среде вашего проекта

И после этого вы можете использовать s3a по умолчанию или s3, определив класс обработчика для того же

# Only needed if you use s3://
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'awsKey')
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'awsSecret')
s3File = sc.textFile("s3a://myrepo/test.csv")

print(s3File.count())
print(s3File.id())

И выход ниже

OutputSpark

0 голосов
/ 13 ноября 2018

подготовка:

Добавьте следующие строки в ваш конфигурационный файл spark, для моего локального pyspark это /usr/local/spark/conf/spark-default.conf

spark.hadoop.fs.s3a.access.key=<your access key>
spark.hadoop.fs.s3a.secret.key=<your secret key>

Содержимое файла python:

from __future__ import print_function
import os

from pyspark import SparkConf
from pyspark import SparkContext

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"


if __name__ == "__main__":

    conf = SparkConf().setAppName("read_s3").setMaster("local[2]")
    sc = SparkContext(conf=conf)

    my_s3_file3 = sc.textFile("s3a://store-test-1/test-file")
    print("file count:", my_s3_file3.count())

совершают:

spark-submit --master local \
--packages org.apache.hadoop:hadoop-aws:2.7.3,\
com.amazonaws:aws-java-sdk:1.7.4,\
org.apache.hadoop:hadoop-common:2.7.3 \
<path to the py file above>
0 голосов
/ 08 мая 2018

Вы должны использовать протокол s3a при локальном доступе к S3. Сначала убедитесь, что вы добавили свой ключ и секрет к SparkContext. Как это:

sc = SparkContext(conf = conf)
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'awsKey')
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'awsSecret')

inputFile = sparkContext.textFile("s3a://somebucket/file.csv")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...