Существует очень простой код для чтения данных из Kafka и записи значения в dataframe в другой Kafka (я использую spark-2.3.0 и kafka-0.10.2.1):
object KafkaDSTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.master("local[*]").appName(s"${this.getClass.getSimpleName}").getOrCreate()
import spark.implicits._
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092")
.option("subscribe", "source")
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092")
.option("topic", "sink")
.option("checkpointLocation", "hdfs://rdpecore1:9000/tmp/test/checkpoint")
.outputMode("update")
.start()
.awaitTermination()
}
}
Когда я запускаюВ этой программе я получил сообщение об ошибке:
Query [id = 42f9fa3c-8033-4283-bef8-5cac5d352cc8, runId = a01fad4d-bb83-4b58-83ce-37cc81bbfae0] terminated with error
java.lang.IllegalStateException
at com.google.common.base.Preconditions.checkState(Preconditions.java:133)
at org.apache.hadoop.ipc.Client.setCallIdAndRetryCount(Client.java:118)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:99)
at com.sun.proxy.$Proxy21.delete(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2053)
......
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: null
Эта проблема озадачила меня на несколько дней.Я ценю это, если я могу получить вашу помощь.