pyspark загружает dataframe в bigquery из локального - PullRequest
0 голосов
/ 27 апреля 2020

Я хочу загрузить фрейм данных pyspark в таблицу Google BigQuery.

Я запускаю задание, запустив

spark-submit --jars batch/jars/gcs-connector-hadoop2-latest.jar,batch/jars/spark-bigquery-latest.jar main_batch.py

Мне кажется, я установил все необходимые конфиги:

sc = SparkSession\
        .builder\
        .config("temporaryGcsBucket", "test_project_bucket_pyspark") \
        .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")\
        .config("fs.gs.auth.service.account.enable", "true") \
        .config("google.cloud.auth.service.account.json.keyfile", "config/google/service_account_project.json") \
        .master("local[1]")\
        .appName("spark_etl")\
        .getOrCreate()

И я запускаю это для сохранения в BigQuery:

df.write \
        .format("bigquery") \
        .option('table', 'dataset_project_test.tickets') \
        .option("encoding", "UTF-8") \
        .option("nullValue", "\u0000") \
        .option("emptyValue", "\u0000") \
        .mode("append") \
        .save()

Но я получаю следующую ошибку:

py4j.protocol.Py4JJavaError: An error occurred while calling o1176.save.
: java.io.IOException: No FileSystem for scheme: gs

Что я делаю не так? Спасибо

1 Ответ

0 голосов
/ 04 мая 2020

Я предполагаю, что процесс или запись DataFrame в Bigquery, безусловно, требует определения временного сегмента GCS, как описано здесь .

Поэтому я не вижу в ваших примерах кода temporaryGcsBucket параметр, определенный внутри df.write функции для конкретного коннектора Spark Bigquery , который может привести к ошибке файловой системы gs.

Вы можете попробовать использовать глобальный параметр temporaryGcsBucket в конфигурации Spark ( если вы еще этого не сделали):

spark.conf.set('temporaryGcsBucket', bucket)

Или полностью измените его в df.write:

df.write
  .....
  .option("temporaryGcsBucket","some-bucket")
  .save()

Вы также можете проверить ссылку список принятых параметров ввода-вывода Bigquery API.

Обновление:

При условии, что параметр конфигурации .config("temporaryGcsBucket", "test_project_bucket_pyspark") был объявлен в настройках SparkSession в соответствии с примером кода, я могу подтвердить, что он также будет распространяться в SparkConf свойствах времени выполнения, что является здесь общим подходом.

Однако для дальнейшей отладки проблемы, которую вы ' столкнулся бы я рекомендуем проверить конфигурацию разъема Had oop :

  1. Убедитесь, что все связанные с Had oop конфиги правильно загружены во время выполнения сеанса Spark, проверяя ошибку вывод для любых записей java.lang.ClassNotFoundException;

  2. Если у вас также есть какие-либо ошибки, связанные с авторизацией, убедитесь, что у вас есть доступ к temporaryGcsBucket с использованием gsutil (gsutil ls -b gs://<some-bucket>), а также проверьте правильность учетных данных в вашей конфигурации.

  3. Попробуйте добавить параметр fs.gs.project.id в свойства среды выполнения Spark:

    .config("fs.gs.project.id", "<MY_PROJECT>")

...