Исключение нулевого указателя с GenericWriteAheadSink и FileCheckpointCommitter в Flink 1.8 - PullRequest
0 голосов
/ 11 февраля 2020

Я реализую собственный приемник NSQ для Flink. У меня он работает как подкласс RichSinkFunction, но я бы хотел, чтобы реализация журнала записи с опережением работала для обеспечения дополнительной целостности данных.

Использование WriteAheadSinkExample O'Reilly доступно здесь Я попытался реализовать свое собственное:

package com.wistia.analytics

import java.net.{InetSocketAddress, SocketAddress}

import com.github.mitallast.nsq._
import org.apache.flink.api.scala.createTypeInformation
import java.lang.Iterable
import java.nio.file.{Files, Paths}
import java.util.UUID

import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.streaming.runtime.operators.{CheckpointCommitter, GenericWriteAheadSink}

import scala.collection.mutable

class WALNsqSink(val topic: String) extends GenericWriteAheadSink[String](
  // CheckpointCommitter that commits checkpoints to the local filesystem
  new FileCheckpointCommitter(System.getProperty("java.io.tmpdir")),
  // Serializer for records
  createTypeInformation[String]
    .createSerializer(new ExecutionConfig),
  // Random JobID used by the CheckpointCommitter
  UUID.randomUUID.toString) {

  var client: NSQClient = _
  var producer: NSQProducer = _

  override def open(): Unit = {
    val lookup = new NSQLookup {
      def nodes(): List[SocketAddress] = List(new InetSocketAddress("127.0.0.1",4150))
      def lookup(topic: String): List[SocketAddress] = List(new InetSocketAddress("127.0.0.1",4150))
    }

    client = NSQClient(lookup)
    producer = client.producer()
  }

  def sendValues(readings: Iterable[String], checkpointId: Long, timestamp: Long): Boolean = {

    val arr = mutable.Seq()
    readings.forEach{ reading =>
      arr :+ reading
    }

    producer.mpubStr(topic=topic, data=arr)
    true
  }
}

повторное использование FileCheckpointCommitter в нижней части класса, но я получил исключение нулевого указателя внутри GenericWriteAheadSink:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
        at com.wistia.analytics.NsqProcessor$.main(NsqProcessor.scala:24)
        at com.wistia.analytics.NsqProcessor.main(NsqProcessor.scala)
Caused by: java.lang.NullPointerException
        at org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink.processElement(GenericWriteAheadSink.java:277)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Nonzero exit code returned from runner: 1
(Compile / run) Nonzero exit code returned from runner: 1
Total time: 45 s, completed Feb 10, 2020 6:41:06 PM

I понятия не имею, куда go отсюда. Любая помощь приветствуется

1 Ответ

1 голос
/ 11 февраля 2020

Проблема здесь, безусловно, заключается в том, что Вы никогда не вызываете метод open() суперкласса. Это приведет к неинициализации некоторых переменных. Это должно быть решено путем вызова super.open() внутри вашего open() метода.

...