Как обработать файл CSV с использованием Reactor Flux и вывести его в формате JSON - PullRequest
1 голос
/ 25 сентября 2019

У меня есть CSV-файл, который я хочу обработать с использованием Spring Reactor Flux.

Имеется CSV-файл с заголовком, в котором зафиксированы первые два столбца, и может содержать несколько дополнительных данных.столбцы

Id, Name, Group, Status
6EF3C06E-6240-1A4A-17D6-27E73F0CDD31, Harlan Ferguson, xy1, true
6B261437-217C-0FDF-741A-92477EE354EC, Risa Greene, xy2, false
4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1, Samson Blanchard, xy3, false
562C3486-E009-2C2D-9D3E-14355DB7D4D7, Damian Carson, xy4, true
...
...
... 

Я хочу обработать ввод с помощью Flux, чтобы вывод был

[{
    "Id": "6EF3C06E-6240-1A4A-17D6-27E73F0CDD31",
    "Name": "Harlan Ferguson",
    "data": {
        "Group": "xyz1",
        "Status": true
    }
}, {
    "Id": "6B261437-217C-0FDF-741A-92477EE354EC",
    "Name": "Risa Greene",
    "data": {
        "Group": "xy2",
        "Status": false
    }
}, {
    "Id": "4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1",
    "Name": "Samson Blanchard",
    "data": {
        "Group": "xy3",
        "Status": false
    }
}, {
    "Id": "562C3486-E009-2C2D-9D3E-14355DB7D4D7",
    "Name": "Damian Carson",
    "data": {
        "Group": "xy4",
        "Status": true
    }
}]

Я использую CSVReader для потоковой передачи и созданияи Flux, используя

new CSVReader( Files.newBufferedReader(file) );
Flux<String[]> fluxOfCsvRecords = Flux.fromIterable(reader);

Я вернусь в Spring Reactor через пару лет, поэтому мое понимание немного устарело.

Создание моно заголовкаиспользование

Mono<String[]> headerMono = fluxOfCsvRecords.next();

И затем,

fluxOfCsvRecords.skip(1)
  .flatMap(csvRecord -> headerMono.map(header -> header[0] + " : " + csvRecord[0]))
.subscribe(System.out::println);

Это промежуточный код только для проверки того, что я могу объединить данные из заголовка и остальной части потока, ожидая увидеть

Id : 6EF3C06E-6240-1A4A-17D6-27E73F0CDD31
Id : 6B261437-217C-0FDF-741A-92477EE354EC
Id : 4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1
Id : 562C3486-E009-2C2D-9D3E-14355DB7D4D7

Но мой вывод просто

4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1 : 6EF3C06E-6240-1A4A-17D6-27E73F0CDD31

Буду признателен, если кто-нибудь сможет помочь мне понять, как этого добиться.

--------------------------- Обновление ---------------------

Пробовалдругой подход

Flux<String[]> take1 = fluxOfCsvRecords.take(1);
take1.flatMap(header -> fluxOfCsvRecords.map(csvRecord -> header[0] + " : " + csvRecord[0]))
.subscribe(System.out::println);

Выход

Id : 6B261437-217C-0FDF-741A-92477EE354EC
Id : 4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1
Id : 562C3486-E009-2C2D-9D3E-14355DB7D4D7

Отсутствует строка после заголовка

1 Ответ

0 голосов
/ 25 сентября 2019

Добавить два класса, например

public class TopJson {
    private int Id;
    private String name;

    private InnerJson data;

    public TopJson() {}
    public TopJson(int id, String name, InnerJson data) {
        super();
        Id = id;
        this.name = name;
        this.data = data;
    }



}


class InnerJson{

    private String group;
    private String status;
    public InnerJson() {}
    public InnerJson(String group, String status) {
        super();
        this.group = group;
        this.status = status;
    }

, преобразованные в соответствующие типы и используемые для создания объекта.

fluxOfCsvRecords.skip(1)
      .map((Function<String, TopJson>) x -> {
            String[] csvRecord = line.split(",");// a CSV has comma separated lines
            return new TopJson(Integer.parseInt(csvRecord[0]), csvRecord[1],
                    new InnerJson(csvRecord[2], csvRecord[3]));
        }).collect(Collectors.toList()));
...