У меня есть работающий зоопарк + кафка, и я успешно отправляю твиты производителю кафки. Твиты взяты из очереди:
queue = new LinkedBlockingQueue<>(10000);
public void run() {
client.connect();
try (Producer<Long, String> producer = getProducer()) {
while (true) {
Tweet tweet = gson.fromJson(queue.take(), Tweet.class);
System.out.printf("Fetched tweet id %d\n", tweet.getId());
long key = tweet.getId();
String msg = tweet.toString();
ProducerRecord<Long, String> record = new ProducerRecord<>(KafkaConfiguration.TOPIC, key, msg);
producer.send(record, callback);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
client.stop();
}
Мой вопрос: как записать уже полученный объект (класс Tweet) в MongoDB? У меня есть настройки конфигурации на моем локальном хосте:
//MongoDB config
int port_no = 27017;
String host_name = "localhost", db_name = "bigdata", db_coll_name = "twitter";
// Mongodb connection string.
String client_url = "mongodb://" + host_name + ":" + port_no + "/" + db_name;
MongoClientURI uri = new MongoClientURI(client_url);
// Connecting to the mongodb server using the given client uri.
MongoClient mongo_client = new MongoClient(uri);
// Fetching the database from the mongodb.
MongoDatabase db = mongo_client.getDatabase(db_name);
// Fetching the collection from the mongodb.
MongoCollection<Document> coll = db.getCollection(db_coll_name);
Есть ли способ десериализации с помощью JSON? Любые предложения будут ценны. Заранее спасибо.