Я реализую собственный приемник NSQ для Flink. У меня он работает как подкласс RichSinkFunction
, но я бы хотел, чтобы реализация журнала записи с опережением работала для обеспечения дополнительной целостности данных.
Использование WriteAheadSinkExample O'Reilly доступно здесь Я попытался реализовать свое собственное:
package com.wistia.analytics
import java.net.{InetSocketAddress, SocketAddress}
import com.github.mitallast.nsq._
import org.apache.flink.api.scala.createTypeInformation
import java.lang.Iterable
import java.nio.file.{Files, Paths}
import java.util.UUID
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.streaming.runtime.operators.{CheckpointCommitter, GenericWriteAheadSink}
import scala.collection.mutable
class WALNsqSink(val topic: String) extends GenericWriteAheadSink[String](
// CheckpointCommitter that commits checkpoints to the local filesystem
new FileCheckpointCommitter(System.getProperty("java.io.tmpdir")),
// Serializer for records
createTypeInformation[String]
.createSerializer(new ExecutionConfig),
// Random JobID used by the CheckpointCommitter
UUID.randomUUID.toString) {
var client: NSQClient = _
var producer: NSQProducer = _
override def open(): Unit = {
val lookup = new NSQLookup {
def nodes(): List[SocketAddress] = List(new InetSocketAddress("127.0.0.1",4150))
def lookup(topic: String): List[SocketAddress] = List(new InetSocketAddress("127.0.0.1",4150))
}
client = NSQClient(lookup)
producer = client.producer()
}
def sendValues(readings: Iterable[String], checkpointId: Long, timestamp: Long): Boolean = {
val arr = mutable.Seq()
readings.forEach{ reading =>
arr :+ reading
}
producer.mpubStr(topic=topic, data=arr)
true
}
}
повторное использование FileCheckpointCommitter
в нижней части класса, но я получил исключение нулевого указателя внутри GenericWriteAheadSink
:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.wistia.analytics.NsqProcessor$.main(NsqProcessor.scala:24)
at com.wistia.analytics.NsqProcessor.main(NsqProcessor.scala)
Caused by: java.lang.NullPointerException
at org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink.processElement(GenericWriteAheadSink.java:277)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Nonzero exit code returned from runner: 1
(Compile / run) Nonzero exit code returned from runner: 1
Total time: 45 s, completed Feb 10, 2020 6:41:06 PM
I понятия не имею, куда go отсюда. Любая помощь приветствуется