Создание файла PySpark для образцов BigQuery - PullRequest
0 голосов
/ 10 ноября 2018

Я в полной растерянности относительно того, как изменить какой-то код, полученный из задания, для выполнения того, что я хочу.Код был передан нам для загрузки в облачную платформу Google для подсчета слов в наборе образцов Шекспира.Мне удалось это сделать, но вторая часть моего задания - изменить код для подсчета чего-либо еще из любого другого набора данных в разделе примера BigQuery.Моя идея состояла в том, чтобы использовать набор данных Natality для подсчета числа мужчин, рожденных за год, по сравнению с женщинами.Вот с чем мне нужна помощь:

Я не уверен, как смоделировать таблицу, которую мне нужно создать в Google Cloud.Я создал логическое поле «is_male», такое как поле в наборе данных Natality, и целочисленное поле «source_year», подобное полю в наборе данных Natality.Я не уверен, как изменить код pyspark, чтобы сделать то, что я хочу.Я приложил код здесь, это выполняет счет слова Шекспира.В этом примере моя таблица «A4» содержит поле STRING «word» и поле INTEGER «word_count».

import json
import pprint
import subprocess
import pyspark

sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
#input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
input_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'bigquery-public-data',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters
#output_dataset = 'wordcount_dataset'
#output_table = 'wordcount_table'
output_dataset = 'CS410'
output_table = 'A4'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]

(word_counts
 .map(lambda (w, c): json.dumps({'word': w, 'word_count': c}))
 .saveAsTextFile(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--schema word:STRING,word_count:INTEGER '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=','.join(output_files)
    ).split())

# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)

Я особенно не знаю, что делает этот кусок кода, и я предполагаю, что это часть, которую мне нужно изменить, чтобы иметь возможность реализовать мою идею (кроме указания ссылок на набор данных natality,и новая таблица).

# Perform word count.
word_counts = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

Любая помощь чрезвычайно ценится!

...