Я пытаюсь запустить скрипт в PySpark, используя Dataproc.
Этот скрипт является своего рода объединением этого примера и того, что мне нужно сделать, как я хотелпроверьте, все ли работает.Очевидно, что это не так.
Я получаю ошибку:
Файл "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py ", строка 328, в get_return_value py4j.protocol.Py4JJavaError: Произошла ошибка при вызове z: org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: java.lang.ClassNotFoundException: com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat
Я удостоверился, что у меня есть все банки, добавил некоторые новые банки, как предлагалось в других подобных постах.Я также проверил переменную SPARK_HOME
.
Ниже вы можете увидеть код;ошибка появляется при попытке создать экземпляр table_data.
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)