Связанные запросы Spring WebClient выполняются для синхронных API - PullRequest
0 голосов
/ 14 апреля 2020

Я знаю, что это очень много контекста, но, пожалуйста, потерпите меня.

Мы создаем службу на основе Spring Boot, которая в основном считывает некоторые данные из внешнего синхронного API через HTTP-вызовы и, в зависимости от некоторых определенных флагов c в данных, которые мы читаем из внешней системы, запускает некоторые дополнительные операции. Более подробная информация:

  1. Внешняя система предоставляет информацию в виде JSON форматированных строк, сгруппированных в последовательности. Мы читаем последовательности постепенно с интервалом последовательности. Пример запроса для интервала последовательности: https://external_host / api / contactchannels? FromSeq = 0 & toSeq = 100 . Мы знаем максимальный порядковый номер заранее. Например, если максимальный порядковый номер равен 80000, мы выполним 800 (последовательно) таких запросов (если интервал последовательности равен 100). На такой запрос отвечает тип содержимого text / plain и тело с количеством строк (это не фиксированное число, может быть от 0 до нескольких тысяч строк). Пример ответа, содержащий 3 строки:
{"id": "440C9931-DDF5-43A1-B217-3D81BFF27E4E", "contactId": "A764A2B9-172F-48E5-AD16-D20797F516E3", "permissionType": "MARKETING_PERMISSION", "status": "ACTIVE"}
{"id": "27E9A769-DE6A-4CD2-B9FD-D28683A5C726", "contactId": "C2D90BD1-4027-4A5C-A91A-E3ACABC8A9D2", "permissionType": "EMAIL", "status": "ACTIVE"}
{"id": "41B792A6-C701-4E03-AB73-59E1E21C75E3", "contactId": "91D2F68B-DC95-45CD-9A5F-65C4BAE67661", "permissionType": "EMAIL", "status": "PENDING"}

Каждая строка описывает каналы связи объекта.

Для каждого добавочного запроса, который мы выполняем (как указано выше), мы обрабатываем ответ, отфильтровывая все строки, кроме тех, которые имеют определенный тип_презентации ("EMAIL") и определенный статус ("ОЖИДАНИЕ"). В приведенном выше примере мы говорим только об этой строке при обработке:
{"id": "41B792A6-C701-4E03-AB73-59E1E21C75E3", "contactId": "91D2F68B-DC95-45CD-9A5F-65C4BAE67661", "permissionType": "EMAIL", "status": "PENDING"}

Для каждой такой строки мы берем идентификатор контакта и выполняем поиск другой конечной точки http, которая отвечает application / json тип контента и сущность под названием «контактные данные»

Мы берем определенное свойство из ответа JSON на запрос 2 и выполняем с ним некоторые дополнительные операции.

PS. Всякий раз, когда мы заканчиваем sh обработку всех последовательностей из интервала последовательностей, мы сохраняем контрольную точку toSeq в нашем локальном хранилище данных, чтобы возобновить работу с последней точки в случае сбоя.

Как мы это делаем прямо сейчас :

//retrieve from our database the last read sequence
Long maxSequence = entitySyncRepository.getMaxSequence();
//get the last sequence generated by the external system
Long newMaxSequence = entityApi.getMaxSequence();

//for each (from, to) tuples where maxSequence <= from < to <= newMaxSequence we are performing this method:

public void getAll(
          @Nullable Long fromSeq,
          @Nullable Long toSeq,
          @NonNull Consumer<String> consumer
  ) {

    URI uri = buildReplicationUri(fromSeq, toSeq);
   // uri is basically https://external_host/api/contactchannels?fromSeq={fromSeq}&toSeq={fromSeq + sequenceInterval}
    processMultilineStream(uri, consumer);
  }

, где параметр customer представляет дополнительное действие, которое мы должны выполнить в точке номер 3 (при обработке последовательности from-> to). ** processMultilineStream ** метод выполняет этот связанный с веб-клиентом вызов:

protected void processMultilineStream(Consumer<String> lineConsumer) {
this.webClient.get()
            .uri(uri)
            //perform the call to the sequence URI (1)
            .retrieve()
            .bodyToFlux(String.class)
            //map each line from request 1's response to a ContactChannel entity
            .map(EntitySynchroniser::lineToContactChannelMapper)
            //filter out all contact channels and leave only one that have certain    characteristics
            .filter(EntitySynchroniser::isPendingAndEmail)
            //for each contact channel remained, get the contact data id and perform request to another endpoint in order to get the email (request number 2)
            .flatMap(response1 -> getEmailWithStatusPending(response1.getContactData().getId()))
            .map(EntitySynchroniser::contactDataToEmailMapper)
            .doOnNext(emailAsString -> {
              //for each row find, apply the consumer that does the additional processing (step 3)
              lineConsumer.accept(emailAsString);
            })
            //wait for the incremental request to emit the last signal
            .blockLast();
}

Метод getEmailWithStatusPending возвращает Mono, выполняя впоследствии функции retrieve () и bodyToMono (String.class).

Мы думаем, что, возможно, был бы лучший способ написать это (возможно, мы могли бы лучше использовать возможности Flux для выполнения этих дополнительных запросов - Flux.generate или что-то в этом роде). Кто-нибудь сталкивался с таким вариантом использования (множественные встроенные действия с использованием вызовов webClient)? Любые идеи будут оценены.

...