Spark Kafka Direct Stream не использует вновь созданные сообщения (Saprk2.3.1 kafak10) - PullRequest
0 голосов
/ 22 сентября 2018

При запуске My spark Kafka Direct Stream будут использоваться только те сообщения, которые были созданы до его запуска.Любые сообщения, созданные после запуска задания, не будут использованы до тех пор, пока задание не будет перезапущено, хотя я вижу эти вновь созданные сообщения в Kafka.открытый класс Test {

private static Logger logger = Logger.getLogger(Test.class.getName());

public Test() {
}

/**
 * DOCUMENT ME!
 *
 * @param  args
 */
public static void main(final String[] args) {
    if ((args != null) && (args.length > 0) && (args[0] != null)) {
        final SparkJobConfig sparkJobConfig = PcsSparkJobUtil.loadJobConfig(
                args[0]);

        final SparkConf sparkConf = new SparkConf();

        SparkJobUtils.initializeSparkConf(sparkJobConfig, sparkConf, sparkJobConfig.getSparkJobName());

        final JavaStreamingContext jssc = new JavaStreamingContext(
                sparkConf, new Duration(sparkJobConfig.getSparkjobStreamingKafkaPollingDuration()));
        final Map<String, Integer> topicMap = new HashMap<>();

        final CassandraConnector cassandraConnector = CassandraConnector.apply(sparkConf);
        final Session session = cassandraConnector.openSession();
        session.execute("USE " + sparkJobConfig.getCassandraKeyspace());


        HashMap<TopicAndPartition, Long> kafkaTopicPartition = new HashMap<TopicAndPartition, Long>();

        Map<String, String> kafkaParamMap = KafkaInternalUtils.getConsumerProperties(sparkJobConfig.getElasticSearchConfig().getEsKafkaProducerUrl(),
                sparkJobConfig.getElasticSearchConfig().getPushToESKafkaTopic(), sparkJobConfig.getElasticSearchConfig().getEsKafkaConsumerStartFromSmallestOffset(), sparkJobConfig);
        Set<String> topicSet = KafkaInternalUtils.getTopicSet(sparkJobConfig.getElasticSearchConfig().getPushToESKafkaTopic());


        JavaDStream<String> listingIdsDStream;
        List<KafkaEventLog> offsetsListCassandra = KafkaInternalUtils.getDataFromKafkaLogTable(session);

        /**
         * If No Offsets Create Fresh Stream
         */
        if(offsetsListCassandra == null || offsetsListCassandra.isEmpty()){
            listingIdsDStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
                    StringDecoder.class, kafkaParamMap, topicSet).transformToPair(stringStringJavaPairRDD -> {
                if(!stringStringJavaPairRDD.isEmpty()){
                    final OffsetRange[] offsets = ((HasOffsetRanges) stringStringJavaPairRDD.rdd()).offsetRanges();
                    writeOffsetsFormingSession(sparkJobConfig, offsets);
                }
                return stringStringJavaPairRDD;
            }).map(stringStringTuple2 -> stringStringTuple2._2());

        }else{

            /**
             * Create Stream From Offsets
             */
            for (KafkaEventLog eventLog : offsetsListCassandra) {
                kafkaTopicPartition.put(new TopicAndPartition(sparkJobConfig.getElasticSearchConfig().getPushToESKafkaTopic(),
                                Integer.parseInt(eventLog.getPartition_number())),
                        Long.parseLong(eventLog.getSet_from_offset()));
            }
            JavaInputDStream<String> stream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    String.class, kafkaParamMap, kafkaTopicPartition, (messageAndMetadata) -> messageAndMetadata.message());
            listingIdsDStream = stream.transform(rdd -> {
                if(!rdd.isEmpty()){
                    final OffsetRange[] offsetsForThisBatch = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    writeOffsetsFormingSession(sparkJobConfig, offsetsForThisBatch);
                }
                return rdd;
            });

        }

        listingIdsDStream.foreachRDD(stringJavaRDD1 -> {
            stringJavaRDD1.foreachPartition(stringIterator -> {
                while(stringIterator.hasNext()) {
                    String val = stringIterator.next();
                    System.out.println("Danish Vale:-" + val);
                    logger.info("Danish Vale:-" + val);
                }
            });
        });

        jssc.start();
        try {
            jssc.awaitTermination();
        }catch (Exception e){
            logger.error("Exception termination job ", e);
        }

    }

}


private static void writeOffset(Session session, final OffsetRange[] offsets) {
    logger.info("Danish Offsets written");
    for (OffsetRange offsetRange : offsets) {
        KafkaEventLog eventLog = new KafkaEventLog();
        eventLog.setTopic_name(String.valueOf(offsetRange.topic()));
        eventLog.setPartition_number(String.valueOf(offsetRange.partition()));
        eventLog.setSet_from_offset(String.valueOf(offsetRange.fromOffset()));
        eventLog.setSet_until_offset(String.valueOf(offsetRange.untilOffset()));
        eventLog.setInsert_timestamp(new java.sql.Date(new Date().getTime()));
        eventLog.setInserted_by("ESKafkaPush");
        KafkaInternalUtils.insertIntoKafkaEventLogTable(eventLog, session);
    }
}


public static void writeOffsetsFormingSession(SparkJobConfig sparkJobConfig, OffsetRange[] offsets){
    Session session1 = null;
    try {
        session1 = SparkJobUtils.getCassandraSession(sparkJobConfig);
        writeOffset(session1, offsets);
    }catch (Exception e){
        logger.error("Exception processing kafka offsets", e);
    }finally {
        if(session1 != null){
            session1.close();
            session1.getCluster().close();
        }
    }
}

}

Это мой метод создания KafkaConsumer.

По иронии судьбы все это будет работать нормально при локальном запуске. Но когда я разверну его в своем Spark Mesos Cluster, он будет использовать только ранее созданные сообщения.

...