Я пытаюсь сохранить данные с локального экземпляра Kafka на локальный Postgres с помощью Spark Streaming.Я настроил все соединения и параметры, и данные фактически попадают в базу данных.Тем не менее, это там только на пару секунд.После этого стол просто становится пустым.Если я остановлю приложение, как только в Postgres появятся данные, данные сохранятся, поэтому я полагаю, что пропустил какой-то параметр для потоковой передачи в Spark или что-то в конфигурационных файлах Kafka.Код написан на Java, а не на Scala, поэтому вместо DataFrame есть набор данных.
Я попытался установить для spark.driver.allowMultipleContexts
значение true, но это никак не связано с контекстом.Когда я запускаю счетчик для базы данных с полной потоковой передачей данных в фоновом режиме, всегда есть около 1700 записей, что означает, что может быть какой-то параметр для размера пакета.
censusRecordJavaDStream.map(e -> {
Row row = RowFactory.create(e.getAllValues());
return row;
}).foreachRDD(rdd -> {
Dataset<Row> censusDataSet = spark.createDataFrame(rdd, CensusRecord.getStructType());
censusDataSet
.write()
.mode(SaveMode.Overwrite)
.jdbc("jdbc:postgresql:postgres", "census.census", connectionProperties);
});
Моя цель - передавать данные из Kafkaи сохранить его в Postgre.Каждая запись имеет уникальный идентификатор, который используется в качестве ключа в Kafka, поэтому не должно быть конфликтов относительно первичного ключа или двойных записей.Для текущего тестирования я использую небольшое подмножество из примерно 100 записей;полный набор данных превышает 300 МБ.