Ошибка структурированной потоковой передачи py4j.protocol.Py4JNetworkError: Ответ со стороны Java пуст - PullRequest
0 голосов
/ 04 мая 2018

Я пытаюсь сделать левое внешнее соединение между двумя Kafka Stream, используя PySpark и Structured Streaming (Spark 2.3).

import os
import time

from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode, get_json_object
from ast import literal_eval
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

spark = SparkSession \
    .builder \
    .appName("Spark Kafka Structured Streaming") \
    .getOrCreate()

schema_impressions = StructType() \
    .add("id_req", StringType()) \
    .add("ts_imp_request", TimestampType()) \
    .add("country", StringType()) \
    .add("TS_IMPRESSION", TimestampType()) 

schema_requests = StructType() \
    .add("id_req", StringType()) \
    .add("page", StringType()) \
    .add("conntype", StringType()) \
    .add("TS_REQUEST", TimestampType()) 

impressions = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
  .option("subscribe", "ssp.datascience_impressions") \
  .load()

requests = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
  .option("subscribe", "ssp.datascience_requests") \
  .option("startingOffsets", "latest") \
  .load()

query_requests = requests \
        .select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_requests).alias("parsed")) \
        .select(col("timestamp").alias("timestamp_req"), "parsed.id_req", "parsed.page", "parsed.conntype", "parsed.TS_REQUEST") \
        .withWatermark("timestamp_req", "120 seconds") 

query_impressions = impressions \
        .select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_impressions).alias("parsed")) \
        .select(col("timestamp").alias("timestamp_imp"), col("parsed.id_req").alias("id_imp"), "parsed.ts_imp_request", "parsed.country", "parsed.TS_IMPRESSION") \
        .withWatermark("timestamp_imp", "120 seconds") 

query_requests.printSchema()        
query_impressions.printSchema()

> root  
|-- timestamp_req: timestamp (nullable = true)  
|-- id_req: string (nullable = true)  
|-- page: string (nullable = true)  
|-- conntype: string (nullable = true)  
|-- TS_REQUEST: timestamp (nullable = true)
> 
> root  |-- timestamp_imp: timestamp (nullable = true)  
|-- id_imp: string (nullable = true)  
|-- ts_imp_request: timestamp (nullable = true)  
|-- country: string (nullable = true)  
|-- TS_IMPRESSION: timestamp (nullable = true)

В резюме я получу данные из двух потоков Kafka, а в следующих строках попытаюсь соединиться, используя идентификаторы.

rawQuery = query_requests.join(query_impressions,  expr(""" 
    (id_req = id_imp AND 
    timestamp_imp >= timestamp_req AND 
    timestamp_imp <= timestamp_req + interval 5 minutes) 
    """), 
  "leftOuter")

rawQuery = rawQuery \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/home/jovyan/streaming/applicationHistory") \
        .option("path", "/home/jovyan/streaming").start()
print(rawQuery.status)

{'message': 'Обработка новых данных', 'isDataAvailable': True, 'isTriggerActive': True} ОШИБКА: root: Исключительная ситуация при отправке команды. Traceback (последний вызов был последним): файл "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", строка 1062, в send_command поднять Py4JNetworkError («Ответ со стороны Java пуст») py4j.protocol.Py4JNetworkError: Ответ со стороны Java пуст

Во время обработки вышеуказанного исключения произошло другое исключение:

Traceback (последний вызов был последним): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", строка 908, в send_command response = connection.send_command (команда) Файл "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", строка 1067, в send_command «Ошибка при получении», e, proto.ERROR_ON_RECEIVE) py4j.protocol.Py4JNetworkError: Ошибка при получении ОШИБКА: py4j.java_gateway: произошла ошибка при попытке подключения к сервер Java (127.0.0.1:33968) Traceback (последний вызов был последним):
файл "/Opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", строка 2910, в run_code exec (code_obj, self.user_global_ns, self.user_ns) Файл "", строка 3, в файл print (rawQuery.status) "/opt/conda/lib/python3.6/site-packages/pyspark/sql/streaming.py", строка 114, в статусе return json.loads (self._jsq.status (). json ()) Файл "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", строка 1160, в звоните ответ, self.gateway_client, self.target_id, self.name) Файл "/opt/conda/lib/python3.6/site-packages/pyspark/sql/utils.py", строка 63, в деко вернуть f (* a, ** kw) файл "/opt/conda/lib/python3.6/site-packages/py4j/protocol.py", строка 328, в get_return_value format (target_id, ".", name)) py4j.protocol.Py4JError: Произошла ошибка при вызове o92.status

Во время обработки вышеуказанного исключения произошло другое исключение:

Traceback (последний вызов был последним): File "/Opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", линия 1828, в showtraceback stb = value._render_traceback_ () AttributeError: у объекта 'Py4JError' нет атрибута '_render_traceback _'

Во время обработки вышеуказанного исключения произошло другое исключение:

Traceback (последний вызов был последним): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", строка 852, в _get_connection connection = self.deque.pop () IndexError: всплывающее окно из пустой очереди

Я использую Spark локально, используя Jupyter Notebook. В spark / conf / spark-defaults.conf у меня есть:

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory             15g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

Если я пытаюсь использовать Spark после предыдущей ошибки, я получаю эту ошибку:

ОШИБКА: root: исключение при отправке команды. Traceback (самый последний последний звонок): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", строка 1062, в send_command поднять Py4JNetworkError («Ответ со стороны Java пуст») py4j.protocol.Py4JNetworkError: Ответ со стороны Java пуст

Во время обработки вышеуказанного исключения произошло другое исключение:

Traceback (последний вызов был последним): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", строка 908, в send_command response = connection.send_command (команда) Файл "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", строка 1067, в send_command «Ошибка при получении», e, proto.ERROR_ON_RECEIVE) py4j.protocol.Py4JNetworkError: Ошибка при получении

1 Ответ

0 голосов
/ 15 мая 2018

Я решил проблему! В основном, проблема была связана с Jupyter Notebook по какой-то причине. Я удалил следующую строку предыдущего кода:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

И я запустил код, используя консоль:

> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 spark_structured.py

Таким образом, я мог бы запустить весь код без проблем.

Если у вас возникла такая же проблема, вы также можете изменить spark-default.conf и увеличение spark.driver.memory и spark.executor.memory

...