Работа Spark Streaming, как отправить данные по теме Кафки и сохранить их в Elastic - PullRequest
0 голосов
/ 04 июня 2019

Я работаю над проектом по анализу данных, в котором я читаю данные из файла CSV, просматриваю их по теме Kafka и использую Spark Streaming, чтобы использовать эти данные темы Kafka. Все компоненты, которые я использую в одном проекте.

Теперь, после использования данных с помощью Spark Streaming, мне нужно выполнить некоторые вычисления, сохранить данные в упругом поиске и отправить эти данные по другой теме. Поэтому я делаю эти вещи (сохраняя данные в эластичный и отправляя данные в тему) из Spark Streaming.

Ниже мой код

@Component
public class RawEventSparkConsumer implements Serializable {

    @Autowired
    private ElasticSearchServiceImpl dataModelServiceImpl;

    @Autowired
    private EventKafkaProducer enrichEventKafkaProducer;

    Collection<String> topics = Arrays.asList("rawTopic");

    public void sparkRawEventConsumer(JavaStreamingContext streamingContext) {

        Map<String, Object> kafkaParams = new HashedMap();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", true);

        JavaInputDStream<ConsumerRecord<String, String>> rawEventRDD = KafkaUtils.createDirectStream(streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        JavaDStream<String> dStream = rawEventRDD.map((x) -> x.value());

        JavaDStream<BaseDataModel> baseDataModelDStream = dStream.map(convertIntoBaseModel);
        baseDataModelDStream.foreachRDD(rdd1 -> {
            saveDataToElasticSearch(rdd1.collect());
        });

        JavaDStream<EnrichEventDataModel> enrichEventRdd = baseDataModelDStream.map(convertIntoEnrichModel);

        enrichEventRdd.foreachRDD(rdd -> {
            System.out.println("Inside rawEventRDD.foreachRDD = = = " + rdd.count());
            sendEnrichEventToKafkaTopic(rdd.collect());
        });

        streamingContext.start();

        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    static Function convertIntoBaseModel = new Function<String, BaseDataModel>() {

        @Override
        public BaseDataModel call(String record) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            BaseDataModel csvDataModel = mapper.readValue(record, BaseDataModel.class);
            return csvDataModel;
        }
    };

    static Function convertIntoEnrichModel = new Function<BaseDataModel, EnrichEventDataModel>() {

        @Override
        public EnrichEventDataModel call(BaseDataModel csvDataModel) throws Exception {

            EnrichEventDataModel enrichEventDataModel = new EnrichEventDataModel(csvDataModel);
            enrichEventDataModel.setEnrichedUserName("Enriched User");
            User user = new User();
            user.setU_email("Nitin.Tyagi");
            enrichEventDataModel.setUser(user);
            return enrichEventDataModel;
        }
    };

    private void sendEnrichEventToKafkaTopic(List<EnrichEventDataModel> enrichEventDataModels) {
        if (enrichEventKafkaProducer != null && enrichEventDataModels != null && enrichEventDataModels.size() > 0)
            try {
                enrichEventKafkaProducer.sendEnrichEvent(enrichEventDataModels);
            } catch (JsonProcessingException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    }

    private void saveDataToElasticSearch(List<BaseDataModel> baseDataModelList) {
        if(!baseDataModelList.isEmpty())
            dataModelServiceImpl.saveAllBaseModel(baseDataModelList);
    }
}

Теперь у меня есть несколько вопросов

1) Хорошо ли подходит мой подход, т.е. сохранять данные в Elastic Search и отправлять их по теме из Spark Streaming?

2) Я использую компоненты приложения (Kafka, Spark Streaming) в одном проекте, и существует несколько классов Spark Streaming. Я запускаю эти классы через CommandLineRunner в моей локальной системе. Итак, как теперь можно представить Spark Streaming в качестве искровой работы?

Для Spark Submit нужно ли создавать отдельный проект с классами Spark Streaming?

1 Ответ

0 голосов
/ 04 июня 2019

Мой подход хорош, то есть сохранение данных в Elastic Search и отправка их по теме из Spark Streaming?

Думаю, я хотел бы изучить использование библиотек ES-Hadoop Spark.Похоже, что вы только что использовали Elastic Java API напрямую (учитывая, что вы собираете разделы RDD)

Хотя это может работать, оно сильно связано ... Что происходит, когда Elasticsearch не работает из-за обслуживания или иным образом сильно скрыт?Все приложение останавливается?

Альтернативой является разделение логики обработки Kafka на собственное развертывание.Таким образом, вы также можете просто использовать процесс Elasticsearch Kafka Connect для загрузки данных из темы без необходимости писать этот код самостоятельно (API Connect, вероятно, уже является частью вашего работающего кластера Kafka)

существует несколько классов Spark Streaming

Несколько основных классов?Это не должно быть проблемой.Вам нужно дать как один JAR, так и одно имя класса для отправки Spark.Вы можете иметь несколько «точек входа» / основных методов в одном банке.

как отправить Spark Streaming как задание на запуск?

Я не уверен, что понимаю проблему.spark-submit работает для потоковых заданий

Примечание: CSV - один из худших форматов, который вы можете использовать в Kafka, если вы когда-либо планируете изменить типы данных или их порядок, и вы также ожидаете, что тема будет использоваться кем-либоно себя.Даже Elasticsearch предпочел бы, чтобы у вас были json-кодированные полезные нагрузки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...