Можно ли сбрасывать / контролировать промежуточное состояние в структурированной потоковой передаче Spark в режиме полного вывода? (Спарк 2.4.0) - PullRequest
0 голосов
/ 06 марта 2020

У меня есть сценарий, в котором я хочу обработать данные из kafka topi c. У меня есть этот конкретный код java для чтения данных в виде потока из kafka topi c.

Dataset<Row> streamObjs = sparkSession.readStream().format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", streamTopic)
                .option("failOnDataLoss", false).load();

Я приведу его к String, определю схему, затем попытаюсь использовать водяной знак (для поздних данных ) и окно (для группировки и агрегирования) и, наконец, вывод в приемник kafka.

Dataset<Row> selectExprImporter = streamObjs.selectExpr("CAST(value AS STRING)");

StructType streamSchema = new StructType().add("id", DataTypes.StringType)
                .add("timestamp", DataTypes.LongType)
                .add("values", new MapType(DataTypes.StringType, DataTypes.DoubleType, false));

Dataset<Row> selectValueImporter = selectExprImporter
                .select(functions.from_json(new Column("value"), streamSchema ).alias("data"));
.
.
(More transformations/operations)
.
.

Dataset<Row> aggCount_15min = streamData.withWatermark("timestamp", "2 minute")
                .withColumn("frequency", functions.lit(15))
                .groupBy(new Column("id"), new Column("frequency"),
                        functions.window(new Column("timestamp"), "15 minute").as("time_range"))
                .agg(functions.mean("value").as("mean_value"), functions.sum("value").as("sum"),
                        functions.count(functions.lit(1)).as("number_of_values"))
                .filter("mean_value > 35").orderBy("id", "frequency", "time_range");

aggCount_15min.selectExpr("to_json(struct(*)) AS value").writeStream()
                .outputMode(OutputMode.Complete()).format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
                .option("topic", outputTopic).option("checkpointLocation", checkpointLocation).start().awaitTermination();

Вопросы

  1. Правильно ли я понимаю, что при использовании Complete Режим вывода в приемнике Кафки, промежуточное состояние будет увеличиваться вечно до тех пор, пока я не получу исключение OutOfMemory?

  2. Кроме того, каков идеальный вариант использования для режима полного вывода? Использовать его только тогда, когда промежуточные данные / состояние не увеличиваются?

  3. В моем случае необходим полный режим вывода, поскольку я хочу использовать предложение orderBy . Есть ли какой-нибудь способ, с помощью которого я могу заставить спарк отбрасывать его состояние после каждых 30 минут и снова работать с новыми данными?

  4. Есть ли лучший способ не использовать полный вывод режим, но все же получить желаемый результат? Должен ли я использовать что-то еще, кроме потоковой структурированной искры?

Желаемым результатом является агрегация и группировка данных в соответствии с приведенным выше запросом, затем, когда 1-й пакет создан, отбросьте все состояние и запустите fre sh для следующей партии. Здесь партия может быть функцией последней обработанной метки времени. Например, скажем, сбросить все состояния и запускать fre sh, когда текущая временная метка пересекла 20 минут от первой полученной временной метки или лучше, если это функция времени окна (в этом примере 15 минут), как, например, сказать, когда 4 пакета по 15 минут windows имеют была обработана, и временная метка для 5-й партии прибывает в отбрасываемое состояние для предыдущих 4-х партий и запускается fre sh для этой партии.

1 Ответ

1 голос
/ 07 марта 2020

Вопрос задает много вещей и меньше фокусируется на том, что фактически делает Spark Structured Streaming (SSS). Отвечая на ваши пронумерованные вопросы, заглавные и ненумерованные вопросы:

A. Заголовок Вопрос:

Не как таковой, но в режиме Complete хранятся только агрегаты, поэтому сохраняются не все данные, а состояние, позволяющее пересчитывать данные на основе добавочного добавления данных. Я нахожу руководство вводящим в заблуждение с точки зрения его описания, но оно может быть справедливым для меня. Но в противном случае вы получите эту ошибку:

org. apache .spark. sql .AnalysisException: режим полного вывода не поддерживается, если потоковые агрегаты при потоковой передаче данных DataFrames / Datasets

не поддерживаются.
  1. Правильно ли я понимаю, что при использовании режима полного вывода в приемнике кафки промежуточное состояние будет увеличиваться вечно до тех пор, пока я не получу исключение OutOfMemory?

Раковина кафки здесь не фигурирует. Промежуточное состояние - это то, что Spark Structured Streaming необходимо хранить. Он хранит агрегаты и отбрасывает новые данные. Но в конце концов вы получите OOM из-за этой или другой ошибки, которую я подозреваю.

Кроме того, каков идеальный вариант использования для режима полного вывода? Использовать его только тогда, когда промежуточные данные / состояние не увеличиваются?

Для агрегирования по всем полученным данным. Вторая часть вашего вопроса не логична, и поэтому я не могу ответить. Состояние обычно увеличивается со временем.

В моем случае необходим режим полного вывода, так как я хочу использовать предложение orderBy. Есть ли какой-нибудь способ, с помощью которого я могу заставить искру сбросить состояние, которое она имеет после каждых 30 минут, и снова работать с новыми данными?

Нет, нет. Даже попытка изящно остановиться не является идеей, а затем перезапускается, так как период на самом деле не 15 минут. И это в любом случае против подхода SSS. Из руководств: Операции сортировки поддерживаются в потоковых наборах данных только после агрегирования и в режиме полного вывода. Вы не можете отбросить состояние так, как хотели бы, опять же агрегирует обсуждение.

Есть ли лучший способ не использовать режим полного вывода, но при этом получить желаемый результат? Должен ли я использовать что-то еще, кроме искровой структурированной потоковой передачи?

Нет, поскольку у вас много требований, которые не могут быть удовлетворены текущей реализацией. Если вы не отбрасываете порядок и не выполняете неперекрывающуюся оконную операцию (15,15) в режиме добавления с небольшим водяным знаком, если память работает правильно. Затем вы будете полагаться на последующую сортировку путем обработки в нисходящем направлении, поскольку порядок не поддерживается.

Окончательный общий вопрос: желаемый результат - агрегирование и группировка данных в соответствии с запросом, приведенным выше, а затем при 1-й партии был создан, сбросьте все состояние и запустите fre sh для следующей партии. Здесь партия может быть функцией последней обработанной метки времени. Например, скажем, сбросить все состояния и запускать fre sh, когда текущая временная метка пересекла 20 минут от первой полученной временной метки или лучше, если это функция времени окна (15 минут в этом примере), как, например, сказать, когда 4 пакета по 15 минут windows имеют была обработана, и временная метка для 5-го пакета прибывает в отбрасываемое состояние для предыдущих 4-х партий и начинается с sh для этого пакета.

Хотя ваши идеи могут считаться понятными, SSS-среда не поддерживает все это и именно то, что вы хотите (только пока).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...