Я в полной растерянности относительно того, как изменить какой-то код, полученный из задания, для выполнения того, что я хочу.Код был передан нам для загрузки в облачную платформу Google для подсчета слов в наборе образцов Шекспира.Мне удалось это сделать, но вторая часть моего задания - изменить код для подсчета чего-либо еще из любого другого набора данных в разделе примера BigQuery.Моя идея состояла в том, чтобы использовать набор данных Natality для подсчета числа мужчин, рожденных за год, по сравнению с женщинами.Вот с чем мне нужна помощь:
Я не уверен, как смоделировать таблицу, которую мне нужно создать в Google Cloud.Я создал логическое поле «is_male», такое как поле в наборе данных Natality, и целочисленное поле «source_year», подобное полю в наборе данных Natality.Я не уверен, как изменить код pyspark, чтобы сделать то, что я хочу.Я приложил код здесь, это выполняет счет слова Шекспира.В этом примере моя таблица «A4» содержит поле STRING «word» и поле INTEGER «word_count».
import json
import pprint
import subprocess
import pyspark
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
#input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
input_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
# Input Parameters
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'bigquery-public-data',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
# Output Parameters
#output_dataset = 'wordcount_dataset'
#output_table = 'wordcount_table'
output_dataset = 'CS410'
output_table = 'A4'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
# Display 10 results.
pprint.pprint(word_counts.take(10))
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]
(word_counts
.map(lambda (w, c): json.dumps({'word': w, 'word_count': c}))
.saveAsTextFile(output_directory))
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
'bq load --source_format NEWLINE_DELIMITED_JSON '
'--schema word:STRING,word_count:INTEGER '
'{dataset}.{table} {files}'.format(
dataset=output_dataset, table=output_table, files=','.join(output_files)
).split())
# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
output_path, True)
Я особенно не знаю, что делает этот кусок кода, и я предполагаю, что это часть, которую мне нужно изменить, чтобы иметь возможность реализовать мою идею (кроме указания ссылок на набор данных natality,и новая таблица).
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
Любая помощь чрезвычайно ценится!