Потоки Java / Quarkus Kafka для чтения / записи в один и тот же Topi c в зависимости от условия - PullRequest
0 голосов
/ 09 июля 2020

Здравствуйте, у меня есть проблема, которую я пытаюсь решить. В основном у меня топология Kafka Streams, которая будет читать JSON сообщения из Kafka topi c, и это сообщение десериализуется в POJO. Затем, в идеале, он прочитает это сообщение для определенного логического флага. Если этот флаг истинен, он выполнит некоторую трансформацию и затем запишет его обратно в topi c. Однако, если флаг ложный, я пытаюсь, чтобы он ничего не писал, но я не уверен, go как я могу об этом *1016*. С помощью MP Reactive Messaging я могу просто использовать поток Rx Java 2 Flowable и вернуть что-то вроде Flowable.empty (), но я не могу использовать этот метод здесь, кажется.

JsonbSerde<FinancialMessage> financialMessageSerde = new JsonbSerde<>(FinancialMessage.class);

StreamsBuilder builder = new StreamsBuilder();

builder.stream(
        TOPIC_NAME,
        Consumed.with(Serdes.Integer(), financialMessageSerde)
    )
    .mapValues ( 
        message -> checkCondition(message)
    )
    .to (
        TOPIC_NAME,
        Produced.with(Serdes.Integer(), financialMessageSerde)
    );

Ниже лог вызова функции c.

public FinancialMessage checkCondition(FinancialMessage rawMessage) {
    FinancialMessage receivedMessage = rawMessage;

    if (receivedMessage.compliance_services) {
        receivedMessage.compliance_services = false;

        return receivedMessage;
    }

    else return null;
}

Если логическое значение false, оно просто возвращает тело JSON с «null».

Я попытался изменить тип возвращаемого значения функция checkCondition, обернутая как

public Flowable<FinancialMessage> checkCondition (FinancialMessage rawMessage) 

, а затем возврат из if будет похож на Flowable.just (receiveMessage) или Flowable.empty (), но я не могу сериализовать объект Flowable. Это может быть глупый вопрос, но есть ли лучший способ go по этому поводу?

1 Ответ

1 голос
/ 05 августа 2020

Обратите внимание, что сообщения Kafka неизменны и не удаляются после чтения, и если вы читаете / пишете из одного и того же c топа с одним приложением, сообщение будет обрабатываться бесконечно часто (или, точнее, разные его копии. ), если у вас нет условия для «разрыва» цикла.

Кроме того, если, например, 5 сервисов читают из одного и того же topi c, все 5 сервисов получают копию каждого события. И если одна служба ответит на обратную запись, все остальные 4 службы и сама служба записи прочитают сообщение снова. Таким образом, вы получаете некоторое усиление данных.

Если у вас есть разные сервисы, которые последовательно реагируют на исходное входное сообщение, у вас может быть один топи c между каждой парой последовательных сервисов, чтобы действительно построить конвейер.

Наконец, вы говорите, что если логический флаг равен true, вы хотите преобразовать сообщение и передать его (я предполагаю, что для следующей услуги потребителю). А для false ничего не нужно делать. Я также предполагаю, что для сообщения только один флаг будет true, а успешное преобразование также переключает флаг (чтобы разрешить обработку следующей службой). В этом случае лучше всего, если вы можете убедиться, что каждое исходное входное сообщение имеет один и тот же начальный логический флаг, установленный для построения вашего конвейера. Таким образом, только соответствующая служба будет читать сообщения с установленным логическим флагом (вам даже не нужно проверять логический флаг, поскольку ваша запись в восходящем потоке гарантирует, что он установлен; у вас может быть только проверка работоспособности).

Если вы не знаете, какой логический флаг установлен изначально, и все службы читают из одного и того же входа topi c, правильным будет просто отфильтровать сообщение. Если все службы читают все сообщения, 4 службы будут фильтровать сообщение, а одна служба обработает его и выдаст новое сообщение с другим флагом. Для этой архитектуры один topi c может работать : если сообщение обрабатывается всеми службами и все логические флаги ложны (после того, как все службы обработали сообщение), и вы записываете его обратно на вход topi c, все сервисы корректно сбросили бы последнюю копию. Однако использование одного topi c подразумевает большое количество избыточных операций чтения / записи.

Возможно, лучшая архитектура - иметь исходный вход topi c и один дополнительный вход topi c для каждого служба. Вы также можете использовать дополнительную службу «диспетчер», которая считывает исходные темы ввода и branches() KStream в темы ввода службы в соответствии с логическим флагом. Таким образом, каждая служба будет читать только сообщения с правильным флагом, установленным на true. Кроме того, каждая служба будет записывать на вход topi c других служб, также используя branch() после преобразования сообщения, чтобы записать его во входную topi c правильной следующей службы. Наконец, вам понадобится выходной topi c, в который каждая служба сможет писать после того, как сообщение будет полностью обработано.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...