Kafka Streams: не удалось сбросить (flush) хранилище состояний из-за java.lang.ClassCastException: ключ невозможно привести к типу значения - PullRequest
6 голосов
/ 06 января 2020

Мы уже некоторое время используем Kafka Streams, но никогда не писали тестов для наших топологий. Мы решили попробовать и воспользоваться TopologyTestDriver, который входит в библиотеку Kafka Streams. К сожалению, мы столкнулись с проблемой, которую не можем решить. Ниже приведена упрощённая версия нашего продакшен-кода с той же семантикой.

Топология объединяет (join) два топика, содержащих документы двух типов. Наша цель — собрать документы в «папку» для каждого человека, объединив информацию из разных документов. При запуске теста мы получаем исключение, вызванное некорректным приведением PersonKey к DocumentA. Ниже приведены код теста, Avro-схемы структур данных и трассировка стека исключения.

package com.zenjob.me.indexer.application.domain;

import com.demo.DocumentA;
import com.demo.DocumentB;
import com.demo.DocumentFolder;
import com.demo.PersonKey;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import lombok.extern.log4j.Log4j2;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Exercises a KTable-KTable topology with {@link TopologyTestDriver}.
 *
 * <p>DocumentA records are mapped into a per-person DocumentFolder, then left-joined with
 * DocumentB records that share the same PersonKey. The result is written to the output topic.
 * All serdes are Confluent {@link SpecificAvroSerde}s sharing one {@link MockSchemaRegistryClient}.
 *
 * <p>NOTE(review): the reported {@code ClassCastException} (PersonKey cannot be cast to DocumentA)
 * is raised while a state-store cache flush feeds a stored value into {@code mapValues}. The
 * author reports it went away with a real schema registry, or after manually pre-registering
 * every schema, including those used by the internal state stores and their changelog topics.
 * That points at how the mock registry assigns/resolves schema ids. Confirm against the
 * MockSchemaRegistryClient version in use.
 */
@SuppressWarnings("SimplifiableJUnitAssertion")
@Log4j2
class DemoTest {

    private final SchemaRegistryClient schemaRegistryClient     = new MockSchemaRegistryClient();
    private final String               documentATopicName       = "documentATopicName";
    private final String               documentBTopicName       = "documentBTopicName";
    private final String               documentFoldersTopicName = "documentFoldersTopicName";

