Прекратите обработку сообщений кафки, если что-то пойдет не так во время процесса - PullRequest
1 голос
/ 08 апреля 2020

В моем процессоре API я храню сообщения в хранилище значений ключей, и каждые 100 сообщений я делаю запрос POST. Если что-то не получается при попытке отправить сообщение (API не отвечает и т. Д. c.) Я хочу прекратить обработку сообщений. Пока нет доказательств того, что вызовы API работают. Вот мой код:

public class BulkProcessor implements Processor<byte[], UserEvent> {

    private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;

    private BulkAPIClient bulkClient;

    private String storeName;

    private ProcessorContext context;

    private int count;

    @Autowired
    public BulkProcessor(String storeName, BulkClient bulkClient) {
        this.storeName = storeName;
        this.bulkClient = bulkClient;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
        count = 0;
        // to check every 15 minutes if there are any remainders in the store that are not sent yet
        this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if (count > 0) {
                sendEntriesFromStore();
            }
        });
    }

    @Override
    public void process(byte[] key, UserEvent value) {
        int userGroupId = Integer.valueOf(value.getUserGroupId());
        ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
        if (userEventArrayList == null) {
            userEventArrayList = new ArrayList<>();
        }
        userEventArrayList.add(value);
        keyValueStore.put(userGroupId, userEventArrayList);
        if (count == 100) {
            sendEntriesFromStore();
        }
    }

    private void sendEntriesFromStore() {
        KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
        while (iterator.hasNext()) {
            KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
            BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
            if (bulkRequest.getLocation() != null) {
                URI url = bulkClient.buildURIPath(bulkRequest);
                try {
                    bulkClient.postRequestBulkApi(url, bulkRequest);
                    keyValueStore.delete(entry.key);
                } catch (BulkApiException e) {
                    logger.warn(e.getMessage(), e.fillInStackTrace());
                }
            }
        }
        iterator.close();
        count = 0;
    }

    @Override
    public void close() {
    }
}

В настоящее время в моем коде, если вызов API завершается неудачно, он будет повторять следующие 100 (и это будет происходить до тех пор, пока он не будет работать) и добавлять их в keyValueStore. Я не хочу, чтобы это случилось. Вместо этого я бы предпочел остановить поток и продолжить после очистки keyValueStore. Возможно ли это?
Могу ли я выбросить StreamsException?

try {
    bulkClient.postRequestBulkApi(url, bulkRequest);
    keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
    throw new StreamsException(e);
}

Это убило бы мое потоковое приложение, и процесс умирает?

Ответы [ 2 ]

0 голосов
/ 28 апреля 2020

В конце я использовал KafkaConsumer вместо KafkaStreams, но суть в том, что я изменил BulkApiException на расширение RuntimeException, которое я снова выбрасываю после регистрации. Теперь это выглядит следующим образом:

        } catch (BulkApiException bae) {
            logger.error(bae.getMessage(), bae.fillInStackTrace());
            throw new BulkApiException();
        } finally {
            consumer.close();
            int exitCode = SpringApplication.exit(ctx, () -> 1);
            System.exit(exitCode);
        }

Таким образом, приложение закрывается и k8s перезапускает модуль. Это произошло потому, что если API, на котором я пытаюсь переслать запросы, не работает, то нет смысла продолжать читать сообщения. Так что, пока другой API не восстановится, k8s перезапустит модуль.

0 голосов
/ 08 апреля 2020
  1. Вы должны удалять запись из хранилища состояний только после того, как убедитесь, что ваша запись успешно обработана API, поэтому удалите первую keyValueStore.delete(entry.key); и оставьте вторую. Если нет, то вы можете потенциально потерять некоторые сообщения, когда keyValueStore.delete зафиксирован в базовом списке изменений topi c, но ваши сообщения еще не успешно обработаны, поэтому это всего лишь одна гарантия.
  2. Просто оберните вызывающий API код вокруг бесконечного l oop и продолжайте попытки до тех пор, пока запись не будет успешно обработана, ваш процессор не будет использовать новое сообщение с вышеуказанного узла процессора, потому что он работает в том же StreamThread:
    private void sendEntriesFromStore() {
        KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
        while (iterator.hasNext()) {
            KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
            //remove this state store delete code : keyValueStore.delete(entry.key);
            BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
            if (bulkRequest.getLocation() != null) {
                URI url = bulkClient.buildURIPath(bulkRequest);
                while (true) {
                    try {
                        bulkClient.postRequestBulkApi(url, bulkRequest);
                        keyValueStore.delete(entry.key);//only delete after successfully process the message to achieve at least one processing guarantee
                        break;
                    } catch (BulkApiException e) {
                        logger.warn(e.getMessage(), e.fillInStackTrace());
                    }
                }
            }
        }
        iterator.close();
        count = 0;
    }
Да, вы можете выдать исключение StreamsException, это StreamTask будет перенесено в другой StreamThread во время перебалансировки, возможно, в примере экземпляра приложения. Если API продолжает вызывать Exception до тех пор, пока не исчезнет весь StreamThread, ваше приложение не будет автоматически выходить и получать ниже Exception, вы должны добавить собственный обработчик StreamsException, чтобы выйти из приложения, когда все потоки потока умерли с помощью KafkaStreams#setUncaughtExceptionHandler или прослушать изменение состояния потока (в состояние ОШИБКА):
All stream threads have died. The instance will be in error state and should be closed.
...