Я создал очень простое приложение для потоковой передачи луча, которое может читать темы кафки и записывать полученные данные в другую тему кафки. Приложение было упаковано с использованием spark-runner и развернуто в spark2 на Cloudera CDH. Но было выброшено следующее исключение:
19/10/17 18:39:25 ERROR scheduler.TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1929817005
Serialization trace:
factory (org.apache.beam.runners.core.metrics.MetricsMap)
counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
metricsContainers (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
metricsContainers (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
...
Версия Spark2 - 2.3.0.cloudera3, работает на CDH (5.13.0, Parcels). Версия Java: 1.8.0_161 Версия ОС: Red Hat Enterprise Linux Server версии 7.2 (Maipo)
Приложение основано на официальном примере приложения луча (2.16.0), поэтому в нем используется искровая версия. 2.4.4 и hadoop 2.7.3.
Демонстрационное приложение было отправлено в кластер следующей строкой:
spark2-submit --class org.apache.beam.examples.kafka.KafkaDemo --master yarn --deploy-mode cluster kafka-beam-demo-bundled-0.2.0.jar --runner=SparkRunner --bootstrap=n1:9092,n2:9092,n3:9092
Кроме того, пример луча - WordCount может работать наCDH плавно (не потоковый режим).
Я проверил затененный пакет, заявленный класс не существует в пакете, лямбда-класс не найден в пути к классам.
Есть идеи? Как выяснить эту проблему? Если требуется подробный журнал работ на CDH, его можно разместить здесь (хотя и по 251 КБ).
Исходный код вставлен здесь:
package org.apache.beam.examples.kafka;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaDemo {
static final Logger LOG = LoggerFactory.getLogger(KafkaDemo.class);
public interface KafkaDemoOptions
extends PipelineOptions, StreamingOptions {
@Description("Kafka bootstrap servers")
@Default.String("")
String getBootstrap();
void setBootstrap(String value);
@Description("Kafka group.id")
@Default.String("test.group")
String getGroupId();
void setGroupId(String value);
@Description("Kafka input topic for test")
@Default.String("test_sample_data")
String getInputTopic();
void setInputTopic(String value);
@Description("Kafka output topic for test")
@Default.String("test_output_data")
String getOutputTopic();
void setOutputTopic(String value);
}
public static void main(String[] args) {
KafkaDemoOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaDemoOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> strings = pipeline
.apply(KafkaIO.<Long, String>read()
.withBootstrapServers(options.getBootstrap())
.withTopic(options
.getInputTopic()) // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
// Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
// Rest of the settings are optional :
// you can further customize KafkaConsumer used to read the records by adding more
// settings for ConsumerConfig. e.g :
.withConsumerConfigUpdates(ImmutableMap.of("group.id", options.getGroupId()))
// set event times and watermark based on 'LogAppendTime'. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
// Use withCreateTime() with topics that have 'CreateTime' timestamps.
// .withLogAppendTime()
// restrict reader to committed messages on Kafka (see method documentation).
.withReadCommitted()
// offset consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
// finally, if you don't need Kafka metadata, you can drop it.g
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.create());// PCollection<String>
strings.apply(KafkaIO.<Void, String>write()
.withBootstrapServers(options.getBootstrap())
.withTopic(options.getOutputTopic())
.withValueSerializer(StringSerializer.class) // just need serializer for value
.values()
);
System.out.println("input topic = " + options.getInputTopic() + ", output topic: " + options.getOutputTopic());
LOG.info("input topic = " + options.getInputTopic() + ", output topic: " + options.getOutputTopic());
pipeline.run().waitUntilFinish();
}
}