Идентификатор корреляции теряется при использовании Collection с ReplyingKafkaTemplate - PullRequest
1 голос
/ 09 июля 2019

Это мой слушатель

@KafkaListener(topics = ["cartListHashes"])
@SendTo
@Transactional
fun listHashes(token: String): Collection<String> {
    // get id
    return doListHashes(token)
}

private fun doListHashes(token: String): Collection<String> {
    val id = userService.lookupIdSync(token)
    if (id == null) {
        log.info("Cannot get user id with token $token")
        return emptyList()
    }
    return cartRepo.listHashes(id).map { base32.encodeToString(it) }
}

Проблема в том, что идентификатор корреляции утерян.

В ответе не найдено корреляции: xxx - для использования семантики запроса / ответа,отвечающий сервер должен вернуть идентификатор корреляции в заголовке «correlationId»

1 Ответ

0 голосов
/ 09 июля 2019

Оказывается, я не могу использовать Collection как тип возвращаемого значения. В MessagingMessageListenerAdapter:

protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
    if (!messageReturnType && topic == null) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("No replyTopic to handle the reply: " + result);
        }
    }
    else if (result instanceof Message) {
        this.replyTemplate.send((Message<?>) result);
    }
    else {
        if (result instanceof Collection) {
            ((Collection<V>) result).forEach(v -> {
                if (v instanceof Message) {
                    this.replyTemplate.send((Message<?>) v);
                }
                else {
                    this.replyTemplate.send(topic, v);
                }
            });
        }
        else {
            sendSingleResult(result, topic, source);
        }
    }
}

Результат сбора будет рассматриваться как отдельные сообщения.

Работает после того, как я изменил тип возвращаемого значения на Array.

@KafkaListener(topics = ["cartListHashes"])
@SendTo
@Transactional
fun listHashes(token: String): Array<String> {
    // get id
    return doListHashes(token)
}

private fun doListHashes(token: String): Array<String> {
    val id = userService.lookupIdSync(token)
    if (id == null) {
        log.info("Cannot get user id with token $token")
        return emptyArray()
    }
    return cartRepo.listHashes(id).map { base32.encodeToString(it) }.toTypedArray()
}
...