Как лучше всего обрабатывать ошибки базы данных и сети внутри пакетов в kafka + spark streaming? - PullRequest
0 голосов
/ 20 ноября 2018

Я пытаюсь убедиться, что все события либо записаны в базу данных, если они приняты схемой, либо зарегистрированы как ошибки, не должно быть никаких событий, полностью потерянных из-за какой-либо базы данных, сетевых ошибок и т. Д.

Это то, что я имею до сих пор.Кажется неправильным создавать новый RDD для каждой строки, чтобы повторить их по отдельности, но я не вижу другого способа использования SQLContext.

  JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    new JavaStreamingContext(javaSparkContext, Durations.seconds(queryInterval)),
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topics(), kafkaParameters())
            );

    SQLContext sqlContext = SQLContext.getOrCreate(javaSparkContext.sc());

    stream.map(cr->row)
            .foreachRDD(rowRDD -> {
                try {
                    // try the batch
                    Dataset<Row> rowDataset = sqlContext.createDataFrame(rowRDD.rdd(), someStructType);
                    rowDataset
                            .write()
                            .mode(SaveMode.Append)
                            .jdbc(jdbcUrl(), jdbcTable(), jdbcConnectionProperties());
                    /*
                        Need to change to only Database Exceptions,
                        other exceptions will cause the event to fail, hence should retry from kafka
                    */
                } catch (Exception e1){
                    // try individually
                    rowRDD.foreach(row->{

                        // create RDD for each row, could be slow...
                        JavaRDD javaRDD = javaSparkContext.parallelize(Arrays.asList(row));
                        // try write for each RDD now containing only one item

                        int attempts = 0;
                        while (attempts<5){
                            attempts++;
                            try {
                                Dataset<Row> rowDataset = sqlContext.createDataFrame(javaRDD, someStructType);
                                rowDataset
                                        .write()
                                        .mode(SaveMode.Append)
                                        .jdbc(jdbcUrl(), jdbcTable(), jdbcConnectionProperties());
                            /*
                                Need to change to only Database Exceptions,
                                if non database error i.e. network error need to retry here,
                                as should not retry the whole batch
                             */
                            } catch (Exception e2){
                                LOG.warn("Event could not be written to database: ",e2);
                                break;
                            }
                            try {
                                // wait 1 second before retrying
                                Thread.sleep(1000);
                            } catch (InterruptedException e){}
                        }
                    });
                }
            });
...