Печать сообщения отладки Kafka на задании PySpark - PullRequest
0 голосов
/ 30 апреля 2019

Есть ли способ напечатать сообщение отладки Kafka (я думаю о сообщениях журнала, которые похожи на librdkafka Отладочное сообщение или kafkacat -D опция), при запуске задания PySpark?

Проблемаявляется то, что я использовал следующие коды на PySpark для подключения к кластеру Kafka под названием A, он работает и распечатывает вещи на консоли каждый раз, когда появляется новое сообщение. Но когда я переключился на другой кластер, под названием B и настроить то же самоеЧто касается кластера A, он ничего не выводил на экран при поступлении новых сообщений. Я вижу, что сообщение проходит нормально, используя инструмент kafkacat в обоих кластерах.

consumer.py

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)

hosts = "host1:9092,host2:9092,host3:9092"
topic = "myTopic"
securityProtocol = "SASL_PLAINTEXT"
saslMechanism = "PLAIN"

try:
  df = sqlc \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", hosts) \
    .option("kafka.security.protocol", securityProtocol) \
    .option("kafka.sasl.mechanism", saslMechanism) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic) \
    .load()

  dss = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream.outputMode('append') \
    .format("console") \
    .start()

  dss.awaitTermination()
except KeyboardInterrupt:    
  print 'shutting down...'

kafka.jaas

KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="user1"
   password="sssshhhh"
   serviceName="kafka";
};

команда оболочки:

spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
    --files "kafka.jaas" \
    --driver-java-options "-Djava.security.auth.login.config=kafka.jaas" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas" \
    "./consumer.py"

Похоже, kafka кластер B доступен, так как я могу получить из него информацию о смещении, но он просто не читает сообщения.

1 Ответ

0 голосов
/ 03 мая 2019

Проблема была вызвана подключением рабочих узлов к кластеру Kafka, IP-адрес рабочих узлов не был в белом списке брандмауэра в кластере Kafka. Вышеприведенный код заставил рабочие узлы тайм-аут и продолжать попытки подключения к кластеру Kafka, пока не будет подан сигнал прерывания.

Что касается самого сообщения об ошибке, то не было сгенерировано ни одного сообщения об ошибке на главном узле, поскольку рабочий узел все еще пытается подключиться к кластеру Kafka, но время от времени на мастер-консоли распечатывается сообщение о его сбое. для связи с рабочим узлом (или некоторым сообщением, например, «сбор информации»).

ПРИМЕЧАНИЕ. Предполагается, что это происходит в рабочем узле (к которому я не могу войти из-за прав администратора), но на рабочих узлах может быть журнал. (Если кто-то может поддержать или доказать обратное. Это будет высоко оценено)

Что касается самого сообщения отладки Kafka, то оно выглядит как уже печатающее на экран по умолчанию, если в зависимости от настройки уровня регистратора происходят ошибки, информация или предупреждение, а в некоторых нечетных случаях, таких как это, сообщение журнала может не отображаться. быть непосредственно видимым на экране.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...