Невозможно найти класс: org.apache.beam.runners.core.metrics.MetricsContainerImpl $$ Lambda $ 31/1929817005 - PullRequest
0 голосов
/ 17 октября 2019

Я создал очень простое приложение для потоковой передачи луча, которое может читать темы кафки и записывать полученные данные в другую тему кафки. Приложение было упаковано с использованием spark-runner и развернуто в spark2 на Cloudera CDH. Но было выброшено следующее исключение:

19/10/17 18:39:25 ERROR scheduler.TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1929817005
Serialization trace:
factory (org.apache.beam.runners.core.metrics.MetricsMap)
counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
metricsContainers (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
metricsContainers (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
...

Версия Spark2 - 2.3.0.cloudera3, работает на CDH (5.13.0, Parcels). Версия Java: 1.8.0_161 Версия ОС: Red Hat Enterprise Linux Server версии 7.2 (Maipo)

Приложение основано на официальном примере приложения луча (2.16.0), поэтому в нем используется искровая версия. 2.4.4 и hadoop 2.7.3.

Демонстрационное приложение было отправлено в кластер следующей строкой:

spark2-submit --class org.apache.beam.examples.kafka.KafkaDemo --master yarn --deploy-mode cluster kafka-beam-demo-bundled-0.2.0.jar --runner=SparkRunner --bootstrap=n1:9092,n2:9092,n3:9092

Кроме того, пример луча - WordCount может работать наCDH плавно (не потоковый режим).

Я проверил затененный пакет, заявленный класс не существует в пакете, лямбда-класс не найден в пути к классам.

Есть идеи? Как выяснить эту проблему? Если требуется подробный журнал работ на CDH, его можно разместить здесь (хотя и по 251 КБ).


Исходный код вставлен здесь:

package org.apache.beam.examples.kafka;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaDemo {
  static final Logger LOG = LoggerFactory.getLogger(KafkaDemo.class);

  public interface KafkaDemoOptions
      extends PipelineOptions, StreamingOptions {

    @Description("Kafka bootstrap servers")
    @Default.String("")  
    String getBootstrap();
    void setBootstrap(String value);

    @Description("Kafka group.id")
    @Default.String("test.group")
    String getGroupId();
    void setGroupId(String value);

    @Description("Kafka input topic for test")
    @Default.String("test_sample_data")
    String getInputTopic();
    void setInputTopic(String value);

    @Description("Kafka output topic for test")
    @Default.String("test_output_data")
    String getOutputTopic();
    void setOutputTopic(String value);
  }

  public static void main(String[] args) {
    KafkaDemoOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaDemoOptions.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> strings = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options
                .getInputTopic())  // use withTopics(List<String>) to read from multiple topics.
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)

            // Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>

            // Rest of the settings are optional :

            // you can further customize KafkaConsumer used to read the records by adding more
            // settings for ConsumerConfig. e.g :
            .withConsumerConfigUpdates(ImmutableMap.of("group.id", options.getGroupId()))

            // set event times and watermark based on 'LogAppendTime'. To provide a custom
            // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
            // Use withCreateTime() with topics that have 'CreateTime' timestamps.
//            .withLogAppendTime()

            // restrict reader to committed messages on Kafka (see method documentation).
            .withReadCommitted()

            // offset consumed by the pipeline can be committed back.
            .commitOffsetsInFinalize()

            // finally, if you don't need Kafka metadata, you can drop it.g
            .withoutMetadata() // PCollection<KV<Long, String>>
        )
        .apply(Values.create());// PCollection<String>

    strings.apply(KafkaIO.<Void, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withValueSerializer(StringSerializer.class) // just need serializer for value
        .values()
    );

    System.out.println("input topic = " + options.getInputTopic() + ", output topic: " + options.getOutputTopic());
    LOG.info("input topic = " + options.getInputTopic() + ", output topic: " + options.getOutputTopic());

    pipeline.run().waitUntilFinish();
  }

}
...