Поскольку для использования Spark SQL необходим объект SQLContext, сначала необходимо настроить SparkContext для подключения к BigQuery.С моей точки зрения, BigQuery Connector (адресованный sramalingam24 и Kenneth Jung) можно использовать для запроса данных в BigQuery.
Обратите внимание, что sramalingam24 предоставил ссылку с примером, следующим образомэто сводка кода:
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
# Input Parameters.
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
word_counts = (
table_data
.map(lambda record: json.loads(record[1]))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
sql_context = SQLContext(sc)
(word_counts
.toDF(['word', 'word_count'])
.write.format('json').save(output_directory))
Затем вы можете скачать банку с соединителем для других кластеров Hadoop .И Кеннет Юнг предоставил ссылку с информацией, которая предполагает, что опция --jar может использоваться для включения соединителя (--jars gs: //spark-lib/bigquery/spark-bigquery-latest.jar), который можно включитьбаночка на пути к драйверу и исполнителю.