Как записать данные из очереди в MongoDB - PullRequest
1 голос
/ 29 мая 2019

У меня есть работающий зоопарк + кафка, и я успешно отправляю твиты производителю кафки. Твиты взяты из очереди:

queue = new LinkedBlockingQueue<>(10000);
public void run() {
        client.connect();
        try (Producer<Long, String> producer = getProducer()) {
            while (true) {
                Tweet tweet = gson.fromJson(queue.take(), Tweet.class);
                System.out.printf("Fetched tweet id %d\n", tweet.getId());
                long key = tweet.getId();
                String msg = tweet.toString();
                ProducerRecord<Long, String> record = new ProducerRecord<>(KafkaConfiguration.TOPIC, key, msg);
                producer.send(record, callback);


            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            client.stop();
        }

Мой вопрос: как записать уже полученный объект (класс Tweet) в MongoDB? У меня есть настройки конфигурации на моем локальном хосте:

//MongoDB config
    int port_no = 27017;
    String host_name = "localhost", db_name = "bigdata", db_coll_name = "twitter";

    // Mongodb connection string.
    String client_url = "mongodb://" + host_name + ":" + port_no + "/" + db_name;
    MongoClientURI uri = new MongoClientURI(client_url);

    // Connecting to the mongodb server using the given client uri.
    MongoClient mongo_client = new MongoClient(uri);

    // Fetching the database from the mongodb.
    MongoDatabase db = mongo_client.getDatabase(db_name);

    // Fetching the collection from the mongodb.
    MongoCollection<Document> coll = db.getCollection(db_coll_name);

Есть ли способ десериализации с помощью JSON? Любые предложения будут ценны. Заранее спасибо.

1 Ответ

0 голосов
/ 29 мая 2019

Я бы предложил использовать Kafka Connect MongoDB Sink Connector для передачи данных из Kafka в MongoDB

Пример конфигурации для JSON со схемой:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Если вы используете Confluent Hub , инструкции по установке разъема можно найти здесь .

...