Есть ли способ очистить WindowStore Kafka Streams в зависимости от размера? - PullRequest
0 голосов
/ 07 января 2019

Я пишу приложение Kafka Streams с использованием DSL API, которое будет читать кортежи из темы kafka. В топологии я хочу пакетировать кортежи. Затем я хочу записать ванну в файл на диске, если (1) прошло 30 секунд или (2) размер пакета превышен на 1 ГБ.

В топологии я написал кортежи групп, используя TimeWindowedKStream. Затем вызывает агрегат и передает Windowed Store.

Моя проблема в том, что когда хранилище состояний пыталось записать в Kafka ChangeLog, я получаю

org.apache.kafka.common.errors.RecordTooLargeException

исключение.

В частности:

Вызывается: org.apache.kafka.streams.errors.StreamsException: задача [1_1] Прервать отправку из-за ошибки, обнаруженной в предыдущей записи (ключ значение \ x00 \ x00 \ x00 \ x06 \ x00 \ x00 \ x01h $ \ xE7 \ x88 \ x00 \ x00 \ x00 \ x00 [B @ 419761c отметка времени 1546807396524) к теме ibv2-capt-consumer-group-3-record-store-changelog из-за org.apache.kafka.common.errors.RecordTooLargeException: запрос включено сообщение, превышающее максимальный размер сообщения, которое сервер будет принимаем ..

Я пытался установить CACHE_MAX_BYTES_BUFFERING_CONFIG равным 1 МБ, но, как указано в документации, эта конфигурация используется для всей топологии.

Вот моя топология

Вот код Scala, который я использовал. Заметьте, что здесь я использую kafka-streams-scala.

val builder = new StreamsBuilderS()

import com.lightbend.kafka.scala.streams.DefaultSerdes._

implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]

val inputStream: KStreamS[String, Record] = builder.stream[String,Record](topic)

val keyed = inputStream.selectKey[Int]((k,r) => random.nextInt(10)) 

val grouped: TimeWindowedKStreamS[Int, Record] = keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))

import org.apache.kafka.common.utils.Bytes

val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
  .as[Int,RecordSeq,WindowStore[Bytes, Array[Byte]]]("record-store")
  .withKeySerde(integerSerde)
  .withValueSerde(recordSeqSerde)
  .withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava)  // increased max.request.size to 10 x default

val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
  () => RecordSeq(Seq()),
  (randon: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
  windowedStore
)

val recordSeqStream: KStreamS[String, RecordSeq] = records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")

recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))

Примечание: кейс-класс RecordSeq (records: Seq [Record])

1 Ответ

0 голосов
/ 07 января 2019

Тема может иметь запись с максимальным размером, указанным в свойстве message.max.bytes. Это самый большой размер сообщения, которое брокер может получить и добавить в тему. Возможно, ваш размер записи превышает этот предел. Следовательно, вам нужно изменить конфигурацию этого свойства, чтобы разрешить больший размер записи.

Может быть установлено как на уровне брокера, так и на уровне темы. Более подробную информацию вы можете получить здесь:

http://kafka.apache.org/documentation/#brokerconfigs

http://kafka.apache.org/documentation/#topicconfigs

...