Нарушение ограничения загрузчика классов при использовании гистограммы метрик Dropwizard в Apache Flink - PullRequest
0 голосов
/ 29 января 2020

Я запускаю потоковое задание на кластере Flink 1.9.1 и пытаюсь передавать гистограмму значений в наш сборщик метрик Prometheus. Следуя рекомендациям документации Flink, я использовал реализацию гистограммы Dropwizard с предоставляемой Flink обёрткой (`DropwizardHistogramWrapper`), однако при отправке задания в кластер оно падает со следующей трассировкой стека:

java.lang.LinkageError: loader constraint violation: when resolving method "org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper.<init>(Lcom/codahale/metrics/Histogram;)V" the class loader (instance of org/apache/flink/util/ChildFirstClassLoader) of the current class, com/example/foo/metrics/FooMeter, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper, have different Class objects for the type com/codahale/metrics/Histogram used in the signature
    at com.example.foo.metrics.FooMeter.<init>(FooMeter.scala:11)
    at com.example.foo.transform.ValidFoos$.open(ValidFoos.scala:15)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

Я нашёл похожую ошибку в списке рассылки, однако использование плагина shadowJar в Gradle не помогло.

Что я упускаю?

Соответствующий код здесь:

import com.codahale.metrics.{Histogram, SlidingWindowReservoir}
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
import org.apache.flink.metrics.{MetricGroup, Histogram => FlinkHistogram}

/**
 * Registers a Dropwizard-backed histogram in a Flink metric group and records values into it.
 *
 * The Dropwizard `Histogram` uses a `SlidingWindowReservoir` holding the most recent
 * `windowSize` samples. It is adapted to Flink's metric API through `DropwizardHistogramWrapper`.
 *
 * NOTE: `DropwizardHistogramWrapper` and `com.codahale.metrics.Histogram` must be resolved by the
 * same classloader. If the wrapper comes from the cluster classpath while `Histogram` comes from
 * the job jar, constructing this class fails with a `LinkageError` (loader constraint violation).
 *
 * @param metricGroup group in which the histogram is registered (e.g. the operator's
 *                    `getRuntimeContext.getMetricGroup`)
 * @param name        metric name within `metricGroup`
 * @param windowSize  number of most recent samples kept by the reservoir; must be positive.
 *                    Defaults to 500, the value previously hard-coded here.
 */
class FooMeter(metricGroup: MetricGroup, name: String, windowSize: Int = 500) {
  require(windowSize > 0, s"windowSize must be positive, got $windowSize")

  // Registered exactly once at construction and never reassigned, so a `val` is sufficient.
  private val histogram: FlinkHistogram = metricGroup.histogram(
    name,
    new DropwizardHistogramWrapper(new Histogram(new SlidingWindowReservoir(windowSize))))

  /** Records a single observation into the registered histogram. */
  def record(fooValue: Long): Unit = histogram.update(fooValue)
}

/**
 * Flat-map that emits the valid `Foo`s derived from each `Try[FooData]` via `Transform.validFoo`.
 * It records each emitted `foo.value` in the "foo_values" histogram.
 *
 * NOTE(review): this is a Scala `object`, i.e. a JVM singleton. Once Flink deserializes it,
 * parallel subtasks in the same TaskManager likely share this one instance, along with
 * `fooMeter` and the runtime context set on it. Consider turning it into a `class` and
 * instantiating it per use. Confirm against how callers reference `ValidFoos`.
 */
object ValidFoos extends RichFlatMapFunction[Try[FooData], Foo] {
  // Created in `open` on the worker; marked transient so it is not serialized with the function.
  @transient private var fooMeter: FooMeter = _

  override def open(parameters: Configuration): Unit =
    fooMeter = new FooMeter(getRuntimeContext.getMetricGroup, "foo_values")

