Как мне переписать этот код транзакции Kafka, чтобы я мог передать код с переменными аргументами в (в настоящее время дублированный) блок? - PullRequest
0 голосов
/ 01 октября 2019

У меня есть код, который пишет Кафке в транзакции. Это следует за этим форматом:

public void removeEventTriggerInTransaction(UUID projectId, UUID id) {
    Producer producer = producerPool.getProducer();
    try {
        producer.beginTransaction();
        removeEventTrigger(producer, projectId, id);
        producer.commitTransaction();
    } catch (ExecutionException e) {
        handleExecutionException(producer, e);
    } catch (InterruptedException e) {
        handleInterrupt(producer, e);
    } finally {
        producer.close();
    }
}

, но может также стать более сложным, как это:

Producer producer = producerPool.getProducer();
try {
    producer.beginTransaction();
    for (NotificationSettingKey notificationSettingKey : notificationSettingKeys) {
        kafkaWritingService.removeNotificationSetting(producer, notificationSettingKey);
    }
    for (UUID webhookSettingId : webhookSettingIds) {
        kafkaWritingService.removeWebhookSetting(producer, id, webhookSettingId);
    }
    for (UUID eventTriggerId : eventTriggerIds) {
        kafkaWritingService.removeEventTrigger(producer, id, eventTriggerId);
    }
    for (SubjectDeleteInfo subjectDeleteInfo : subjectDeleteInfoList) {
        kafkaWritingService.removeSubject(producer, subjectDeleteInfo);
    }
    kafkaWritingService.removeProject(producer, id);
    producer.commitTransaction();
} catch (ExecutionException e) {
    handleExecutionException(producer, e);
} catch (InterruptedException e) {
    handleInterrupt(producer, e);
} finally {
    producer.close();
}

Ключ в том, что он может иметь много различных входных аргументов. В конечном итоге весь код завершается выполнением producer.send(producerRecord).get(), которое потенциально генерирует ExecutionException, InterruptedException, но в противном случае просто возвращает void.

Вспомогательные методы следующие:

public static void handleInterrupt(Producer producer, InterruptedException e) {
    Thread.currentThread().interrupt();
    log.info(THREAD_INTERRUPTED, e);
    producer.abortTransaction();
}

public static void handleExecutionException(Producer producer, ExecutionException e) {
    producer.abortTransaction();
    throw new InternalException("Error producing to kafka", e);
}

Как я могу переписать этобез дублирования кода try / catch / final? Я нахожусь на Java 11 и надеюсь, что смогу передать некоторые блоки кода в качестве аргументов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...