Как десериализовать Java-объекты из kafka с помощью Spark StructuredStream API без Avro или Json? - PullRequest
0 голосов
/ 06 декабря 2018

Я создал тему в kafka под названием «test», в которой есть только один раздел, и он не реплицируется.

Я создал производителя Kafka, который пишет по теме «test» ключ, то естьиндекс итерации каждого цикла и значение, являющееся объектом типа Word в цикле из 100000 итераций.Свойство «word» каждого слова устанавливается на индекс итерации, а свойство «timestamp» каждого слова устанавливается на текущую метку времени.Существует 1000 мс сна между итерациями (Thread.sleep).Вот определение Word:

public class Word implements Serializable{

    private static final long serialVersionUID = 1L;

    private String word;

    private Long timestamp;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Word [word=" + word + ", timestamp=" + timestamp + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((timestamp == null) ? 0 : timestamp.hashCode());
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Word other = (Word) obj;
        if (timestamp == null) {
            if (other.timestamp != null)
                return false;
        } else if (!timestamp.equals(other.timestamp))
            return false;
        if (word == null) {
            if (other.word != null)
                return false;
        } else if (!word.equals(other.word))
            return false;
        return true;
    }

}

Ниже приведен класс, который использует StructuredStreaming API.Как видите, я регистрирую класс десериализатора с именем «deserialize», затем использую этот класс для преобразования столбца df, построенного из kafka.Я избавлю вас от деталей класса Deserializer, потому что, как вы видите в трассировке стека, класс правильно десериализован.Дело в том, что после десериализации происходит сбой приведения к схеме Word.

public class StreamObjsFromKafka {

    public static void es() throws StreamingQueryException {
        SparkSession session = SparkSession.builder().appName("streamFromKafka").master("local[*]").getOrCreate();

        Dataset<Row> df = session.readStream().format("kafka")
                .option("group.id","test-consumer-group")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "test").load();

        df.printSchema();

        StructField word = DataTypes.createStructField("word", DataTypes.StringType, true);
        StructField timestamp = DataTypes.createStructField("timestamp", DataTypes.LongType, true);
        StructType schema = DataTypes.createStructType(Arrays.asList(word, timestamp));

        session.udf().register("deserialize", new WordDeserializer(), schema);

        Dataset<Row> df1 = df.selectExpr("CAST(key AS STRING) as KEY", "deserialize(value) AS value");

        StreamingQuery query0 = df1.writeStream().outputMode("update").format("console").start();

        query0.awaitTermination();

    }

}

Следующие строки являются первыми из трассировки стека (я избавлю вас от ненужных подробностей):

18/12/06 15:21:03 ERROR Utils: Aborting task
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$259: (binary) => struct<word:string,timestamp:bigint>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The value (Word [word=15, timestamp=1544106059933]) of the type (kafka.model.Word) cannot be converted to struct<word:string,timestamp:bigint>
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
    ... 17 more
18/12/06 15:21:03 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 0, attempt 0stage 0.0)
18/12/06 15:21:03 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 0, attempt 0stage 0.0)
18/12/06 15:21:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$259: (binary) => struct<word:string,timestamp:bigint>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The value (Word [word=15, timestamp=1544106059933]) of the type (kafka.model.Word) cannot be converted to struct<word:string,timestamp:bigint>
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
    ... 17 more
18/12/06 15:21:03 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$259: (binary) => struct<word:string,timestamp:bigint>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The value (Word [word=15, timestamp=1544106059933]) of the type (kafka.model.Word) cannot be converted to struct<word:string,timestamp:bigint>
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
    ... 17 more

Я думаю, что наиболее важной строкой трассировки стека является следующая:

The value (Word [word=15, timestamp=1544106059933]) of the type (kafka.model.Word) cannot be converted to struct<word:string,timestamp:bigint>

Как вы можете видеть в приведенной выше строке, десериализация работает правильно: "(Word [word = 15, timestamp= 1544106059933]) "это именно то, что Kafka Producer написал в" тестовую "тему.Тем не менее, он не приводит его свойство к следующей схеме StructType: "struct".Я не могу понять, почему.

...