Я запускаю потоковое задание на кластере Flink 1.9.1 и пытаюсь передать гистограмму значений в наш сборщик Prometheus metri c. В соответствии с рекомендациями в документах Flink я использовал реализацию гистограммы Dropwizard с предоставленной Flink оболочкой, однако при отправке задания в кластер происходит сбой со следующей трассировкой:
java.lang.LinkageError: loader constraint violation: when resolving method "org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper.<init>(Lcom/codahale/metrics/Histogram;)V" the class loader (instance of org/apache/flink/util/ChildFirstClassLoader) of the current class, com/example/foo/metrics/FooMeter, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper, have different Class objects for the type com/codahale/metrics/Histogram used in the signature
at com.example.foo.metrics.FooMeter.<init>(FooMeter.scala:11)
at com.example.foo.transform.ValidFoos$.open(ValidFoos.scala:15)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Я обнаружил аналогичную ошибку в список рассылки , однако использование плагина shadowJar
в Gradle не помогло.
Что-то мне не хватает?
Соответствующий код здесь:
import com.codahale.metrics.{Histogram, SlidingWindowReservoir}
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
import org.apache.flink.metrics.{MetricGroup, Histogram => FlinkHistogram}
class FooMeter(metricGroup: MetricGroup, name: String) {
private var histogram: FlinkHistogram = metricGroup.histogram(
name, new DropwizardHistogramWrapper(new Histogram(new SlidingWindowReservoir(500))))
def record(fooValue: Long): Unit = {
histogram.update(fooValue)
}
}
object ValidFoos extends RichFlatMapFunction[Try[FooData], Foo] {
@transient private var fooMeter: FooMeter = _
override def open(parameters: Configuration): Unit = {
fooMeter = new FooMeter(getRuntimeContext.getMetricGroup, "foo_values")
}
override def flatMap(value: Try[FooData], out: Collector[FooData]): Unit = {
Transform.validFoo(value) foreach (foo => {
fooMeter.record(foo.value)
out.collect(foo)
})
}
}
build.gradle:
plugins {
id 'scala'
id 'application'
id 'com.github.johnrengelman.shadow' version '2.0.4'
}
ext {
flinkVersion = "1.9.1"
scalaBinaryVersion = "2.11"
scalaVersion = "2.11.12"
}
dependencies {
implementation(
"org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}",
"org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}",
"org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}",
"org.apache.flink:flink-json:${flinkVersion}"
"org.apache.flink:flink-metrics-dropwizard:${flinkVersion}",
"org.scala-lang:scala-library:${scalaVersion}",
)
}
shadowJar {
relocate("org.apache.flink.dropwizard", "com.example.foo.shaded.dropwizard")
relocate("com.codahale", "com.example.foo.shaded.codahale")
}
jar {
zip64 = true
archiveName = rootProject.name + '-all.jar'
manifest {
attributes('Main-Class': 'com.example.foo.Foo')
}
from {
configurations.compileClasspath.collect {
it.isDirectory() ? it : zipTree(it)
}
configurations.runtimeClasspath.collect {
it.isDirectory() ? it : zipTree(it)
}
}
}
Дополнительная информация:
- Выполнение кода локально работает
- Кластер Flink скомпилирован специально со следующей структурой каталогов:
# find /usr/lib/flink/
/usr/lib/flink/
/usr/lib/flink/plugins
/usr/lib/flink/plugins/flink-metrics-influxdb-1.9.1.jar
/usr/lib/flink/plugins/flink-s3-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-graphite-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-prometheus-1.9.1.jar
/usr/lib/flink/plugins/flink-cep_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-python_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-queryable-state-runtime_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-sql-client_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-slf4j-1.9.1.jar
/usr/lib/flink/plugins/flink-state-processor-api_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-oss-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-statsd-1.9.1.jar
/usr/lib/flink/plugins/flink-swift-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-gelly-scala_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-azure-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-datadog-1.9.1.jar
/usr/lib/flink/plugins/flink-shaded-netty-tcnative-dynamic-2.0.25.Final-7.0.jar
/usr/lib/flink/plugins/flink-s3-fs-presto-1.9.1.jar
/usr/lib/flink/plugins/flink-cep-scala_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-gelly_2.11-1.9.1.jar
/usr/lib/flink/lib
/usr/lib/flink/lib/flink-metrics-influxdb-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-graphite-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-prometheus-1.9.1.jar
/usr/lib/flink/lib/flink-table_2.11-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-slf4j-1.9.1.jar
/usr/lib/flink/lib/log4j-1.2.17.jar
/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar
/usr/lib/flink/lib/flink-metrics-statsd-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-datadog-1.9.1.jar
/usr/lib/flink/lib/flink-table-blink_2.11-1.9.1.jar
/usr/lib/flink/lib/flink-dist_2.11-1.9.1.jar
/usr/lib/flink/bin/...