Я работаю над проектом по анализу данных, в котором я читаю данные из файла CSV, просматриваю их по теме Kafka и использую Spark Streaming, чтобы использовать эти данные темы Kafka. Все компоненты, которые я использую в одном проекте.
Теперь, после использования данных с помощью Spark Streaming, мне нужно выполнить некоторые вычисления, сохранить данные в упругом поиске и отправить эти данные по другой теме. Поэтому я делаю эти вещи (сохраняя данные в эластичный и отправляя данные в тему) из Spark Streaming.
Ниже мой код
@Component
public class RawEventSparkConsumer implements Serializable {
@Autowired
private ElasticSearchServiceImpl dataModelServiceImpl;
@Autowired
private EventKafkaProducer enrichEventKafkaProducer;
Collection<String> topics = Arrays.asList("rawTopic");
public void sparkRawEventConsumer(JavaStreamingContext streamingContext) {
Map<String, Object> kafkaParams = new HashedMap();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
JavaInputDStream<ConsumerRecord<String, String>> rawEventRDD = KafkaUtils.createDirectStream(streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaDStream<String> dStream = rawEventRDD.map((x) -> x.value());
JavaDStream<BaseDataModel> baseDataModelDStream = dStream.map(convertIntoBaseModel);
baseDataModelDStream.foreachRDD(rdd1 -> {
saveDataToElasticSearch(rdd1.collect());
});
JavaDStream<EnrichEventDataModel> enrichEventRdd = baseDataModelDStream.map(convertIntoEnrichModel);
enrichEventRdd.foreachRDD(rdd -> {
System.out.println("Inside rawEventRDD.foreachRDD = = = " + rdd.count());
sendEnrichEventToKafkaTopic(rdd.collect());
});
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
static Function convertIntoBaseModel = new Function<String, BaseDataModel>() {
@Override
public BaseDataModel call(String record) throws Exception {
ObjectMapper mapper = new ObjectMapper();
BaseDataModel csvDataModel = mapper.readValue(record, BaseDataModel.class);
return csvDataModel;
}
};
static Function convertIntoEnrichModel = new Function<BaseDataModel, EnrichEventDataModel>() {
@Override
public EnrichEventDataModel call(BaseDataModel csvDataModel) throws Exception {
EnrichEventDataModel enrichEventDataModel = new EnrichEventDataModel(csvDataModel);
enrichEventDataModel.setEnrichedUserName("Enriched User");
User user = new User();
user.setU_email("Nitin.Tyagi");
enrichEventDataModel.setUser(user);
return enrichEventDataModel;
}
};
private void sendEnrichEventToKafkaTopic(List<EnrichEventDataModel> enrichEventDataModels) {
if (enrichEventKafkaProducer != null && enrichEventDataModels != null && enrichEventDataModels.size() > 0)
try {
enrichEventKafkaProducer.sendEnrichEvent(enrichEventDataModels);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void saveDataToElasticSearch(List<BaseDataModel> baseDataModelList) {
if(!baseDataModelList.isEmpty())
dataModelServiceImpl.saveAllBaseModel(baseDataModelList);
}
}
Теперь у меня есть несколько вопросов
1) Хорошо ли подходит мой подход, т.е. сохранять данные в Elastic Search и отправлять их по теме из Spark Streaming?
2) Я использую компоненты приложения (Kafka, Spark Streaming) в одном проекте, и существует несколько классов Spark Streaming. Я запускаю эти классы через CommandLineRunner в моей локальной системе. Итак, как теперь можно представить Spark Streaming в качестве искровой работы?
Для Spark Submit нужно ли создавать отдельный проект с классами Spark Streaming?