Reactor Flux: как разобрать файл с заголовком - PullRequest
1 голос
/ 29 мая 2020

Недавно я начал использовать проект реактор 3.3, и я не знаю, как лучше всего обрабатывать поток строк, используя первую строку в качестве имен столбцов, а затем использовать эти имена столбцов для обработки / преобразования всех остальных строк. Прямо сейчас я делаю так:

Flux<String> lines = ....;
Mono<String[]> columns = Mono.from(lines.take(1).map(header -> header.split(";"))); //getting first line
Flux<SomeDto> objectFlux = lines.skip(1) //skip first line
  .flatMapIterable(row -> //iterating over lines
      columns.map(cols -> convert(cols, row)));  //convert line into SomeDto object

Так правильно ли это?

1 Ответ

2 голосов
/ 29 мая 2020

Так правильно ли это?

Всегда есть более одного способа приготовить яйцо, но код, который у вас там, кажется странным / неоптимальным по двум основным причинам:

  • Я бы предположил, что это одна строка для каждой записи / DTO, которую вы хотите извлечь, поэтому немного странно, что вы используете flatMapIterable() вместо flatMap()
  • Вы собираетесь для повторной подписки на lines один раз для каждой строки, когда вы повторно оцениваете это Mono. Это почти наверняка не то, что вы хотите делать. (Кэширование Mono помогает, но вы все равно будете повторно подписываться как минимум дважды.)

Вместо этого вы можете использовать switchOnFirst(), что позволит вам для динамического преобразования Flux на основе первого элемента (заголовок в вашем случае.) Это означает, что вы можете сделать что-то вроде этого:

lines
        .switchOnFirst((signal, flux) -> flux.zipWith(Flux.<String[]>just(signal.get().split(";")).repeat()))
        .map(row -> convert(row.getT1(), row.getT2()))

Обратите внимание, что это пример медвежьих костей, на самом деле - world use вам нужно будет проверить, действительно ли signal имеет значение в соответствии с документами:

Обратите внимание, что источник может немедленно завершиться или ошибка вместо того, чтобы испускать, и в этом случае сигнал будет быть onComplete или onError. Это НЕ обязательно сигнал onNext, и его необходимо соответствующим образом проверить.

...