Так что да, Streams вовсе не был этим, но RxJava является подходящим ответом на мой ответ в моем случае использования.Точнее, тема публикации есть.
Я создал класс, где я инициировал указанный объект публикации, и использую функцию .onNext()
для передачи моей записи Kafka ConsumerRecord субъекту.вот определение:
public static void _initRx(){
RxRecordsList = PublishSubject.create();
RxRecordsList.subscribe(_initRxRecordConsumer());
}
private static Observer<ConsumerRecord<String,String>> _initRxRecordConsumer(){
return new Observer<ConsumerRecord<String,String>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Rx Subscrition : OK");
}
@Override
public void onNext(ConsumerRecord record) {
System.out.println("Message received - Length : "+record.toString().length());
MongoHelper._insertReport(record.value().toString());
}
@Override
public void onError(Throwable e) {
System.out.println("Error: "+ e);
}
@Override
public void onComplete() {
System.out.println("Stream ended");
}
};
}
А вот как я отправляю данные на него:
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
RxHDB_Records.RxRecordsList.onNext(record);
}
}
Я не знаю, хорошо ли это сделано, но это работает так ... это начало.Я понятия не имею, как использовать операторы или как изменить тип даты отсюда.Но все же это начало