  // The collector's element type must be the function's declared output type (`Foo`).
  // A `Collector[FooData]` parameter would not override `flatMap` of
  // `RichFlatMapFunction[_, Foo]` and the object would not compile.
  override def flatMap(value: Try[FooData], out: Collector[Foo]): Unit =
    Transform.validFoo(value).foreach { foo =>
      fooMeter.record(foo.value)
      out.collect(foo)
    }
}

build.gradle:


plugins {
    id 'scala'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

ext {
    flinkVersion = "1.9.1"
    scalaBinaryVersion = "2.11"
    scalaVersion = "2.11.12"
}

// Entry point for `gradle run`; the shadow plugin also writes it into the fat jar's manifest.
mainClassName = 'com.example.foo.Foo'

dependencies {
    // NOTE(review): Flink's documentation recommends not bundling the core Flink/Scala runtime
    // (flink-streaming-scala, flink-runtime-web, scala-library) into the job jar, because the
    // cluster already provides it. Consider moving these to a configuration that shadowJar
    // does not bundle, while keeping them on the local run classpath.
    implementation(
        "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-json:${flinkVersion}",
        "org.apache.flink:flink-metrics-dropwizard:${flinkVersion}",
        "org.scala-lang:scala-library:${scalaVersion}"
    )
}

// Build the job jar with `gradle shadowJar` and submit that artifact.
// By default Flink resolves `org.apache.flink.*` classes parent-first. An unrelocated
// org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper is therefore taken from the
// cluster classpath, while com.codahale.metrics.Histogram is loaded from the job jar. That
// mismatch is the "loader constraint violation" LinkageError. Relocating both packages keeps
// the wrapper and the Dropwizard classes together in the job's own classloader.
shadowJar {
    zip64 = true
    relocate("org.apache.flink.dropwizard", "com.example.foo.shaded.dropwizard")
    relocate("com.codahale", "com.example.foo.shaded.codahale")
}

// Thin (non-fat) jar; shadowJar inherits this manifest. This task used to merge the whole
// classpath by hand into `<root>-all.jar`. That bypassed the relocations above and, for an
// unversioned root project, reused shadowJar's default output name, so the unrelocated jar
// could be the one submitted.
jar {
    manifest {
        attributes('Main-Class': 'com.example.foo.Foo')
    }
}

Дополнительная информация:

  • Локально код работает корректно
  • Кластер Flink собран вручную и имеет следующую структуру каталогов:
# find /usr/lib/flink/
/usr/lib/flink/
/usr/lib/flink/plugins
/usr/lib/flink/plugins/flink-metrics-influxdb-1.9.1.jar
/usr/lib/flink/plugins/flink-s3-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-graphite-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-prometheus-1.9.1.jar
/usr/lib/flink/plugins/flink-cep_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-python_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-queryable-state-runtime_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-sql-client_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-slf4j-1.9.1.jar
/usr/lib/flink/plugins/flink-state-processor-api_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-oss-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-statsd-1.9.1.jar
/usr/lib/flink/plugins/flink-swift-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-gelly-scala_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-azure-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-datadog-1.9.1.jar
/usr/lib/flink/plugins/flink-shaded-netty-tcnative-dynamic-2.0.25.Final-7.0.jar
/usr/lib/flink/plugins/flink-s3-fs-presto-1.9.1.jar
/usr/lib/flink/plugins/flink-cep-scala_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-gelly_2.11-1.9.1.jar
/usr/lib/flink/lib
/usr/lib/flink/lib/flink-metrics-influxdb-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-graphite-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-prometheus-1.9.1.jar
/usr/lib/flink/lib/flink-table_2.11-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-slf4j-1.9.1.jar
/usr/lib/flink/lib/log4j-1.2.17.jar
/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar
/usr/lib/flink/lib/flink-metrics-statsd-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-datadog-1.9.1.jar
/usr/lib/flink/lib/flink-table-blink_2.11-1.9.1.jar
/usr/lib/flink/lib/flink-dist_2.11-1.9.1.jar
/usr/lib/flink/bin/...
...