Данные передавались из Кафки в Постгрес и пропускались через несколько секунд - PullRequest
0 голосов
/ 30 января 2019

Я пытаюсь сохранить данные с локального экземпляра Kafka на локальный Postgres с помощью Spark Streaming.Я настроил все соединения и параметры, и данные фактически попадают в базу данных.Тем не менее, это там только на пару секунд.После этого стол просто становится пустым.Если я остановлю приложение, как только в Postgres появятся данные, данные сохранятся, поэтому я полагаю, что пропустил какой-то параметр для потоковой передачи в Spark или что-то в конфигурационных файлах Kafka.Код написан на Java, а не на Scala, поэтому вместо DataFrame есть набор данных.

Я попытался установить для spark.driver.allowMultipleContexts значение true, но это никак не связано с контекстом.Когда я запускаю счетчик для базы данных с полной потоковой передачей данных в фоновом режиме, всегда есть около 1700 записей, что означает, что может быть какой-то параметр для размера пакета.

censusRecordJavaDStream.map(e -> {
    Row row = RowFactory.create(e.getAllValues());
    return row;
}).foreachRDD(rdd -> {
    Dataset<Row> censusDataSet = spark.createDataFrame(rdd, CensusRecord.getStructType());

    censusDataSet
            .write()
            .mode(SaveMode.Overwrite)
            .jdbc("jdbc:postgresql:postgres", "census.census", connectionProperties);
});

Моя цель - передавать данные из Kafkaи сохранить его в Postgre.Каждая запись имеет уникальный идентификатор, который используется в качестве ключа в Kafka, поэтому не должно быть конфликтов относительно первичного ключа или двойных записей.Для текущего тестирования я использую небольшое подмножество из примерно 100 записей;полный набор данных превышает 300 МБ.

...