Java Kafka потребитель и авро десериализатор - PullRequest
1 голос
/ 09 апреля 2019

Я занимаюсь разработкой простого Java с потоковой передачей.

Я настроил коннектор kafka jdbc (postgres к теме) и хочу прочитать его с потребителем потоковой передачи.

I 'я могу правильно прочитать тему с помощью:

./kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --property print.key=true --from-beginning --topic postgres-ip_audit

, получая следующие результаты:

null {"id": 1557, "ip": {"string": "90.228.176.138 "}," create_ts ": {" long ": 1554819937582}}

, когда я использую Java-приложение с этой конфигурацией:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "groupStreamId");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Я получаю такие результаты:

�179.20.119.53�����Z

Может кто-нибудь указать мне, как исправить мою проблему?

Я также пытаюсь использоватьByteArrayDeserializer и преобразовать bytes [] в строку, но я всегда получаю плохие результаты символов.

Ответы [ 2 ]

1 голос
/ 09 апреля 2019

Вы можете десериализовать сообщения avro, используя io.confluent.kafka.serializers.KafkaAvroDeserializer и имея реестр схемы для управления схемой записей.

Вот пример кода

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public class SparkStreaming {

  public static void main(String... args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local[2]");
    conf.setAppName("Spark Streaming Test Java");

    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(10));

    processStream(ssc, sc);

    ssc.start();
    ssc.awaitTermination();
  }

  private static void processStream(JavaStreamingContext ssc, JavaSparkContext sc) {
    System.out.println("--> Processing stream");

    Map<String, String> props = new HashMap<>();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("group.id", "spark");
    props.put("specific.avro.reader", "true");

    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    Set<String> topicsSet = new HashSet<>(Collections.singletonList("test"));

    JavaPairInputDStream<String, Object> stream = KafkaUtils.createDirectStream(ssc, String.class, Object.class,
      StringDecoder.class, KafkaAvroDecoder.class, props, topicsSet);

    stream.foreachRDD(rdd -> {
      rdd.foreachPartition(iterator -> {
          while (iterator.hasNext()) {
            Tuple2<String, Object> next = iterator.next();
            Model model = (Model) next._2();
            System.out.println(next._1() + " --> " + model);
          }
        }
      );
    });
  }
}

Полный пример приложения доступен в этом репозитории github

0 голосов
/ 09 апреля 2019

Вы предоставили StringDeserializer, однако отправляете значения, сериализованные с помощью avro, поэтому вам необходимо соответствующим образом десериализовать их. Используя spark 2.4.0 (и следующие компиляторы deps org.apache.spark:spark-avro_2.12:2.4.1, вы можете добиться этого с помощью функции from_avro:

import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("path/to/your/schema.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .show()

Если вам нужно использовать реестр схем (как вы это делали с kafka-avro-console-consumer), это невозможно из коробки и нужно написать много кода. Я рекомендую использовать эту библиотеку https://github.com/AbsaOSS/ABRiS. Однако она совместима только со свечой 2.3.0

...