Возможно ли использовать динамический прокси в Flink Job? - PullRequest
0 голосов
/ 15 мая 2019

когда я пытался использовать динамический прокси в задании Flink, я столкнулся с исключением ClassNotFound, это невозможно? если нет, то как я могу это реализовать?

Мне нужно добавить счетчик в моей работе Flink, но я не хочу использовать код трассировки непосредственно в моей основной логике, поскольку он будет влиять на основную логику, поэтому я попытался использовать механизм динамического прокси от Guava ReflectionUtil, но когда я пытаюсь отправить задание, я обнаружил исключение ClassNotFound.

Аннотация:


@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Trace {

    String name();

    boolean isCount() default false;

    int countStep() default 1;

    boolean isDuration() default false;

    boolean isGauge() default false;

    int gaugeValue() default 1;

}

Интерфейс:

public interface TestStorage {

    @Trace(name = "testLoad", isDuration = true)
    public Series testLoad(String id);

}

ProxyHandler:

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        Object ret = null;
        if (method.isAnnotationPresent(Trace.class)) {
            Trace trace = method.getAnnotation(Trace.class);
            String counterName = trace.name();

            long start = System.currentTimeMillis();
            try {
                ret = method.invoke(proxy, args);
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
                traceCounter("Error." + counterName, 1);
            }
            long end = System.currentTimeMillis();

            if (trace.isDuration()) {
                traceDuration(counterName, (end - start));
            }

            if (trace.isCount()) {
                int countStep = trace.countStep();
                traceCounter(counterName, countStep);
            }

            if (trace.isGauge()) {
                int gaugeValue = trace.gaugeValue();
                traceGauge(counterName, gaugeValue);
            }

        } else {
            ret = method.invoke(proxy, args);
        }
        return ret;
    }
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.myjob.test.TestStorage
ClassLoader info: URL ClassLoader:
    file: '/mnt/flink-8084/tmp/blobStore-a02e66eb-d7de-4ddb-bfe5-46b04e2f2842/job_8cf47336ebb3020b11171465a8d9d52d/blob_p-9b827ed989b89d0c906bd37327ff197fdb285816-d3cfe2a00a6815a7672fc5e6b5d4d946' (valid JAR)
Class was actually found in classloader - deserialization issue.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.myjob.test.TestStorage
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveProxyClass(ObjectInputStream.java:758)
    at java.io.ObjectInputStream.readProxyDesc(ObjectInputStream.java:1800)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1748)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:524)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:510)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:498)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:459)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...