У меня потоковая RDD и я применил java POJO к ней, теперь есть inputRDD, она содержит id, .. другие детали. Я хочу groupby / filterby id, тогда каждый rdd должен сохранять в своей собственной БД. Я пробовал и код работает с для цикла, но если это должно произойти в параллельной обработке искры. любая помощь приветствуется.
messages.transform(this::getClass).foreachRDD(inputRDD -> {
List<String> idList = inputRDD.map(ClassObject::getEmpid).distinct().collect();
for (String id : idList){
String EmpName = EmpCache.getEmpNameFor(id).toLowerCase();
inputRDD.filter(f -> f.getEmpid().equals(id));
javaFunctions(inputRDD).writerBuilder(in_EmpName , tableName, mapToRow(agg)).saveToCassandra();
}
}