Есть ли способ напечатать сообщение отладки Kafka (я думаю о сообщениях журнала, которые похожи на librdkafka
Отладочное сообщение или kafkacat -D
опция), при запуске задания PySpark?
Проблемаявляется то, что я использовал следующие коды на PySpark для подключения к кластеру Kafka под названием A, он работает и распечатывает вещи на консоли каждый раз, когда появляется новое сообщение. Но когда я переключился на другой кластер, под названием B и настроить то же самоеЧто касается кластера A, он ничего не выводил на экран при поступлении новых сообщений. Я вижу, что сообщение проходит нормально, используя инструмент kafkacat
в обоих кластерах.
consumer.py
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)
hosts = "host1:9092,host2:9092,host3:9092"
topic = "myTopic"
securityProtocol = "SASL_PLAINTEXT"
saslMechanism = "PLAIN"
try:
df = sqlc \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", hosts) \
.option("kafka.security.protocol", securityProtocol) \
.option("kafka.sasl.mechanism", saslMechanism) \
.option("startingOffsets", "earliest") \
.option("subscribe", topic) \
.load()
dss = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream.outputMode('append') \
.format("console") \
.start()
dss.awaitTermination()
except KeyboardInterrupt:
print 'shutting down...'
kafka.jaas
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="user1"
password="sssshhhh"
serviceName="kafka";
};
команда оболочки:
spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
--files "kafka.jaas" \
--driver-java-options "-Djava.security.auth.login.config=kafka.jaas" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas" \
"./consumer.py"
Похоже, kafka
кластер B доступен, так как я могу получить из него информацию о смещении, но он просто не читает сообщения.