Является ли Java 8 Stream решением, которое я ищу? - PullRequest
0 голосов
/ 12 декабря 2018

Я создаю инструмент для потребления данных из Kafka и вставки их в MongoDB, с некоторыми манипуляциями между ними.

Сейчас я делаю:

// Poll during X ms
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
// For each record, insert it into Mongo
for (ConsumerRecord<String, String> record : records) {
  System.out.println("Message of length ["+ record.value().length() +"] received.");
  Tools._insertReport(record.value());
}

Я ищу решение, похожее на динамическую коллекцию, в которой я могу накапливать записи, а метод вставки будет вставлять, а затем удалять записи из кучи по мере их поступления.?Как внутренняя очередь сообщений ..

Является ли Java 8 Streams чем-то подобным?Если нет, то есть ли очевидное решение?

Редактировать 1:

Оба решения кажутся жизнеспособными.Kafka Connector и RxJava, поскольку RxJava больше похож на то, что я искал, я рассмотрю и опубликую результаты своего исследования здесь.Спасибо вам всем.

Ответы [ 3 ]

0 голосов
/ 13 декабря 2018

Является ли Java 8 Streams чем-то вроде этого?

Не совсем, нет.

Очевидное решение - буквально создать фиксированный размер List<ConsumerRecord>, а затем периодически проверять размер этого списка ... Когда он заполнится, зациклитесь и перейдите в Mongo, как если бы вы делали одну запись навремя.

Вполне возможно, что вы используете Kafka Connect, так как он может более разумно управлять исключениями, повторными попытками и преобразованиями сообщений.

0 голосов
/ 14 декабря 2018

Так что да, Streams вовсе не был этим, но RxJava является подходящим ответом на мой ответ в моем случае использования.Точнее, тема публикации есть.

Я создал класс, где я инициировал указанный объект публикации, и использую функцию .onNext() для передачи моей записи Kafka ConsumerRecord субъекту.вот определение:

public static void _initRx(){
        RxRecordsList = PublishSubject.create();
        RxRecordsList.subscribe(_initRxRecordConsumer());
    }
    private static Observer<ConsumerRecord<String,String>> _initRxRecordConsumer(){
        return new Observer<ConsumerRecord<String,String>>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Rx Subscrition : OK");
            }

            @Override
            public void onNext(ConsumerRecord record) {
                System.out.println("Message received - Length : "+record.toString().length());
                MongoHelper._insertReport(record.value().toString());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error: "+ e);
            }

            @Override
            public void onComplete() {
                System.out.println("Stream ended");
            }
        };
    }

А вот как я отправляю данные на него:

while(true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
  for (ConsumerRecord<String, String> record : records) {
    RxHDB_Records.RxRecordsList.onNext(record);
  }
}

Я не знаю, хорошо ли это сделано, но это работает так ... это начало.Я понятия не имею, как использовать операторы или как изменить тип даты отсюда.Но все же это начало

0 голосов
/ 13 декабря 2018

Я не знаю, чего именно вы хотите достичь, но для чтения сообщений от Kafka и записи их в MongoDB я предлагаю вам использовать Kafka Connect с разъемом mongoDB!Все очереди создаются с помощью kafka connect без необходимости написания кода.

Вы найдете множество mongodb-разъемов, подходящих для вашего случая, вот два из них:

https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/README.md

https://docs.lenses.io/connectors/sink/mongo.html

Последний вариант может оказаться удачным выбором в первую очередь, он более прост в использовании и управляет несколькими другими разъемами.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...