Reactor - groupBy начинает сбрасывать сообщения - PullRequest
0 голосов
/ 23 июня 2019

Ниже приведено определение канала данных.Но это начинает падать под нагрузкой, когда я создаю тысячи групп в минуту ... любые способы решить эту проблему

ds.getPublisher()
        .onBackpressureBuffer(15000)
        .onBackpressureDrop(record -> LOGGER.error("Backpressure applied-1. Dropped records: {}", record))
        .groupBy(record -> record.getGroupId())
        .flatMap(group -> group
                .timeout(Duration.ofSeconds(60))
                .bufferUntil(record -> isGroupComplete(record))
                .bufferTimeout(100, Duration.ofSeconds(5))
                .map(listOflistOfRecords -> listOflistOfRecords.stream().flatMap(List::stream).collect(Collectors.toList()))
                .onErrorContinue((th, records) -> {
                    LOGGER.error("Timedout records: {}", records);
                                            // TAKE ACTION ON THE RECORDS
                }))
        .filter(records -> {
            return (publishController.shouldIPublish()) ? true 
                    : records.get(0).getCreatedTimestamp() <= (publishController.stopRequestTimestamp() - 5);
        })
        .doOnDiscard(List.class, records -> {
            if(! records.isEmpty()) {
                LOGGER.error("Discarded: {}", records);
                discardedRecords.put(records, new Object());
            } else {
                LOGGER.error("Empty record received. This should never happen.");
            }
        })
        .map(record -> Collections.unmodifiableList(Enricher.enrich(record)))
        .map(dbRecords -> RecordTransformer.transform(dbRecords))
        .retryBackoff(MAX_RETRY, Duration.ofSeconds(FIRST_BACKOFF_IN_SECONDS), Duration.ofSeconds(MAX_BACKOFF_IN_SECONDS))
        .publishOn(Schedulers.single());
...