Я пишу приложение Kafka Streams с использованием DSL API, которое будет читать кортежи из темы kafka. В топологии я хочу пакетировать кортежи. Затем я хочу записать ванну в файл на диске, если (1) прошло 30 секунд или (2) размер пакета превышен на 1 ГБ.
В топологии я написал кортежи групп, используя TimeWindowedKStream. Затем вызывает агрегат и передает Windowed Store.
Моя проблема в том, что когда хранилище состояний пыталось записать в Kafka ChangeLog, я получаю
org.apache.kafka.common.errors.RecordTooLargeException
исключение.
В частности:
Вызывается: org.apache.kafka.streams.errors.StreamsException: задача
[1_1] Прервать отправку из-за ошибки, обнаруженной в предыдущей записи (ключ
значение \ x00 \ x00 \ x00 \ x06 \ x00 \ x00 \ x01h $ \ xE7 \ x88 \ x00 \ x00 \ x00 \ x00
[B @ 419761c отметка времени 1546807396524) к теме
ibv2-capt-consumer-group-3-record-store-changelog из-за
org.apache.kafka.common.errors.RecordTooLargeException: запрос
включено сообщение, превышающее максимальный размер сообщения, которое сервер будет
принимаем ..
Я пытался установить CACHE_MAX_BYTES_BUFFERING_CONFIG
равным 1 МБ, но, как указано в документации, эта конфигурация используется для всей топологии.
Вот моя топология
Вот код Scala, который я использовал. Заметьте, что здесь я использую kafka-streams-scala.
val builder = new StreamsBuilderS()
import com.lightbend.kafka.scala.streams.DefaultSerdes._
implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]
val inputStream: KStreamS[String, Record] = builder.stream[String,Record](topic)
val keyed = inputStream.selectKey[Int]((k,r) => random.nextInt(10))
val grouped: TimeWindowedKStreamS[Int, Record] = keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))
import org.apache.kafka.common.utils.Bytes
val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
.as[Int,RecordSeq,WindowStore[Bytes, Array[Byte]]]("record-store")
.withKeySerde(integerSerde)
.withValueSerde(recordSeqSerde)
.withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava) // increased max.request.size to 10 x default
val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
() => RecordSeq(Seq()),
(randon: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
windowedStore
)
val recordSeqStream: KStreamS[String, RecordSeq] = records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")
recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))
Примечание: кейс-класс RecordSeq (records: Seq [Record])