У меня есть приложение для потокового воспроизведения, которое использует кластер Kafka1 и записывает агрегированные данные в кластер Kafka2. Приложение Spark и Kafka cluster2 развертываются в OpenShift в двух отдельных модулях.
Из журнала искровых приложений кажется, что данные из кластера Kafka1 читаются, агрегаты запускаются и вызывается операция запуска выходного информационного кадра для записи данных в кластер Kafka2. Кажется, нет ошибок. Вот образец журнала
[2019-04-08 11:47:39,691] INFO Reading stream from server: asdasd1.aa.ss.net:9092,asdasd2.aa.ss.net:9092,asdasd3.aa.ss.net:9092
[2019-04-08 11:47:42,532] INFO Acquired Kakfa stream
[2019-04-08 11:47:43,809] INFO Preparing to run
[2019-04-08 11:47:44,493] INFO Beginning aggregation:
[2019-04-08 11:47:44,898] INFO Writing query: (myapplication)
.
.
.
[2019-04-08 15:15:34,859] INFO Asked to remove non-existent executor 12223 (org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint)
[2019-04-08 15:15:34,859] INFO Trying to remove executor 12223 from BlockManagerMaster. (org.apache.spark.storage.BlockManagerMasterEndpoint)
[2019-04-08 15:15:34,860] INFO Executor updated: app-20190408114740-0000/12225 is now RUNNING (org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint)
[2019-04-08 15:15:36,584] INFO Executor updated: app-20190408114740-0000/12224 is now EXITED (Command exited with code 1) (org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint)
[2019-04-08 15:15:36,584] INFO Executor app-20190408114740-0000/12224 removed: Command exited with code 1 (org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
[2019-04-08 15:15:36,584] INFO Executor added: app-20190408114740-0000/12226 on worker-20190408093440-10.225.30.29-33371 (10.225.30.29:33371) with 8 core(s) (org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint)
[2019-04-08 15:15:36,584] INFO Removal of executor 12224 requested (org.apache.spark.storage.BlockManagerMaster)
[2019-04-08 15:15:36,584] INFO Asked to remove non-existent executor 12224 (org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint)
[2019-04-08 15:15:36,584] INFO Trying to remove executor 12224 from BlockManagerMaster. (org.apache.spark.storage.BlockManagerMasterEndpoint)
[2019-04-08 15:15:36,584] INFO Granted executor ID app-20190408114740-0000/12226 on hostPort 10.225.30.29:33371 with 8 core(s), 3.0 GB RAM (org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
[2019-04-08 15:15:36,585] INFO Executor updated: app-20190408114740-0000/12226 is now RUNNING (org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint)
Однако я не вижу сообщений в кластере Kafka 2. Я запустил это на модуле Kafka в openshift.
. / Bin / kafka-console-consumer.sh --bootstrap-server kafka-service: 9092 --topic process.mymetrics
где kafka-service - это сервис OpenShift, как указано ниже
kafka-service ClusterIP xx.xxx.xxx.xxx <none> 9092/TCP
Поскольку нет ошибок приложения, похоже, что приложение spark записывает данные. Можете ли вы дать мне какие-либо входные данные о том, как отладить это? Обратите внимание, что эта установка работала хорошо до нескольких недель назад.
Update1:
Я протестировал запись потока в консоль, и это работает
df.writeStream
.outputMode(OutputMode.Append())
.format("console")
Но запись в Kakfa cluster2 не работает.