Структурированная потоковая передача с использованием PySpark и Kafka, Py4JJavaError: при вызове o70.awaitTermination произошла ошибка - PullRequest
0 голосов
/ 26 апреля 2018

Я пытался использовать Kafka, используя Spark, а точнее PySpark и Structured Streaming.

import os
import time
import time

from ast import literal_eval
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode
from pyspark.sql import SparkSession
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

spark = SparkSession \
    .builder \
    .appName("Structured Streaming") \
    .getOrCreate()

    requests = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ip-ec2:9092") \
  .option("subscribe", "ssp.requests") \
  .option("startingOffsets", "earliest") \
  .load()

requests.printSchema()

# root  |-- key: binary (nullable = true)  |-- value: binary (nullable =
# true)  |-- topic: string (nullable = true)  |-- partition: integer
# (nullable = true)  |-- offset: long (nullable = true)  |-- timestamp:
# timestamp (nullable = true)  |-- timestampType: integer (nullable =
# true)

Когда я запускаю следующие строки кода

rawQuery = requests \
        .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream.trigger(processingTime="5 seconds") \
        .format("parquet") \
        .option("checkpointLocation", "/home/user/folder/applicationHistory") \
        .option("path", "/home/user/folder") \
        .start()
rawQuery.awaitTermination()    

Py4JJavaError Traceback (самый последний вызов последний) /opt/conda/lib/python3.6/site-packages/pyspark/sql/utils.py в деко (* а, ** квт) 62 попробуйте: ---> 63 возврата f (* a, ** кВт) 64 за исключением py4j.protocol.Py4JJavaError как e:

/ opt / conda / lib / python3.6 / site-packages / py4j / protocol.py в get_return_value (ответ, gateway_client, target_id, name) 319 "Произошла ошибка при вызове {0} {1} {2}. \ N". -> формат 320 (target_id, ".", Name), значение) 321 остальное:

Py4JJavaError: Произошла ошибка при вызове o70.awaitTermination. : org.apache.spark.sql.streaming.StreamingQueryException: задание прервано. === Потоковый запрос === Идентификатор: [id = c2b48840-5ba4-416e-a192-dcae94007856, runId = 4afcca20-00cd-4187-a70b-1b742f1f5c0d] Текущие подтвержденные смещения: {} Текущие доступные смещения: {KafkaSource [Subscribe [ssp.requests]]:

Я не могу понять причину этой ошибки

Py4JJavaError: Произошла ошибка при вызове o70.awaitTermination

1 Ответ

0 голосов
/ 04 мая 2018

Я только что заменил строку rawQuery.awaitTermination () на

print(rawQuery.status)
time.sleep(60)
print(rawQuery.status)
rawQuery.stop()

и это работает.

...