TarArchiveInputStream с Java Reactor - PullRequest
       4

TarArchiveInputStream с Java Reactor

0 голосов
/ 10 октября 2019

В настоящее время я имею дело с TarArchiveInputStream следующим образом:

private Mono<Employee> createEmployeeFromArchiveFile() {

    return Mono.fromCallable(() -> {
        return new Employee();
    })
    .flatMap(employee -> {

        try {
            TarArchiveInputStream tar =
                    new TarArchiveInputStream(new GzipCompressorInputStream(new FileInputStream(new File("/tmp/myarchive.tar.gz"))));
            TarArchiveEntry entry;
            tar.read();
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.getName().equals("data1.txt")) {
                    // process data
                    String data1 = IOUtils.toString(tar, String.valueOf(StandardCharsets.UTF_8));
                    if (data1.contains("age")) {
                        employee.setAge(4);
                    } else {
                        return Mono.error(new Exception("Missing age"));
                    }
                }
                if (entry.getName().equals("data2.txt")) {
                    // a lot more processing => put that in another function for clarity purpose
                    String data2 = IOUtils.toString(tar, String.valueOf(StandardCharsets.UTF_8));
                    employee = muchProcessing(employee, data2);
                }
            }
            tar.close();
        } catch (Exception e) {
            return Mono.error(new Exception("Error while streaming archive"));
        }

        return Mono.just(employee);
    });

}

private Employee muchProcessing(Employee employee, String data2) {
    if (data2.contains("name")) {
        employee.setName(4);
    } else {
        // return an error ?
    }
    return employee;
}

Во-первых, это правильный способ обработки файла архива с помощью Reactor? Это работает нормально, но кажется, что синхронный бизнес внутри flatMap. Я не нашел лучшего способа.

Во-вторых, я не знаю, как справиться с функцией muchProcessing(tar). Если эта функция вызывает ошибки, как она будет возвращать их, чтобы их можно было правильно обработать как Mono.error? Поскольку я хочу, чтобы эта функция вернула мне сотрудника.

Спасибо!

1 Ответ

0 голосов
/ 10 октября 2019

Вы можете обработать задачу внутри flatMap как CompletableFuture и преобразовать ее в Mono. Вот ссылка о том, как это сделать:

Как создать моно из завершаемого будущего

Затем вы можете абстрагировать его как:

.flatMap(this::processEmployee).doOnError(this::logError).onErrorResume(getFallbackEmployee())
...