Предыстория: Я написал простое приложение для паровой обработки со структурой искры для перемещения данных из Kafka в S3. Обнаружено, что для поддержки единовременной гарантии искра создает папку _spark_metadata, которая в конечном итоге становится слишком большой, когда потоковое приложение работает в течение длительного времени, папка метаданных становится настолько большой, что мы начинаем получать ошибки OOM. Я хочу избавиться от папок метаданных и контрольных точек Spark Structured Streaming и самостоятельно управлять смещениями.
Как мы управляли смещениями в Spark Streaming: Я использовал val offsetRanges = rdd.asInstanceOf [HasOffsetRanges ] .offsetRanges для получения смещений в Spark Structured Streaming. Но хочу знать, как получить смещения и другие метаданные для управления контрольными точками самостоятельно с помощью Spark Structured Streaming. У вас есть пример программы, реализующей контрольные точки?
Как мы управляли смещениями в Spark Structured Streaming ?? Глядя на этот JIRA https://issues-test.apache.org/jira/browse/SPARK-18258. похоже, офсеты не предусмотрены. Как нам go о?
Проблема в том, что за 6 часов размер метаданных увеличился до 45 МБ и увеличился до почти 13 ГБ. Выделенная память драйвера составляет 5 ГБ. В это время система вылетает с OOM. Хотите знать, как избежать увеличения размера этих метаданных? Как сделать так, чтобы метаданные не регистрировали так много информации.
Код:
1. Reading records from Kafka topic
Dataset<Row> inputDf = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("startingOffsets", "earliest") \
.load()
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
SQLContext sqlContext = new SQLContext(sparkSession);
dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
StreamingQuery query = flatDf.writeStream().format("parquet")
Dataset dataDf = inputDf.select (from_ json (col ("значение"). cast ("строка"), EVENT_SCHEMA) .alias ("событие")) .select ("event.metadata", "event.data", " event.connection »,« event.registration_event »,« event.version_event »); SQLContext sqlContext = новый SQLContext (sparkSession); dataDf.createOrReplaceTempView («событие»); Набор данных flatDf = sqlContext. sql («выберите» + «дата, время, идентификатор,» + flattenSchema (EVENT_SCHEMA, «событие») + «из события»); StreamingQuery query = flatDf .writeStream () .outputMode («добавить») .option («сжатие», «мгновенно») .format («паркет») .option («checkpointLocation», checkpointLocation) .option («path», outputPath ) .partitionBy ("дата", "время", "идентификатор") .trigger (Trigger.ProcessingTime (triggerProcessingTime)) .start (); query.awaitTermination ();