Исключение нулевого указателя в PubsubIO.readAvroGenericRecords при выполнении задания потока данных (java) - PullRequest
1 голос
/ 21 февраля 2020

У меня есть следующий apache луч конвейера:

package ch.mycompany.bb8;

import ch.mycompany.bb8.transforms.LogRecords;

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Bb8Pipeline {
    private static final Logger LOG = LoggerFactory.getLogger(Bb8Pipeline.class);

    /**
     * Runs the pipeline with the supplied options.
     *
     * @param options The execution parameters to the pipeline.
     * @return The result of the pipeline execution.
     */
    public static PipelineResult run(CustomOptions options) {

        Pipeline pipeline = Pipeline.create(options);

        String schemaJson = "{"
        +    "\"type\": \"record\","
        +    "\"namespace\": \"com.google.cloud.pso\","
        +    "\"name\": \"User\","
        +    "\"fields\": ["
        +      "{"
        +        "\"name\": \"name\","
        +        "\"type\": \"string\""
        +      "},"
        +      "{"
        +        "\"name\": \"surname\","
        +        "\"type\": \"string\""
        +      "},"
        +      "{"
        +          "\"name\": \"age\","
        +          "\"type\": \"int\""
        +      "},"
        +      "{"
        +          "\"name\": \"retired\","
        +          "\"type\": \"boolean\""
        +      "}"
        +    "]"
        +  "}";

        Schema avroSchema = new Schema.Parser().parse(schemaJson);

        LOG.info(avroSchema.toString());

        pipeline.apply("Read PubSub record strings", 
        PubsubIO.readAvroGenericRecords(avroSchema)
        .fromSubscription(options.getInputSubscription()))
                .apply("Simply log records", ParDo.of(new LogRecords()))
                .apply("Write PubSub records", PubsubIO.writeStrings().to(options.getOutputTopic()));

        return pipeline.run();
    }

    /**
     * Main entry point for executing the pipeline.
     *
     * @param args The command-line arguments to the pipeline.
     */
    public static void main(String[] args) {
        CustomOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomOptions.class);

        options.setStreaming(true);

        run(options);
    }
}

Я запускаю конвейер, используя maven следующим образом:

mvn compile exec:java \
 -Dexec.mainClass=ch.mycompany.bb8.Bb8Pipeline \
      -Dexec.args="--project=t2-prod \
      --stagingLocation=gs://bb-8-staging/staging/  \
      --tempLocation=gs://bb-8-staging/staging/ \
      --runner=DataflowRunner \
      --region=europe-west1 \
      --jobName=bb-8-avro-test \
      --outputTopic=projects/t2-prod/topics/bb-8-output \
      --inputSubscription=projects/t2-prod/subscriptions/bb-8-ingest \
      --maxNumWorkers=1"

И я получаю следующее исключение нулевого указателя:

INFO: {"type":"record","name":"User","namespace":"com.google.cloud.pso","fields":[{"name":"name","type":"string"},{"name":"surname","type":"string"},{"name":"age","type":"int"},{"name":"retired","type":"boolean"}]}
[WARNING] 
java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.get (ConcurrentHashMap.java:936)
    at java.util.concurrent.ConcurrentHashMap.containsKey (ConcurrentHashMap.java:964)
    at org.apache.avro.LogicalTypes.fromSchemaImpl (LogicalTypes.java:73)
    at org.apache.avro.LogicalTypes.fromSchema (LogicalTypes.java:47)
    at org.apache.beam.sdk.schemas.utils.AvroUtils.toFieldType (AvroUtils.java:673)
    at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamField (AvroUtils.java:290)
    at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamSchema (AvroUtils.java:313)
    at org.apache.beam.sdk.schemas.utils.AvroUtils.getSchema (AvroUtils.java:415)
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.readAvroGenericRecords (PubsubIO.java:592)
    at ch.mycompany.bb8.Bb8Pipeline.run (Bb8Pipeline.java:68)
    at ch.mycompany.bb8.Bb8Pipeline.main (Bb8Pipeline.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)

Как видно из приведенной выше трассировки стека, схема записывается в журнал, как и ожидалось, и поэтому схема не является нулевой.

Кто-нибудь знает, как исправить эту ошибку, или как я могу дальше отлаживать?

mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-24T20:41:47+02:00)
Maven home: /opt/apache-maven
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-oracle/jre
Default locale: en_ZA, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-88-generic", arch: "amd64", family: "unix"

Версия луча 2.19.0

org. apache .avro версия 1.8.0

1 Ответ

0 голосов
/ 25 февраля 2020

Похоже, это проблема, связанная с конфликтом зависимостей:

  • Луч 2.19.0 зависит от Avro 1.8.2 ( link ), которая имеет правильную реализацию ( см. Эту строку ) и, следовательно, не вызовет проблемы.

  • Но вы упомянули, что используете Avro 1.8.0 с неверной реализацией ( см. эту строку ), которая может выбросить NullPointerException

Так что простым решением этой проблемы является повышение версии Avro, которую вы используете, до 1.8.2

...