Я знаю, что это очень много контекста, но, пожалуйста, потерпите меня.
Мы создаем службу на основе Spring Boot, которая в основном считывает некоторые данные из внешнего синхронного API через HTTP-вызовы и, в зависимости от некоторых определенных флагов c в данных, которые мы читаем из внешней системы, запускает некоторые дополнительные операции. Более подробная информация:
- Внешняя система предоставляет информацию в виде JSON форматированных строк, сгруппированных в последовательности. Мы читаем последовательности постепенно с интервалом последовательности. Пример запроса для интервала последовательности: https://external_host / api / contactchannels? FromSeq = 0 & toSeq = 100 . Мы знаем максимальный порядковый номер заранее. Например, если максимальный порядковый номер равен 80000, мы выполним 800 (последовательно) таких запросов (если интервал последовательности равен 100). На такой запрос отвечает тип содержимого text / plain и тело с количеством строк (это не фиксированное число, может быть от 0 до нескольких тысяч строк). Пример ответа, содержащий 3 строки:
{"id": "440C9931-DDF5-43A1-B217-3D81BFF27E4E", "contactId": "A764A2B9-172F-48E5-AD16-D20797F516E3", "permissionType": "MARKETING_PERMISSION", "status": "ACTIVE"}
{"id": "27E9A769-DE6A-4CD2-B9FD-D28683A5C726", "contactId": "C2D90BD1-4027-4A5C-A91A-E3ACABC8A9D2", "permissionType": "EMAIL", "status": "ACTIVE"}
{"id": "41B792A6-C701-4E03-AB73-59E1E21C75E3", "contactId": "91D2F68B-DC95-45CD-9A5F-65C4BAE67661", "permissionType": "EMAIL", "status": "PENDING"}
Каждая строка описывает каналы связи объекта.
Для каждого добавочного запроса, который мы выполняем (как указано выше), мы обрабатываем ответ, отфильтровывая все строки, кроме тех, которые имеют определенный тип_презентации ("EMAIL") и определенный статус ("ОЖИДАНИЕ"). В приведенном выше примере мы говорим только об этой строке при обработке:
{"id": "41B792A6-C701-4E03-AB73-59E1E21C75E3", "contactId": "91D2F68B-DC95-45CD-9A5F-65C4BAE67661", "permissionType": "EMAIL", "status": "PENDING"}
Для каждой такой строки мы берем идентификатор контакта и выполняем поиск другой конечной точки http, которая отвечает application / json тип контента и сущность под названием «контактные данные»
Мы берем определенное свойство из ответа JSON на запрос 2 и выполняем с ним некоторые дополнительные операции.
PS. Всякий раз, когда мы заканчиваем sh обработку всех последовательностей из интервала последовательностей, мы сохраняем контрольную точку toSeq в нашем локальном хранилище данных, чтобы возобновить работу с последней точки в случае сбоя.
Как мы это делаем прямо сейчас :
//retrieve from our database the last read sequence
Long maxSequence = entitySyncRepository.getMaxSequence();
//get the last sequence generated by the external system
Long newMaxSequence = entityApi.getMaxSequence();
//for each (from, to) tuples where maxSequence <= from < to <= newMaxSequence we are performing this method:
public void getAll(
@Nullable Long fromSeq,
@Nullable Long toSeq,
@NonNull Consumer<String> consumer
) {
URI uri = buildReplicationUri(fromSeq, toSeq);
// uri is basically https://external_host/api/contactchannels?fromSeq={fromSeq}&toSeq={fromSeq + sequenceInterval}
processMultilineStream(uri, consumer);
}
, где параметр customer представляет дополнительное действие, которое мы должны выполнить в точке номер 3 (при обработке последовательности from-> to). ** processMultilineStream ** метод выполняет этот связанный с веб-клиентом вызов:
protected void processMultilineStream(Consumer<String> lineConsumer) {
this.webClient.get()
.uri(uri)
//perform the call to the sequence URI (1)
.retrieve()
.bodyToFlux(String.class)
//map each line from request 1's response to a ContactChannel entity
.map(EntitySynchroniser::lineToContactChannelMapper)
//filter out all contact channels and leave only one that have certain characteristics
.filter(EntitySynchroniser::isPendingAndEmail)
//for each contact channel remained, get the contact data id and perform request to another endpoint in order to get the email (request number 2)
.flatMap(response1 -> getEmailWithStatusPending(response1.getContactData().getId()))
.map(EntitySynchroniser::contactDataToEmailMapper)
.doOnNext(emailAsString -> {
//for each row find, apply the consumer that does the additional processing (step 3)
lineConsumer.accept(emailAsString);
})
//wait for the incremental request to emit the last signal
.blockLast();
}
Метод getEmailWithStatusPending возвращает Mono, выполняя впоследствии функции retrieve () и bodyToMono (String.class).
Мы думаем, что, возможно, был бы лучший способ написать это (возможно, мы могли бы лучше использовать возможности Flux для выполнения этих дополнительных запросов - Flux.generate или что-то в этом роде). Кто-нибудь сталкивался с таким вариантом использования (множественные встроенные действия с использованием вызовов webClient)? Любые идеи будут оценены.