Я пытаюсь перенести мое текущее потоковое приложение, основанное на использовании RDD (из их документация ), на их новый API наборов данных с использованием структурированной потоковой передачи, что, как мне сказали, является предпочтительным подходом кделайте потоковое вещание в реальном времени со Spark в эти дни.
В настоящее время у меня есть настройка приложения для использования из 1 темы под названием «SATELLITE», в которой есть сообщения, содержащие метку времени и значение, содержащее Satellite
POJO.Но у меня проблемы с выяснением, как реализовать десериализатор для этого.В моем текущем приложении это просто, вы просто добавляете строку к вашей карте свойств kafka kafkaParams.put("value.deserializer", SatelliteMessageDeserializer.class);
Я делаю это на Java, что представляет собой самую большую проблему, потому что все решения, похоже, находятся в Scala, которую яплохо понимаю, и я не могу легко преобразовать код Scala в код Java.
Я следовал примеру JSON, изложенному в этом вопросе , который в настоящее время работает, но кажется слишком сложным для того, что мне нужно сделать.Учитывая, что у меня уже есть специальный десериализатор, созданный для этой цели, я не понимаю, почему мне нужно сначала преобразовать его в строку, а просто преобразовать в JSON, а затем преобразовать в желаемый тип класса.Я также пытался использовать некоторые примеры, которые я нашел здесь , но мне пока не повезло.
В настоящее время мое приложение выглядит так (с использованием подхода json):
import common.model.Satellite;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class SparkStructuredStreaming implements Runnable{
private String bootstrapServers;
private SparkSession session;
public SparkStructuredStreaming(final String bootstrapServers, final SparkSession session) {
this.bootstrapServers = bootstrapServers;
this.session = session;
}
@Override
public void run() {
Dataset<Row> df = session
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", "SATELLITE")
.load();
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("gms", DataTypes.StringType, true),
DataTypes.createStructField("satelliteId", DataTypes.StringType, true),
DataTypes.createStructField("signalId", DataTypes.StringType, true),
DataTypes.createStructField("cnr", DataTypes.DoubleType, true),
DataTypes.createStructField("constellation", DataTypes.StringType, true),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
DataTypes.createStructField("mountPoint", DataTypes.StringType, true),
DataTypes.createStructField("pseudorange", DataTypes.DoubleType, true),
DataTypes.createStructField("epochTime", DataTypes.IntegerType, true)
});
Dataset<Satellite> df1 = df.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(Satellite.class));
try {
df1.writeStream()
.format("console")
.option("truncate", "false")
.start()
.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
}
}
}
И у меня есть собственный десериализатор, который выглядит следующим образом
import common.model.Satellite;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class SatelliteMessageDeserializer implements Deserializer<Satellite> {
private static Logger logger = LoggerFactory.getLogger(SatelliteMessageDeserializer.class);
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public void close() {
}
@Override
public Satellite deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(new String(data, "UTF-8"), getMessageClass());
} catch (Exception e) {
logger.error("Unable to deserialize message {}", data, e);
return null;
}
}
protected Class<Satellite> getMessageClass() {
return Satellite.class;
}
}
Как я могу использовать свой собственный десериализатор из класса SparkStructuredStreaming
?Я использую Spark 2.4, OpenJDK 10 и Kafka 2.0
РЕДАКТИРОВАТЬ: я пытался создать свой собственный UDF, который я думаю, как это должно быть сделано, но я не уверен, как его получитьвозвращать определенный тип, так как это только позволяет мне использовать те, что в классе Datatypes
!
UserDefinedFunction mode = udf(
(byte[] bytes) -> deserializer.deserialize("", bytes), DataTypes.BinaryType //Needs to be type Satellite, but only allows ones of type DataTypes
);
Dataset df1 = df.select(mode.apply(col("value")));