Ошибка при использовании Dataproc и Spark BigQuery Connector в блокнотах DataLab - PullRequest
0 голосов
/ 12 января 2019

Возможность запуска Google Datalab (ноутбуков) в Google Chrome с правильными разрешениями брандмауэра TCP. Используя простой скрипт, он запускает самый последний искровой кластер (1 мастер с 3 рабочими, использующими Dataproc). Сначала мы тестируем приведенный ниже код в spark-submit, затем после запуска DataLab я не уверен, как исправить приведенную ниже ошибку.

Первый шаг: запустить кластер Dataproc из Cloud Shell

gcloud dataproc clusters create cluster1021 \
    --subnet default --zone us-west1-a \
    --master-machine-type n1-standard-2 \
    --master-boot-disk-size 30 --num-workers 2 \
    --worker-machine-type n1-standard-2 \
    --worker-boot-disk-size 30 --image-version 1.3-deb9 \
    --project bigdata-228217 \
    --initialization-actions 'gs://dataproc-initialization-actions/datalab/datalab.sh','gs://dataproc-initialization-actions/connectors/connectors.sh' \
    --metadata 'gcs-connector-version=1.9.11' \
    --metadata 'bigquery-connector-version=0.13.11'

После успешного запуска я проверил, что коннектор Bigquery работает с spark-submit wordcount.py примером из Google здесь.

Второй шаг: включите этот код в главный домашний каталог как wordcount.py с touch wordcount.py, затем вставьте приведенный ниже код из nano wordcount.py и сохраните.

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda record: json.loads(record[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
output_files = output_directory + '/part-*'

sql_context = SQLContext(sc)
(word_counts
 .toDF(['word', 'word_count'])
 .write.format('json').save(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=output_files
    ).split())

# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)

Теперь из оболочки выводим spark-submit вот результаты - показывающие, что коннектор BigQuery работает.

spark-submit wordcount.py
...
(pinnace,3)
(bone,21)
(lug,2)
(vailing,2)
(bombast,3)
(gaping,11)
(hem,5)
('non,1)
(stinks,1)
(forsooth,48)

Шаг 3 Настройте брандмауэр, чтобы разрешить представление TCP DataLab в браузере

enter image description here

Создание правила брандмауэра для DataLab

enter image description here

На странице настройки вы создадите имя для правила брандмауэра DataLab и разрешите использовать следующие порты TCP вместе с "/ 32" сразу после IP-адреса вашей сети - который вы можете найти здесь .

Шаг 4: Запустите DataLab в Google Chrome с <YOUR IP>:8080, и вы должны увидеть Блокнот DataLab

enter image description here

Вы увидите это. Теперь откройте новый блокнот, и в первой ячейке я остановил контекст искры и вставил приведенный выше код Шекспира во вторую ячейку.

enter image description here

Вот вывод. Вопрос, что мне нужно сделать, чтобы Bigquery Connector работал с Pyspark в Datalab ??

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-4-62761a09a7c5> in <module>()
     36     'org.apache.hadoop.io.LongWritable',
     37     'com.google.gson.JsonObject',
---> 38     conf=conf)
     39 
     40 # Perform word count.

/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py in newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
    735         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
    736                                                    valueClass, keyConverter, valueConverter,
--> 737                                                    jconf, batchSize)
    738         return RDD(jrdd, self)
    739 

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:239)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:313)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:296)
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

1 Ответ

0 голосов
/ 12 января 2019

Судя по этой строке , действие инициализации Datalab монтирует соединители BQ и GCS в контейнер Docker.

Поскольку Dataproc 1.3 не поставляется с соединителем BQ по умолчанию и поскольку вы указали действие инициализации Connectors, которое устанавливает соединитель BQ в кластере, после действия инициализации DataLab Docker не может монтировать соединитель BQ в контейнер Datalab во время выполнения действия инициализации Datalab.

Чтобы исправить эту проблему, вам нужно изменить порядок действий инициализации:

gcloud dataproc clusters create \
    . . .
    --initialization-actions=gs://dataproc-initialization-actions/datalab/connectors.sh,gs://dataproc-initialization-actions/connectors/datalab.sh

В качестве незначительного улучшения вам не нужно указывать версию соединителя GCS (--metadata 'gcs-connector-version=1.9.11') на данный момент, поскольку в последнем образе Dataproc 1.3 уже установлен соединитель GCS 1.9.11.

...