java .io.BufferedReader (). Map Невозможно определить тип аргумента (ов) для <T>fromStream (Stream) ) - PullRequest
2 голосов
/ 25 марта 2020

Сценарий: Spring WebFlux, запускающий CommandLineRunner.run для загрузки данных в MongoDb для целей тестирования.

Цель: при локальном запуске микросервиса он предназначен для чтения файла json и загрузки документов в MongDb.

Личные знания: "bufferedReader.lines (). Filter (l ->! L.trim (). IsEmpty ()" читает каждый json узел и возвращает его в виде потока. Затем я могу сопоставить его с «l» и доступ к методам get. Полагаю, мне не нужно создавать список и затем передавать его в потоковом режиме, поскольку я уже загрузил его в виде потока с помощью «new InputStreamReader (getClass (). getClassLoader (). getResourceAsStream ()» и Я предполагаю, что могу использовать lines (), так как его узел приведет к строковой строке. Я в правильном направлении или у меня возникла какая-то идея?

Это пример файла json:

{
  "Extrato": {
    "description": "credit",
    "value": "R$1.000,00",
    "status": 11
  },
  "Extrato": {
    "description": "debit",  
    "value": "R$2.000,00",
    "status": 99
  }
}

модель

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
public class Extrato {

    @Id
    private String id;
    private String description;
    private String value;
    private Integer status;

    public Extrato(String id, String description, String value, Integer status) {
        super();
        this.id = id;
        this.description = description;
        this.value = value;
        this.status = status;
    }
... getters and setter accordinly

Репозиторий

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

import com.noblockingcase.demo.model.Extrato;

import reactor.core.publisher.Flux;
import org.springframework.data.domain.Pageable;

public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
    @Query("{ id: { $exists: true }}")
    Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}

команда для загрузки сверху json file

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import com.noblockingcase.demo.model.Extrato;
import com.noblockingcase.demo.repository.ExtratoRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

@Component
public class TestDataLoader implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(TestDataLoader.class);
    private ExtratoRepository extratoRepository;

    TestDataLoader(final ExtratoRepository extratoRepository) {
        this.extratoRepository = extratoRepository;
    }

    @Override
    public void run(final String... args) throws Exception {
        if (extratoRepository.count().block() == 0L) {
            final LongSupplier longSupplier = new LongSupplier() {
                Long l = 0L;

                @Override
                public long getAsLong() {
                    return l++;
                }
            };
            BufferedReader bufferedReader = new BufferedReader(
                    new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));

//*** THE ISSUE IS NEXT LINE
            Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
                    .map(l -> extratoRepository.save(new Extrato(String.valueOf(longSupplier.getAsLong()),
                            l.getDescription(), l.getValue(), l.getStatus()))))
                    .subscribe(m -> log.info("Carga Teste: {}", m.block()));

        }
    }

}

Вот конфигурация MongoDb, хотя я не думаю, что это уместно

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.mongodb.MongoClientOptions;

@Configuration
public class MongoDbSettings {

    @Bean
    public MongoClientOptions mongoOptions() {
        return MongoClientOptions.builder().socketTimeout(2000).build();
    }

}

Если я попробую свой оригинальный код и настрою его для чтения текстового файла, я смогу успешно прочитать текстовый файл вместо json. Очевидно, что он не соответствует моим требованиям так как я хочу прочитать файл json. Кстати, он может уточнить немного, где я заблокирован. * 1 022 *

load-test.txt (доступно в https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/demo/src/main/resources/carga-teste.txt)

crédito de R$1.000,00
débito de R$100,00

код фрагмента для работы с простым текстовым файлом

    BufferedReader bufferedReader = new BufferedReader(
            new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));
    Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
            .map(l -> extratoRepository
                    .save(new Extrato(String.valueOf(longSupplier.getAsLong()), "Qualquer descrição", l))))
            .subscribe(m -> log.info("Carga Teste: {}", m.block()));

Весь проект работает успешно, читая из текстового файла: https://github.com/jimisdrpc/webflux-worth-scenarious/tree/master/demo

Docker compose для загрузки MongoDb https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/docker-compose.yml

Подводя итог, моя проблема : Я не понял, как прочитать файл json и вставить данные в MongoDb во время CommandLineRunner.run ()

Ответы [ 2 ]

1 голос
/ 31 марта 2020

Обратите внимание, что ваш json недействителен. Текстовые данные не совпадают с json. Json нужна специальная обработка, поэтому всегда лучше использовать библиотеку.

carga-teste. json

[
  {"description": "credit", "value": "R$1.000,00", "status": 11},
  {"description": "debit","value": "R$2.000,00", "status": 99}
]

Кредиты см. В статье здесь - https://www.nurkiewicz.com/2017/09/streaming-large-json-file-with-jackson.html.

Я принял решение использовать Flux.

@Override
public void run(final String... args) throws Exception {

        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.json")));

        ObjectMapper mapper = new ObjectMapper();

        Flux<Extrato> flux = Flux.generate(
                () -> parser(bufferedReader, mapper),
                this::pullOrComplete,
                jsonParser -> {
                    try {
                        jsonParser.close();
                    } catch (IOException e) {}
                });

        flux.map(l -> extratoRepository.save(l)).subscribe(m -> log.info("Carga Teste: {}", m.block()));
    }
}

private JsonParser parser(Reader reader, ObjectMapper mapper) {
    JsonParser parser = null;
    try {
        parser = mapper.getFactory().createParser(reader);
        parser.nextToken();
    } catch (IOException e) {}
    return parser;
}

private JsonParser pullOrComplete(JsonParser parser, SynchronousSink<Extrato> emitter) {
    try {
        if (parser.nextToken() != JsonToken.END_ARRAY) {
            Extrato extrato = parser.readValueAs(Extrato.class);
            emitter.next(extrato);
        } else {
            emitter.complete();
        }
    } catch (IOException e) {
        emitter.error(e);
    }
    return parser;
}
1 голос
/ 25 марта 2020

Я нашел пример с Flux :: использованием Flux :: fromStream, чтобы быть полезным для этой цели. Это прочитает ваш файл в Flux, а затем вы сможете подписаться и обработать .flatmap или что-то еще. Из Javado c

использование (Callable resourceSupplier, Function> sourceSupplier, Consumer resourceCleanup) Использует ресурс, созданный поставщиком для каждого отдельного подписчика, при потоковой передаче значений из издателя, полученных из тот же ресурс и обеспечивает освобождение ресурса, если последовательность завершается или подписчик отменяет.

и код, который я собрал:

private static Flux<Account> fluxAccounts() {
    return Flux.using(() -> 
        new BufferedReader(new InputStreamReader(new ClassPathResource("data/ExportCSV.csv").getInputStream()))
            .lines()
            .map(s->{
                String[] sa = s.split(" ");
                return Account.builder()
                    .firstname(sa[0])
                    .lastname(sa[1])
                    .build();
            }),
            Flux::fromStream,
            BaseStream::close
    );
}
...