Как использовать загрузчик классов в потоке flink? - PullRequest
0 голосов
/ 15 марта 2019

Ниже приведен мой потоковый код. Я хочу, чтобы метод map получил метод customer из внешнего jarfile

       public void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> ds = env.socketTextStream("localhost",9999);

 URL[] urls = {new URL("file:///Users/django/workspace/test01/target/test01-1.0.jar")};
        URLClassLoader urlClassLoader = new URLClassLoader(urls);
        Class say = urlClassLoader.loadClass("com.fanqizha.FanqizhaMap");


        RichMapFunction parser = (RichMapFunction)say.getConstructor().newInstance();

        SingleOutputStreamOperator map = ds.map(parser);

        map.print();

        env.execute();
    }

но я получаю исключение

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.fanqizha.FanqizhaMap

Моя версия Flink 1.7.1

...