Я предполагаю, что процесс или запись DataFrame в Bigquery, безусловно, требует определения временного сегмента GCS, как описано здесь .
Поэтому я не вижу в ваших примерах кода temporaryGcsBucket
параметр, определенный внутри df.write
функции для конкретного коннектора Spark Bigquery , который может привести к ошибке файловой системы gs.
Вы можете попробовать использовать глобальный параметр temporaryGcsBucket
в конфигурации Spark ( если вы еще этого не сделали):
spark.conf.set('temporaryGcsBucket', bucket)
Или полностью измените его в df.write
:
df.write
.....
.option("temporaryGcsBucket","some-bucket")
.save()
Вы также можете проверить ссылку список принятых параметров ввода-вывода Bigquery API.
Обновление:
При условии, что параметр конфигурации .config("temporaryGcsBucket", "test_project_bucket_pyspark")
был объявлен в настройках SparkSession
в соответствии с примером кода, я могу подтвердить, что он также будет распространяться в SparkConf свойствах времени выполнения, что является здесь общим подходом.
Однако для дальнейшей отладки проблемы, которую вы ' столкнулся бы я рекомендуем проверить конфигурацию разъема Had oop :
Убедитесь, что все связанные с Had oop конфиги правильно загружены во время выполнения сеанса Spark, проверяя ошибку вывод для любых записей java.lang.ClassNotFoundException
;
Если у вас также есть какие-либо ошибки, связанные с авторизацией, убедитесь, что у вас есть доступ к temporaryGcsBucket
с использованием gsutil
(gsutil ls -b gs://<some-bucket>
), а также проверьте правильность учетных данных в вашей конфигурации.
Попробуйте добавить параметр fs.gs.project.id
в свойства среды выполнения Spark:
.config("fs.gs.project.id", "<MY_PROJECT>")