Я пытаюсь запустить интеграцию Spark + Kafka в Amazon EMR на простом примере с Spark-Shell, но у меня постоянно возникают ошибки тайм-аута. Однако, когда я публикую sh с org.apache.kafka
и теми же настройками, что и ниже, он работает без сбоев.
Ошибка времени ожидания:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Я переместил client.truststore.jks
и client.keystore.p12
в hdfs и запустил следующую команду:
$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions.col
val kafkaOptions = Map("kafka.bootstrap.servers" -> s"$host:$port",
"kafka.security.protocol" -> "SSL",
"kafka.ssl.endpoint.identification.algorithm" -> "",
"kafka.ssl.truststore.location" -> "/home/hadoop/client.truststore.jks",
"kafka.ssl.truststore.password" -> "password",
"kafka.ssl.keystore.type" -> "PKCS12",
"kafka.ssl.key.password" -> "password",
"kafka.ssl.keystore.location" -> "/home/hadoop/client.keystore.p12",
"kafka.ssl.keystore.password" -> "password")
)
val df = spark
.read
.option("header", true)
.option("escape", "\"")
.csv("s3://bucket/file.csv")
val publishToKafkaDf = df.withColumn("value", col("body"))
publishToKafkaDf
.selectExpr( "CAST(value AS STRING)")
.write
.format("kafka")
.option("topic", "test-topic")
.options(kafkaOptions)
.save()