Spark Kafka producer throws an exception when writing a DataFrame to a Kafka topic

I am trying to produce a DataFrame to a Kafka topic using Spark Kafka in Java.

I am able to produce the data if I iterate over the rows of the DataFrame, extract the key column and the value column from each row, and produce them as shown below:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Map<String, Object> kafkaParameters = new HashMap<>();
kafkaParameters.put(<All Kafka Params>);

// A new KafkaProducer is created for every single row
finalDataframe.foreach(row -> {
    Producer<String, String> producer = new KafkaProducer<String, String>(kafkaParameters);
    ProducerRecord<String, String> producerRec = new ProducerRecord<>("<TOPIC_NAME>",
            row.getAs("columnNameForMsgKey"), row.getAs("columnNameForMsgValue"));
    producer.send(producerRec);
});

I do not want to use the approach above, because it creates a new Producer instance for every row it writes, which will hurt performance badly on a huge dataset.
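For context, my understanding is that the per-row cost could at least be reduced by opening one producer per partition with foreachPartition; a rough sketch only, reusing the same kafkaParameters and placeholders (it additionally needs org.apache.spark.api.java.function.ForeachPartitionFunction and org.apache.spark.sql.Row imports, and I have not benchmarked it):

    // Rough sketch: one KafkaProducer per partition instead of one per row
    finalDataframe.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
        Producer<String, String> producer = new KafkaProducer<>(kafkaParameters);
        while (rows.hasNext()) {
            Row row = rows.next();
            producer.send(new ProducerRecord<>("<TOPIC_NAME>",
                    row.getAs("columnNameForMsgKey"), row.getAs("columnNameForMsgValue")));
        }
        producer.flush();
        producer.close();
    });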

Instead, I tried to write the entire DataFrame in one go using the following approach:

       finalDataframe.selectExpr("CAST(columnNameForMsgKey AS STRING) as key", "CAST(columnNameForMsgValue AS STRING) as value")
                    .write()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "<SERVER_NAMES>")
                    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
                    .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
                    .option("security.protocol", "SASL_PLAINTEXT")
                    .option("sasl.kerberos.service.name", "kafka")
                    .option("sasl.mechanism", "GSSAPI")
                    .option("acks", "all")
                    .option("topic", "<TOPIC_NAME>")
                    .save();

But this approach throws the following exception:

org.apache.kafka.common.errors.TimeoutException: Topic <TOPIC_NAME> not present in metadata

The full stack trace is:
20/02/01 23:04:30 INFO SparkContext: SparkContext already stopped.
20/02/01 23:04:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 131 in stage 266.0 failed 4 times, most recent failure: Lost task 131.3 in stage 266.0 (TID 4664, servername.com, executor 1): org.apache.kafka.common.errors.TimeoutException: Topic <TOPIC_NAME> not present in metadata after 60000 ms.

Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 131 in stage 266.0 failed 4 times, most recent failure: Lost task 131.3 in stage 266.0 (TID 4664, servername.com, executor 1): org.apache.kafka.common.errors.TimeoutException: Topic <TOPIC_NAME> not present in metadata after 60000 ms.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:206)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:264)
    at CustomProducer.main(CustomProducer.java:508)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic <TOPIC_NAME> not present in metadata after 60000 ms.
20/02/01 23:04:30 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 131 in stage 266.0 failed 4 times, most recent failure: Lost task 131.3 in stage 266.0 (TID 4664, servername.com, executor 1): org.apache.kafka.common.errors.TimeoutException: Topic <TOPIC_NAME> not present in metadata after 60000 ms.
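For what it's worth, a sanity check I assume could narrow this down (not verified) is describing the topic with the Kafka AdminClient, using the same broker list and SASL settings as the write. A hypothetical helper, using org.apache.kafka.clients.admin.AdminClient, java.util.Properties and java.util.Collections:

    // Sketch: confirm the topic is visible from this environment with the same
    // security settings (requires kafka-clients on the classpath)
    static void describeTopic() throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "<SERVER_NAMES>");
        adminProps.put("security.protocol", "SASL_PLAINTEXT");
        adminProps.put("sasl.kerberos.service.name", "kafka");
        adminProps.put("sasl.mechanism", "GSSAPI");
        try (AdminClient admin = AdminClient.create(adminProps)) {
            // all().get() throws ExecutionException if the topic cannot be described
            System.out.println(
                admin.describeTopics(Collections.singletonList("<TOPIC_NAME>")).all().get());
        }
    }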

Please help me find the cause of the issue, or suggest an alternative for producing the entire DataFrame to the topic instead of producing it row by row.

NB: The key and the value of the Kafka message to be produced are present as two different columns in finalDataframe.
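For reference, my reading of the Spark Kafka sink documentation is that producer-level Kafka settings are forwarded to the underlying producer only when they carry the kafka. prefix, and that the sink serializes the key and value columns itself, so no serializer options are needed. The same batch write would then look roughly like the sketch below (same placeholders; I have not confirmed that this alone resolves the timeout):

    finalDataframe.selectExpr(
            "CAST(columnNameForMsgKey AS STRING) AS key",
            "CAST(columnNameForMsgValue AS STRING) AS value")
        .write()
        .format("kafka")
        // Kafka client options are prefixed with "kafka." so the sink forwards them
        .option("kafka.bootstrap.servers", "<SERVER_NAMES>")
        .option("kafka.security.protocol", "SASL_PLAINTEXT")
        .option("kafka.sasl.kerberos.service.name", "kafka")
        .option("kafka.sasl.mechanism", "GSSAPI")
        .option("kafka.acks", "all")
        .option("topic", "<TOPIC_NAME>")
        .save();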

Thanks
