Как реализовать собственный десериализатор для потока Kafka с использованием структурированной потоковой передачи Spark? - PullRequest
0 голосов
/ 28 ноября 2018

Я пытаюсь перенести мое текущее потоковое приложение, основанное на использовании RDD (из их документация ), на их новый API наборов данных с использованием структурированной потоковой передачи, что, как мне сказали, является предпочтительным подходом кделайте потоковое вещание в реальном времени со Spark в эти дни.

В настоящее время у меня есть настройка приложения для использования из 1 темы под названием «SATELLITE», в которой есть сообщения, содержащие метку времени и значение, содержащее Satellite POJO.Но у меня проблемы с выяснением, как реализовать десериализатор для этого.В моем текущем приложении это просто, вы просто добавляете строку к вашей карте свойств kafka kafkaParams.put("value.deserializer", SatelliteMessageDeserializer.class); Я делаю это на Java, что представляет собой самую большую проблему, потому что все решения, похоже, находятся в Scala, которую яплохо понимаю, и я не могу легко преобразовать код Scala в код Java.

Я следовал примеру JSON, изложенному в этом вопросе , который в настоящее время работает, но кажется слишком сложным для того, что мне нужно сделать.Учитывая, что у меня уже есть специальный десериализатор, созданный для этой цели, я не понимаю, почему мне нужно сначала преобразовать его в строку, а просто преобразовать в JSON, а затем преобразовать в желаемый тип класса.Я также пытался использовать некоторые примеры, которые я нашел здесь , но мне пока не повезло.

В настоящее время мое приложение выглядит так (с использованием подхода json):

import common.model.Satellite;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkStructuredStreaming  implements Runnable{

    private String bootstrapServers;
    private SparkSession session;

    public SparkStructuredStreaming(final String bootstrapServers, final SparkSession session) {
        this.bootstrapServers = bootstrapServers;
        this.session = session;
    }
    @Override
    public void run() {
        Dataset<Row> df = session
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers)
                .option("subscribe", "SATELLITE")
                .load();

        StructType schema =  DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("id", DataTypes.StringType, true),
                DataTypes.createStructField("gms", DataTypes.StringType, true),
                DataTypes.createStructField("satelliteId", DataTypes.StringType, true),
                DataTypes.createStructField("signalId", DataTypes.StringType, true),
                DataTypes.createStructField("cnr", DataTypes.DoubleType, true),
                DataTypes.createStructField("constellation", DataTypes.StringType, true),
                DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
                DataTypes.createStructField("mountPoint", DataTypes.StringType, true),
                DataTypes.createStructField("pseudorange", DataTypes.DoubleType, true),
                DataTypes.createStructField("epochTime", DataTypes.IntegerType, true)
        });

            Dataset<Satellite> df1 = df.selectExpr("CAST(value AS STRING) as message")
                    .select(functions.from_json(functions.col("message"),schema).as("json"))
                    .select("json.*")
                    .as(Encoders.bean(Satellite.class));

        try {
            df1.writeStream()
                    .format("console")
                    .option("truncate", "false")
                    .start()
                    .awaitTermination();

        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}

И у меня есть собственный десериализатор, который выглядит следующим образом

import common.model.Satellite;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SatelliteMessageDeserializer implements Deserializer<Satellite> {

    private static Logger logger = LoggerFactory.getLogger(SatelliteMessageDeserializer.class);
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map configs, boolean isKey) {
    }

    @Override
    public void close() {
    }

    @Override
    public Satellite deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(new String(data, "UTF-8"), getMessageClass());
        } catch (Exception e) {
            logger.error("Unable to deserialize message {}", data, e);
            return null;
        }
    }

    protected Class<Satellite> getMessageClass() {
        return Satellite.class;
    }
}

Как я могу использовать свой собственный десериализатор из класса SparkStructuredStreaming?Я использую Spark 2.4, OpenJDK 10 и Kafka 2.0

РЕДАКТИРОВАТЬ: я пытался создать свой собственный UDF, который я думаю, как это должно быть сделано, но я не уверен, как его получитьвозвращать определенный тип, так как это только позволяет мне использовать те, что в классе Datatypes!

UserDefinedFunction mode = udf(
                (byte[] bytes) -> deserializer.deserialize("", bytes), DataTypes.BinaryType //Needs to be type Satellite, but only allows ones of type DataTypes
        );

Dataset df1 = df.select(mode.apply(col("value")));

1 Ответ

0 голосов
/ 28 ноября 2018

from_json может работать только со строковыми столбцами.

Структурированная потоковая передача всегда использует значения Kafka в байтах

Значения всегда десериализуются как байтовые массивы с помощью ByteArrayDeserializer.Используйте операции DataFrame, чтобы явно десериализовать значения

Поэтому сначала вы должны хотя бы десериализовать строку, но я не думаю, что вам это действительно нужно.

Возможно, можно просто сделать это

df.select(value).as(Encoders.bean(Satellite.class))

Если это не сработает, вы можете попробовать определить собственный UDF / декодер, чтобы вы могли получить что-то вроде SATELLITE_DECODE(value)

В scala

object SatelliteDeserializerWrapper {
    val deser = new SatelliteDeserializer
}
spark.udf.register("SATELLITE_DECODE", (topic: String, bytes: Array[Byte]) => 
    SatelliteDeserializerWrapper.deser.deserialize(topic, bytes)
)

df.selectExpr("""SATELLITE_DECODE("topic1", value) AS message""")

см. этот пост для вдохновения , а также упоминается в блоге Databricks

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...