Pyspark: как прочитать CSV-файл в Google Bucket? - PullRequest
2 голосов
/ 23 марта 2020

У меня есть несколько файлов, хранящихся в ведре Google. Это мои настройки, как предложено здесь .

spark = SparkSession.builder.\
        master("local[*]").\
        appName("TestApp").\
        config("spark.serializer", KryoSerializer.getName).\
        config("spark.jars", "/usr/local/.sdkman/candidates/spark/2.4.4/jars/gcs-connector-hadoop2-2.1.1.jar").\
        config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName).\
        getOrCreate()
#Recommended settings for using GeoSpark
spark.conf.set("spark.driver.memory", 6)
spark.conf.set("spark.network.timeout", 1000)
#spark.conf.set("spark.driver.maxResultSize", 5)
spark.conf.set

spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
# This is required if you are using service account and set true, 
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'false')
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', "myJson.json")



path = 'mBucket-c892b51f8579.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path
client = storage.Client()
name = 'https://console.cloud.google.com/storage/browser/myBucket/'
bucket_id = 'myBucket'
bucket = client.get_bucket(bucket_id)

Я могу прочитать их просто, используя следующее:

df = pd.read_csv('gs://myBucket/myFile.csv.gz', compression='gzip')
df.head()

    time_zone_name           province_short
0   America/Chicago              US.TX
1   America/Chicago              US.TX
2   America/Los_Angeles          US.CA
3   America/Chicago              US.TX
4   America/Los_Angeles          US.CA

Я пытаюсь прочитать тот же файл с pyspark

myTable = spark.read.format("csv").schema(schema).load('gs://myBucket/myFile.csv.gz', compression='gzip')

но я получаю следующую ошибку

Py4JJavaError: An error occurred while calling o257.load.
: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:332)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

1 Ответ

0 голосов
/ 29 марта 2020

Добро пожаловать в ад oop зависимостей!

1. Используйте пакеты вместо jars

Ваша конфигурация в основном правильная, но когда вы добавляете gcs-соединитель в качестве локального jar, вам также необходимо вручную убедиться, что все его зависимости доступны в пути к классам JVM.

Обычно проще добавить соединитель в виде пакета и позволить spark справиться с зависимостями, поэтому вместо config("spark.jars", "/usr/local/.sdkman/candidates/spark/2.4.4/jars/gcs-connector-hadoop2-2.1.1.jar") используйте config('spark.jars.packages', 'com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.1.1')

2. Управление проблемами разрешения зависимостей ivy2

Когда вы делаете, как указано выше, spark, скорее всего, будет жаловаться, что не может загрузить некоторые зависимости из-за различий в разрешении между maven (используется для публикации) и ivy2 (используется spark для разрешение зависимостей).

Обычно это можно исправить, просто попросив spark игнорировать неразрешенные зависимости, используя spark.jars.excludes, поэтому добавьте новую строку конфигурации, например config('spark.jars.excludes','androidx.annotation:annotation,org.slf4j:slf4j-api')

3. Управление конфликтами путей к классам

Когда это будет сделано, SparkSession запустится, но файловая система все равно не будет работать, потому что стандартное распределение pyspark упаковывает старую версию библиотеки guava, которая не реализует API gcs- Разъем зависит от.

Необходимо убедиться, что gcs-connector сначала найдет ожидаемую версию, используя следующие конфигурации config('spark.driver.userClassPathFirst','true') и config('spark.executor.userClassPathFirst','true')

4. Управление конфликтами зависимостей

Теперь вы можете думать, что все в порядке, но на самом деле нет, потому что дистрибутив pyspark по умолчанию содержит версию 2.7.3 с библиотеками oop, но версия 2.1.1 gcs-connector использует версию 2.8 + только API.

Теперь вы можете:

  • использовать пользовательскую сборку spark с более новым oop (или пакет без встроенного oop библиотеки)
  • использовать старую версию gcs-разъема (версия 1.9.17 работает нормально)

5. Наконец-то работающий конфиг

Предполагая, что вы хотите использовать последний дистрибутив pyspark PyPi или Anaconda, следующий конфиг должен работать как положено.

Я включил только gcs соответствующие конфиги, перенесли конфигурацию Had oop непосредственно в конфигурацию spark и предположили, что вы правильно настраиваете GOOGLE_APPLICATION_CREDENTIALS:

from pyspark.sql import SparkSession

spark = SparkSession.builder.\
        master("local[*]").\
        appName("TestApp").\
        config('spark.jars.packages', 
               'com.google.cloud.bigdataoss:gcs-connector:hadoop2-1.9.17').\
        config('spark.jars.excludes',
               'javax.jms:jms,com.sun.jdmk:jmxtools,com.sun.jmx:jmxri').\
        config('spark.driver.userClassPathFirst','true').\
        config('spark.executor.userClassPathFirst','true').\
        config('spark.hadoop.fs.gs.impl',
               'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem').\
        config('spark.hadoop.fs.gs.auth.service.account.enable', 'false').\
        getOrCreate()

Обратите внимание, что версия 1.9.17 gcs-соединителя имеет другой набор исключений, чем 2.1 .1 потому что почему бы и нет ...

PS: Вы также должны убедиться, что вы используете Java 1.8 JVM, потому что Spark 2.4 не работает на более новых JVM.

...