Spark структурированная потоковая передача с Kafka не учитывает startOffset = "самое раннее" - PullRequest
4 голосов
/ 19 июня 2019

Я настроил Spark Structured Streaming (Spark 2.3.2) для чтения из Kafka (2.0.0). Я не могу использовать с начала темы, если сообщения поступили в тему до запуска потоковой работы Spark. Является ли это ожидаемым поведением потоковой передачи Spark, когда он игнорирует сообщения Kafka, созданные до первоначального запуска задания Spark Stream (даже с .option ("stratingOffsets", "ранние"))?

Шаги для воспроизведения

  1. Перед началом потокового задания создайте тему test (один посредник, один раздел) и создайте сообщения для темы (3 сообщения в моем примере).

  2. Запустить spark-shell следующей командой: spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/

  3. Выполните приведенный ниже код искры Скала.

// Local
val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9097")
  .option("failOnDataLoss","false")
  .option("stratingOffsets","earliest")
  .option("subscribe", "test")
  .load()

// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
  .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
  .start()

Ожидаемый против фактического выхода

Я ожидаю, что поток начнется со смещения = 1. Однако он начинает читать со смещения = 3. Вы можете видеть, что клиент kafka фактически сбрасывает начальное смещение: 2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.

Я вижу, что искровой поток обрабатывает сообщения, которые я генерирую после запуска потокового задания.

Это ожидаемое поведение потоковой передачи Spark, когда он игнорирует сообщения Kafka, созданные до первоначального запуска задания Spark Stream (даже с .option("stratingOffsets","earliest"))?

2019-06-18 21:22:57 INFO  AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
2019-06-18 21:22:57 INFO  AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
2019-06-18 21:22:57 INFO  MicroBatchExecution:54 - Starting new streaming query.
2019-06-18 21:22:57 INFO  Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
2019-06-18 21:22:57 INFO  AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
2019-06-18 21:22:57 INFO  ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
2019-06-18 21:22:57 INFO  AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
2019-06-18 21:22:57 INFO  AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
2019-06-18 21:22:57 INFO  ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
2019-06-18 21:22:57 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO  KafkaSource:54 - Initial offsets: {"test":{"0":3}}
2019-06-18 21:22:58 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO  MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
2019-06-18 21:22:58 INFO  KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}

Пакетный режим Spark

Мне удалось подтвердить, что пакетный режим читает с самого начала, поэтому никаких проблем с конфигурацией хранения Kafka

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9097")
  .option("subscribe", "test")
  .load()

df.count // Long = 3

Ответы [ 2 ]

0 голосов
/ 27 июня 2019

Ха-ха, это была простая опечатка: "stratingOffsets" должен быть "startOffsets"

0 голосов
/ 19 июня 2019

Вы можете сделать это двумя способами. загрузить данные из kafka в потоковый фрейм данных или загрузить данные из kafka в статический фрейм данных (для тестирования).

Я думаю, вы не видите данные из-за идентификатора группы. kafka передаст группу потребителей и смещения во внутреннюю тему. убедитесь, что имя группы уникально для каждого чтения.

Вот два варианта.

Вариант 1: чтение данных из кафки в потоковый фрейм данных

// spark streaming with kafka 

import org.apache.spark.sql.streaming.ProcessingTime

val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","app01.app.test.net:9097,app02.app.test.net:9097")
.option("subscribe", "kafka-testing-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("startingOffsets","earliest")
.option("maxOffsetsPerTrigger","6000")
.load()

val ds2 = ds1.select(from_json($"value".cast(StringType), dataSchema).as("data")).select("data.*")
val ds3 = ds2.groupBy("TABLE_NAME").count()
ds3.writeStream
.trigger(ProcessingTime("10 seconds"))
.queryName("query1").format("console")
.outputMode("complete")
.start()
.awaitTermination()

Вариант 2: чтение данных из kafka в статический фрейм данных (для тестирования он будет загружаться с начала)


// Subscribe to 1 topic defaults to the earliest and latest offsets
val ds1 = spark.read.format("kafka")
.option("kafka.bootstrap.servers","app01.app.test.net:9097,app02.app.test.net:9097")
.option("subscribe", "kafka-testing-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("spark.streaming.kafka.consumer.cache.enabled","false")
.load()

val ds2 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","topic","partition","offset","timestamp")
val ds3 = ds2.select("value").rdd.map(x => x.toString)
ds3.count()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...