Я новичок в Spark и не слишком разбираюсь в этом. Я работаю над приложением, в котором данные передаются по другой теме Кафки-2, и Spark Streaming читает данные из этой темы. Это проект SpringBoot, и у меня есть 3 потребительских класса Spark. Работа этих классов SparkStreaming состоит в том, чтобы использовать данные из темы Kafka и отправлять их в другую тему. Код класса SparkStreaming ниже-
@Service
public class EnrichEventSparkConsumer {
Collection<String> topics = Arrays.asList("eventTopic");
public void startEnrichEventConsumer(JavaStreamingContext javaStreamingContext) {
Map<String, Object> kafkaParams = new HashedMap();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
JavaInputDStream<ConsumerRecord<String, String>> enrichEventRDD = KafkaUtils.createDirectStream(javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaDStream<String> enrichEventDStream = enrichEventRDD.map((x) -> x.value());
JavaDStream<EnrichEventDataModel> enrichDataModelDStream = enrichEventDStream.map(convertIntoEnrichModel);
enrichDataModelDStream.foreachRDD(rdd1 -> {
saveDataToElasticSearch(rdd1.collect());
});
enrichDataModelDStream.foreachRDD(enrichDataModelRdd -> {
if(enrichDataModelRdd.count() > 0) {
if(executor != null) {
executor.executePolicy(enrichDataModelRdd.collect());
}
}
});
}
static Function convertIntoEnrichModel = new Function<String, EnrichEventDataModel>() {
@Override
public EnrichEventDataModel call(String record) throws Exception {
ObjectMapper mapper = new ObjectMapper();
EnrichEventDataModel csvDataModel = mapper.readValue(record, EnrichEventDataModel.class);
return csvDataModel;
}
};
private void saveDataToElasticSearch(List<EnrichEventDataModel> baseDataModelList) {
for (EnrichEventDataModel baseDataModel : baseDataModelList)
dataModelServiceImpl.save(baseDataModel);
}
}
Я вызываю метод startEnrichEventConsumer () с помощью CommandLineRunner.
public class EnrichEventSparkConsumerRunner implements CommandLineRunner {
@Autowired
JavaStreamingContext javaStreamingContext;
@Autowired
EnrichEventSparkConsumer enrichEventSparkConsumer;
@Override
public void run(String... args) throws Exception {
//start Raw Event Spark Cosnumer.
JobContextImpl jobContext = new JobContextImpl(javaStreamingContext);
//start Enrich Event Spark Consumer.
enrichEventSparkConsumer.startEnrichEventConsumer(jobContext.streamingctx());
}
}
Теперь я хочу отправить эти три класса Spark Streaming в кластер. Я где-то читал, что сначала мне нужно создать Jar-файл, затем после него я могу использовать команду Spark-submit, но у меня есть несколько вопросов в уме -
- Должен ли я создать другой проект с этими 3 классами Spark Streaming?
- На данный момент я использую CommandLineRunner, чтобы инициировать SparkStreaming, а затем при отправке кластера, должен ли я создать метод main () в этом классе?
Пожалуйста, скажите мне, как это сделать. Заранее спасибо.