Apache Flink: Как вызвать один поток из другого потока - PullRequest
1 голос
/ 19 июня 2020

Мой сценарий таков: я хочу вызвать один поток на основе ввода другого потока. Оба типа потока разные. Ниже приведен мой пример кода. Я хочу запускать один поток, когда какое-либо сообщение получено из потока Kafka.

При запуске приложения я могу читать данные из БД. Опять же, я хочу получить данные из БД на основе какого-то сообщения kafka. Когда я получаю сообщение kafka в потоке, я хочу снова получить данные из БД. Это мой реальный вариант использования.

Как этого добиться? Возможно ли это?



public class DataStreamCassandraExample implements Serializable{

   private static final long serialVersionUID = 1L;

   static Logger LOG = LoggerFactory.getLogger(DataStreamCassandraExample.class);

   private transient static StreamExecutionEnvironment env;
    static DataStream<Tuple4<UUID,String,String,String>> inputRecords;

        public static void main(String[] args) throws Exception {
             env = StreamExecutionEnvironment.getExecutionEnvironment();

            ParameterTool argParameters = ParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(argParameters);

               Properties kafkaProps = new Properties();
               kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
               kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");

               FlinkKafkaConsumer<String> kafkaConsumer =  new FlinkKafkaConsumer<>("testtopic", new SimpleStringSchema(), kafkaProps);


               ClusterBuilder cb = new ClusterBuilder() {

               private static final long serialVersionUID = 1L;

                   @Override
                   public Cluster buildCluster(Cluster.Builder builder) {
                       return builder.addContactPoint("127.0.0.1")
                               .withPort(9042)
                               .withoutJMXReporting()
                               .build();
                   }
               };

               CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat =
                       new CassandraInputFormat<> ("select * from employee_details", cb);

               //While Application is start up , Read data from table and send as stream
               inputRecords = getDBData(env,cassandraInputFormat);

               // If any data comes from kafka means, again i want to get data from table.
               //How to i trigger getDBData() method from inside this stream.
               //The below code is not working
               DataStream<String> inputRecords1= env.addSource(kafkaConsumer)
                           .map(new MapFunction<String,String>() {
                               private static final long serialVersionUID = 1L;

                               @Override
                               public String map(String value) throws Exception {
                                   inputRecords =  getDBData(env,cassandraInputFormat);
                                   return "OK";
                               }
                           });

               //This is not printed , when i call getDBData() stream from inside the kafka stream.
               inputRecords1.print();


                DataStream<Employee> empDataStream = inputRecords.map(new MapFunction<Tuple4<UUID,String,String,String>, Tuple2<String,Employee>>() {
                       private static final long serialVersionUID = 1L;

                       @Override
                       public Tuple2<String, Employee> map(Tuple4<UUID,String,String,String> value) throws Exception {
                           Employee emp = new Employee();
                           try{
                           emp.setEmpid(value.f0);
                           emp.setFirstname(value.f1);
                           emp.setLastname(value.f2);
                           emp.setAddress(value.f3);

                           }
                           catch(Exception e){
                           }

                           return new Tuple2<>(emp.getEmpid().toString(), emp);
                       }
                   }).keyBy(0).map(new MapFunction<Tuple2<String,Employee>,Employee>() {

                       private static final long serialVersionUID = 1L;

                       @Override
                       public Employee map(Tuple2<String, Employee> value)
                               throws Exception {
                           return value.f1;
                       }   


                   });

             empDataStream.print();

                env.execute();
        }


        private static  DataStream<Tuple4<UUID,String,String,String>> getDBData(StreamExecutionEnvironment env,
                                                                   CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat){

            DataStream<Tuple4<UUID,String,String,String>> inputRecords = env
                    .createInput
                    (cassandraInputFormat   
                    ,TupleTypeInfo.of(new TypeHint<Tuple4<UUID,String,String,String>>() {}));
           return inputRecords;

        }          
}



Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...