pyspark - поток kafka - не хватает памяти - PullRequest
0 голосов
/ 12 декабря 2018

Я пытаюсь проверить потоковую передачу kafka с версией брокера 0.10 с этим кодом.Это просто простой код для печати содержимого темы.Ничего страшного!Но по какой-то причине памяти недостаточно (10 ГБ ОЗУ в ВМ)!Код:

# coding: utf-8

"""
kafka-test-003.py: test with broker 0.10(new Spark Stream API)

How to run this script?

spark-submit --jars jars/spark-sql-kafka-0-10_2.11-2.3.0.jar,jars/kafka-clients-0.11.0.0.jar kafka-test-003.py



"""


import pyspark 
from pyspark import SparkContext
from pyspark.sql.session import SparkSession,Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


# starting spark session
spark = SparkSession.builder.appName("Kakfa-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# getting streaming context
sc = spark.sparkContext
ssc = StreamingContext(sc, 2) # batching duration: each 2 seconds

broker = "kafka.some.address:9092"
topic = "my.topic"

### Streaming

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", broker) \
  .option("startingOffsets", "earliest") \
  .option("subscribe", topic) \
  .load() \
  .select(col('key').cast("string"),col('value').cast("string"))

query = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .outputMode("append") \
  .format("console") \
  .start()

### End Streaming

query.awaitTermination()

Запущенная искра отправки:

spark-submit --master local[*] --driver-memory 5G --executor-memory 5G --jars jars/kafka-clients-0.11.0.0.jar,jars/spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-003.py

К сожалению, результат:

java.lang.OutOfMemoryError: пространство кучи Java

Я предполагаю, что Кафка должен приносить небольшие порции данных за раз точно, чтобы избежать этой проблемы, верно?Итак, что я делаю не так?

1 Ответ

0 голосов
/ 12 декабря 2018

управление искровой памятью - сложный процесс.Оптимальное решение зависит не только от ваших данных и типа операций, но и от поведения системы вы можете повторить следующую команду spark:

spark-submit --master local [*] --driver-memory 4G--executor-memory 2G --executor-cores 5 --num-executors 8 --jars jars / kafka-clients-0.11.0.0.jar, jars / spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-003.py

Можно ли настроить вышеуказанные параметры памяти по следующей ссылке, настроив производительность? Как использовать опцию --total-executor-cores с использованием spark-submit?

...