Как мне связаться с BigQuery через Spark SQL? - PullRequest
1 голос
/ 10 апреля 2019

У меня есть простой код Python, который включает соединение с bigQuery с использованием файла JSON с моими учетными данными.

data = pd.read_gbq(SampleQuery, project_id='XXXXXXXX', private_key='filename.json')

Здесь имя файла filename.json имеет следующий формат:

{
  "type": "service_account",
  "project_id": "projectId",
  "private_key_id": "privateKeyId",
  "private_key": "privateKey",
  "client_email": "clientEmail",
  "client_id": "clientId",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/clientEmail"
}

Теперь мне нужно перенести этот код на pyspark.Но я сталкиваюсь с трудностями при поиске запросов с использованием Spark SQL.Я использую кластер AWS EMR для выполнения этого запроса!

Любая помощь будет признательна!

1 Ответ

0 голосов
/ 19 апреля 2019

Поскольку для использования Spark SQL необходим объект SQLContext, сначала необходимо настроить SparkContext для подключения к BigQuery.С моей точки зрения, BigQuery Connector (адресованный sramalingam24 и Kenneth Jung) можно использовать для запроса данных в BigQuery.

Обратите внимание, что sramalingam24 предоставил ссылку с примером, следующим образомэто сводка кода:

bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

word_counts = (
    table_data
    .map(lambda record: json.loads(record[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

sql_context = SQLContext(sc)
(word_counts
 .toDF(['word', 'word_count'])
 .write.format('json').save(output_directory))

Затем вы можете скачать банку с соединителем для других кластеров Hadoop .И Кеннет Юнг предоставил ссылку с информацией, которая предполагает, что опция --jar может использоваться для включения соединителя (--jars gs: //spark-lib/bigquery/spark-bigquery-latest.jar), который можно включитьбаночка на пути к драйверу и исполнителю.

...