У меня есть код, который пишет Кафке в транзакции. Это следует за этим форматом:
public void removeEventTriggerInTransaction(UUID projectId, UUID id) {
Producer producer = producerPool.getProducer();
try {
producer.beginTransaction();
removeEventTrigger(producer, projectId, id);
producer.commitTransaction();
} catch (ExecutionException e) {
handleExecutionException(producer, e);
} catch (InterruptedException e) {
handleInterrupt(producer, e);
} finally {
producer.close();
}
}
, но может также стать более сложным, как это:
Producer producer = producerPool.getProducer();
try {
producer.beginTransaction();
for (NotificationSettingKey notificationSettingKey : notificationSettingKeys) {
kafkaWritingService.removeNotificationSetting(producer, notificationSettingKey);
}
for (UUID webhookSettingId : webhookSettingIds) {
kafkaWritingService.removeWebhookSetting(producer, id, webhookSettingId);
}
for (UUID eventTriggerId : eventTriggerIds) {
kafkaWritingService.removeEventTrigger(producer, id, eventTriggerId);
}
for (SubjectDeleteInfo subjectDeleteInfo : subjectDeleteInfoList) {
kafkaWritingService.removeSubject(producer, subjectDeleteInfo);
}
kafkaWritingService.removeProject(producer, id);
producer.commitTransaction();
} catch (ExecutionException e) {
handleExecutionException(producer, e);
} catch (InterruptedException e) {
handleInterrupt(producer, e);
} finally {
producer.close();
}
Ключ в том, что он может иметь много различных входных аргументов. В конечном итоге весь код завершается выполнением producer.send(producerRecord).get()
, которое потенциально генерирует ExecutionException, InterruptedException
, но в противном случае просто возвращает void.
Вспомогательные методы следующие:
public static void handleInterrupt(Producer producer, InterruptedException e) {
Thread.currentThread().interrupt();
log.info(THREAD_INTERRUPTED, e);
producer.abortTransaction();
}
public static void handleExecutionException(Producer producer, ExecutionException e) {
producer.abortTransaction();
throw new InternalException("Error producing to kafka", e);
}
Как я могу переписать этобез дублирования кода try / catch / final? Я нахожусь на Java 11 и надеюсь, что смогу передать некоторые блоки кода в качестве аргументов.