Мой сценарий таков: я хочу вызвать один поток на основе ввода другого потока. Оба типа потока разные. Ниже приведен мой пример кода. Я хочу запускать один поток, когда какое-либо сообщение получено из потока Kafka.
При запуске приложения я могу читать данные из БД. Опять же, я хочу получить данные из БД на основе какого-то сообщения kafka. Когда я получаю сообщение kafka в потоке, я хочу снова получить данные из БД. Это мой реальный вариант использования.
Как этого добиться? Возможно ли это?
public class DataStreamCassandraExample implements Serializable{
private static final long serialVersionUID = 1L;
static Logger LOG = LoggerFactory.getLogger(DataStreamCassandraExample.class);
private transient static StreamExecutionEnvironment env;
static DataStream<Tuple4<UUID,String,String,String>> inputRecords;
public static void main(String[] args) throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool argParameters = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(argParameters);
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("testtopic", new SimpleStringSchema(), kafkaProps);
ClusterBuilder cb = new ClusterBuilder() {
private static final long serialVersionUID = 1L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1")
.withPort(9042)
.withoutJMXReporting()
.build();
}
};
CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat =
new CassandraInputFormat<> ("select * from employee_details", cb);
//While Application is start up , Read data from table and send as stream
inputRecords = getDBData(env,cassandraInputFormat);
// If any data comes from kafka means, again i want to get data from table.
//How to i trigger getDBData() method from inside this stream.
//The below code is not working
DataStream<String> inputRecords1= env.addSource(kafkaConsumer)
.map(new MapFunction<String,String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
inputRecords = getDBData(env,cassandraInputFormat);
return "OK";
}
});
//This is not printed , when i call getDBData() stream from inside the kafka stream.
inputRecords1.print();
DataStream<Employee> empDataStream = inputRecords.map(new MapFunction<Tuple4<UUID,String,String,String>, Tuple2<String,Employee>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Employee> map(Tuple4<UUID,String,String,String> value) throws Exception {
Employee emp = new Employee();
try{
emp.setEmpid(value.f0);
emp.setFirstname(value.f1);
emp.setLastname(value.f2);
emp.setAddress(value.f3);
}
catch(Exception e){
}
return new Tuple2<>(emp.getEmpid().toString(), emp);
}
}).keyBy(0).map(new MapFunction<Tuple2<String,Employee>,Employee>() {
private static final long serialVersionUID = 1L;
@Override
public Employee map(Tuple2<String, Employee> value)
throws Exception {
return value.f1;
}
});
empDataStream.print();
env.execute();
}
private static DataStream<Tuple4<UUID,String,String,String>> getDBData(StreamExecutionEnvironment env,
CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat){
DataStream<Tuple4<UUID,String,String,String>> inputRecords = env
.createInput
(cassandraInputFormat
,TupleTypeInfo.of(new TypeHint<Tuple4<UUID,String,String,String>>() {}));
return inputRecords;
}
}