    /**
     * Creates a {@link SpecificAvroSerde} backed by the shared mock schema registry client.
     *
     * @param isForKey {@code true} to configure the serde for record keys, {@code false} for values
     * @param <T>      the generated Avro record type handled by the serde
     * @return a configured serde
     */
    private <T extends SpecificRecord> SpecificAvroSerde<T> getSerde(boolean isForKey) {
        Map<String, Object> serdeConfig = new HashMap<>();
        // The serde is constructed with the mock client below. This URL is presumably only needed
        // to satisfy config validation - TODO confirm it is never contacted.
        serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wat-ever-url-anyway-it-is-mocked");

        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);
        serde.configure(serdeConfig, isForKey);
        return serde;
    }

    /**
     * Pipes one DocumentA and then one DocumentB for the same person, and checks the two emitted
     * folders. The first folder carries only propertyA; the second carries both properties.
     */
    @Test
    void test() {

        StreamsBuilder builder = new StreamsBuilder();

        SpecificAvroSerde<PersonKey> keySerde = this.getSerde(true);
        SpecificAvroSerde<DocumentA> documentASerde = this.getSerde(false);
        SpecificAvroSerde<DocumentB> documentBSerde = this.getSerde(false);
        SpecificAvroSerde<DocumentFolder> documentFolderSerde = this.getSerde(false);

        KTable<PersonKey, DocumentA> docATable = builder.table(documentATopicName, Consumed.with(keySerde, documentASerde), Materialized.with(keySerde, documentASerde));
        KTable<PersonKey, DocumentB> docBTable = builder.table(documentBTopicName, Consumed.with(keySerde, documentBSerde), Materialized.with(keySerde, documentBSerde));

        docATable
                .mapValues(documentA ->
                                DocumentFolder.newBuilder()
                                        .setPropertyA(documentA.getPropertyA())
                                        .build(),
                        Materialized.with(keySerde, documentFolderSerde))
                .leftJoin(docBTable,
                        (folder, documentB) -> {
                            // Left join: no DocumentB yet for this key, so emit the folder unchanged.
                            if (documentB == null) {
                                return folder;
                            }
                            return DocumentFolder.newBuilder(folder)
                                    .setPropertyB(documentB.getPropertyB())
                                    .build();
                        },
                        Materialized.with(keySerde, documentFolderSerde)
                )
                .toStream()
                .to(documentFoldersTopicName, Produced.with(keySerde, documentFolderSerde));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);
        try {
            ConsumerRecordFactory<PersonKey, DocumentA> documentAConsumerRecordFactory = new ConsumerRecordFactory<>(documentATopicName, keySerde.serializer(), documentASerde.serializer());
            ConsumerRecordFactory<PersonKey, DocumentB> documentBConsumerRecordFactory = new ConsumerRecordFactory<>(documentBTopicName, keySerde.serializer(), documentBSerde.serializer());

            // When
            String personId = "person-id";
            PersonKey key = PersonKey.newBuilder().setPropertyA(personId).build();
            DocumentA documentA = DocumentA.newBuilder().setPropertyA("docA-propA").build();
            DocumentB documentB = DocumentB.newBuilder().setPropertyB("docB-propB").build();

            driver.pipeInput(documentAConsumerRecordFactory.create(key, documentA));
            driver.pipeInput(documentBConsumerRecordFactory.create(key, documentB));

            ProducerRecord<PersonKey, DocumentFolder> output1 = driver.readOutput(documentFoldersTopicName, keySerde.deserializer(), documentFolderSerde.deserializer());
            ProducerRecord<PersonKey, DocumentFolder> output2 = driver.readOutput(documentFoldersTopicName, keySerde.deserializer(), documentFolderSerde.deserializer());

            log.info(output1);
            log.info(output2);

            // readOutput yields null when no record is available. Fail with a clear message
            // instead of an NPE on .value().
            Assert.assertNotNull("no folder emitted after piping DocumentA", output1);
            Assert.assertNotNull("no second folder emitted after piping DocumentB", output2);

            // Compare textual content. Depending on the Avro code-generation stringType (not
            // visible here), deserialized fields may be org.apache.avro.util.Utf8 rather than
            // String, and Utf8.equals(String) is false.
            Assert.assertEquals(String.valueOf(documentA.getPropertyA()), String.valueOf(output1.value().getPropertyA()));
            Assert.assertEquals(null, output1.value().getPropertyB());

            Assert.assertEquals(String.valueOf(documentA.getPropertyA()), String.valueOf(output2.value().getPropertyA()));
            Assert.assertEquals(String.valueOf(documentB.getPropertyB()), String.valueOf(output2.value().getPropertyB()));
        } finally {
            // Always close the driver, even when pipeInput or an assertion throws (as in the
            // reported failure), so its state stores are released before the next test.
            driver.close();
        }
    }
}

DocumentA

{
  "type" : "record",
  "name" : "DocumentA",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    }
  ]
}

DocumentB

{
  "type" : "record",
  "name" : "DocumentB",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyB",
      "type" : "string"
    }
  ]
}

DocumentFolder

{
  "type" : "record",
  "name" : "DocumentFolder",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    },
    {
      "name" : "propertyB",
      "type" : [
        "null",
        "string"
      ],
      "default" : null
    }
  ]
}

PersonKey

{
  "type" : "record",
  "name" : "PersonKey",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    }
  ]
}

Исключение

task [0_0] Failed to flush state store documentATopicName-STATE-STORE-0000000000
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store documentATopicName-STATE-STORE-0000000000
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:421)
    at com.zenjob.me.indexer.application.domain.DemoTest.test(DemoTest.java:97)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:92)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$100(JUnitPlatformTestClassProcessor.java:77)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:73)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:131)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: com.demo.PersonKey cannot be cast to com.demo.DocumentA
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues.computeValue(KTableMapValues.java:78)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues.access$400(KTableMapValues.java:27)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:117)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:97)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:131)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:237)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
    ... 68 more

Версии: Kafka 2.3.0, Avro 1.9.1, KafkaAvroSerde 5.2.1.

ОБНОВЛЕНИЕ

Я попытался переписать топологию с помощью Processor API, но безуспешно. Затем я попробовал использовать настоящий реестр схем (Schema Registry), и тест прошёл, поэтому, похоже, проблема в MockSchemaRegistryClient. Опубликую ещё одно обновление, когда найду причину.

ОБНОВЛЕНИЕ 2

Мне удалось заставить тест работать с MockSchemaRegistryClient, но для этого пришлось вручную зарегистрировать все схемы, включая схемы для хранилищ состояний (state stores) и для внутренних changelog-топиков этих хранилищ.

...