Spring-kafka-test - не удалось получить значение из ConsumerRecord - PullRequest
0 голосов
/ 27 ноября 2018

У меня есть следующий класс модульных тестов, который тестирует Kafka Producer с использованием spring-kafka-test (EmbeddedKafka).

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaProducerTests {

  private final static String TOPIC_NAME = "greeting-topic";

  @Autowired
  private KafkaProducer kafkaProducer;

  private KafkaMessageListenerContainer<String, Greeting> container;

  private BlockingQueue<ConsumerRecord<String, Greeting>> records;

  @ClassRule
  public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC_NAME);

  @Before
  public void setUp() throws Exception {
    Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("greeting-group", "false", embeddedKafka.getEmbeddedKafka());
    val consumerFactory = new DefaultKafkaConsumerFactory<String, Greeting>(consumerProperties);
    val containerProperties = new ContainerProperties(TOPIC_NAME);
    container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

    // create a thread safe queue to store the received message
    records = new LinkedBlockingQueue<>();

    // setup a Kafka message listener
    container
        .setupMessageListener(new MessageListener<String, Greeting>() {
          @Override
          public void onMessage(
              ConsumerRecord<String, Greeting> record) {
            log.info("test-listener received message='{}'", record.toString());
            records.add(record);
          }
        });

    // start the container and underlying message listener
    container.start();

    // wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
  }

  @After
  public void tearDown() {
    // stop the container
    container.stop();
  }

  @Test
  public void testSendMessageSuccessfully() throws Exception {
    Greeting greeting = new Greeting("Hello Shaun!", "Shaun");
    kafkaProducer.sendMessage(TOPIC_NAME, greeting);

    // check that the message was received
    ConsumerRecord<String, Greeting> received = records.poll(10, TimeUnit.SECONDS);

    Greeting resultGreeting = received.value();

    assertThat(resultGreeting).isEqualTo(greeting);
  }
}

Приведенный выше модульный тест дает мне ClassCastException в 4-й последней строке:

java.lang.ClassCastException: java.lang.String cannot be cast to com.shaunthomas999.kafka.model.Greeting

Как получить значение из ConsumerRecord?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...