Как подать заявку Spark Streaming - PullRequest
0 голосов
/ 17 мая 2019

Я новичок в Spark и не слишком разбираюсь в этом. Я работаю над приложением, в котором данные передаются по другой теме Кафки-2, и Spark Streaming читает данные из этой темы. Это проект SpringBoot, и у меня есть 3 потребительских класса Spark. Работа этих классов SparkStreaming состоит в том, чтобы использовать данные из темы Kafka и отправлять их в другую тему. Код класса SparkStreaming ниже-

    @Service
public class EnrichEventSparkConsumer {

    Collection<String> topics = Arrays.asList("eventTopic");

    public void startEnrichEventConsumer(JavaStreamingContext javaStreamingContext) {

        Map<String, Object> kafkaParams = new HashedMap();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", true);


        JavaInputDStream<ConsumerRecord<String, String>> enrichEventRDD = KafkaUtils.createDirectStream(javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        JavaDStream<String> enrichEventDStream = enrichEventRDD.map((x) -> x.value());
        JavaDStream<EnrichEventDataModel> enrichDataModelDStream = enrichEventDStream.map(convertIntoEnrichModel);

        enrichDataModelDStream.foreachRDD(rdd1 -> {
            saveDataToElasticSearch(rdd1.collect());
        });

        enrichDataModelDStream.foreachRDD(enrichDataModelRdd -> {
            if(enrichDataModelRdd.count() > 0) {
                if(executor != null) {
                    executor.executePolicy(enrichDataModelRdd.collect());       
                }
            }
        }); 

    }

    static Function convertIntoEnrichModel = new Function<String, EnrichEventDataModel>() {

        @Override
        public EnrichEventDataModel call(String record) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            EnrichEventDataModel csvDataModel = mapper.readValue(record, EnrichEventDataModel.class);
            return csvDataModel;
        }
    };

    private void saveDataToElasticSearch(List<EnrichEventDataModel> baseDataModelList) {
        for (EnrichEventDataModel baseDataModel : baseDataModelList)
            dataModelServiceImpl.save(baseDataModel);
    }
}

Я вызываю метод startEnrichEventConsumer () с помощью CommandLineRunner.

public class EnrichEventSparkConsumerRunner implements CommandLineRunner {

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Autowired
    EnrichEventSparkConsumer enrichEventSparkConsumer;

    @Override
    public void run(String... args) throws Exception {
        //start Raw Event Spark Cosnumer.
        JobContextImpl jobContext = new JobContextImpl(javaStreamingContext);

        //start Enrich Event Spark Consumer.
        enrichEventSparkConsumer.startEnrichEventConsumer(jobContext.streamingctx());
    }

}

Теперь я хочу отправить эти три класса Spark Streaming в кластер. Я где-то читал, что сначала мне нужно создать Jar-файл, затем после него я могу использовать команду Spark-submit, но у меня есть несколько вопросов в уме -

  1. Должен ли я создать другой проект с этими 3 классами Spark Streaming?
  2. На данный момент я использую CommandLineRunner, чтобы инициировать SparkStreaming, а затем при отправке кластера, должен ли я создать метод main () в этом классе?

Пожалуйста, скажите мне, как это сделать. Заранее спасибо.

1 Ответ

0 голосов
/ 18 мая 2019
  • Нет необходимости в другом проекте.
  • Вы должны создать точку входа / main, которая отвечает за создание JavaStreamingContext.
  • Создайте свой jar с зависимостями, зависимостями в одном отдельном файле jar, не забудьте поставить предоставленную область видимости для всех ваших зависимостей spark, так как вы будете использовать библиотеки кластера.

При выполнении собранного приложения Spark используется приложение командной строки spark-submit следующим образом:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

Для локальной отправки

bin/spark-submit \
  --class package.Main \
  --master local[2] \
  path/to/jar argument1 argument2
...