Как мы управляем смещениями в Spark Structured Streaming? (Проблемы с _spark_metadata) - PullRequest
0 голосов
/ 17 июня 2020

Предыстория: Я написал простое приложение для паровой обработки со структурой искры для перемещения данных из Kafka в S3. Обнаружено, что для поддержки единовременной гарантии искра создает папку _spark_metadata, которая в конечном итоге становится слишком большой, когда потоковое приложение работает в течение длительного времени, папка метаданных становится настолько большой, что мы начинаем получать ошибки OOM. Я хочу избавиться от папок метаданных и контрольных точек Spark Structured Streaming и самостоятельно управлять смещениями.

Как мы управляли смещениями в Spark Streaming: Я использовал val offsetRanges = rdd.asInstanceOf [HasOffsetRanges ] .offsetRanges для получения смещений в Spark Structured Streaming. Но хочу знать, как получить смещения и другие метаданные для управления контрольными точками самостоятельно с помощью Spark Structured Streaming. У вас есть пример программы, реализующей контрольные точки?

Как мы управляли смещениями в Spark Structured Streaming ?? Глядя на этот JIRA https://issues-test.apache.org/jira/browse/SPARK-18258. похоже, офсеты не предусмотрены. Как нам go о?

Проблема в том, что за 6 часов размер метаданных увеличился до 45 МБ и увеличился до почти 13 ГБ. Выделенная память драйвера составляет 5 ГБ. В это время система вылетает с OOM. Хотите знать, как избежать увеличения размера этих метаданных? Как сделать так, чтобы метаданные не регистрировали так много информации.

Код:

1. Reading records from Kafka topic
  Dataset<Row> inputDf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
   Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       ....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
   StreamingQuery query = flatDf.writeStream().format("parquet")

enter image description here

Dataset dataDf = inputDf.select (from_ json (col ("значение"). cast ("строка"), EVENT_SCHEMA) .alias ("событие")) .select ("event.metadata", "event.data", " event.connection »,« event.registration_event »,« event.version_event »); SQLContext sqlContext = новый SQLContext (sparkSession); dataDf.createOrReplaceTempView («событие»); Набор данных flatDf = sqlContext. sql («выберите» + «дата, время, идентификатор,» + flattenSchema (EVENT_SCHEMA, «событие») + «из события»); StreamingQuery query = flatDf .writeStream () .outputMode («добавить») .option («сжатие», «мгновенно») .format («паркет») .option («checkpointLocation», checkpointLocation) .option («path», outputPath ) .partitionBy ("дата", "время", "идентификатор") .trigger (Trigger.ProcessingTime (triggerProcessingTime)) .start (); query.awaitTermination ();

1 Ответ

1 голос
/ 17 июня 2020

Для не-пакетной Spark Structured Streaming Интеграция KAFKA:

Цитата:

Structured Streaming игнорирует смещения, фиксируемые в Apache Kafka.

Вместо этого он полагается на собственное управление смещениями на стороне драйвера, который отвечает за распределение смещений исполнителям и за их установление контрольных точек в конце цикла обработки (эпоха или микропакет).

Вам не нужно беспокоиться, если вы будете следовать руководствам по интеграции Spark KAFKA.

Отличная ссылка: https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read

Для партии ситуация иная, вы необходимо управлять этим самостоятельно и сохранять смещения.

ОБНОВЛЕНИЕ На основании комментариев я предлагаю, что вопрос немного отличается, и советую вам взглянуть на Spark Structured Streaming Checkpoint Cleanup . В дополнение к вашим обновленным комментариям и тому факту, что ошибки нет, я предлагаю вам ознакомиться с метаданными для Spark Structured Streaming https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-streaming/read. Смотрю на код, отличный от моего стиля, но не вижу явных ошибок.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...