KafkaException при использовании kafkaAppender в приложении Spark - PullRequest
0 голосов
/ 06 января 2020

Я пытаюсь настроить Log4j2 KafkaAppender в приложении spark для извлечения метрик из org. apache .spark.scheduler.SparkListener . Кажется, что моя конфигурация работает, потому что я получаю некоторые сообщения в мою kafka topi c, но эти сообщения приходят только от onApplicationEnd (SparkListenerApplicationEnd applicationEnd) .

Затем мое приложение cra sh со следующей трассировкой стека:

20/01/06 11:12:01 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1075)
    at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:142)
    at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:141)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:141)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:138)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:650)
    at fr.ca.cat.lib.fwkjobfactory.readers.hdfs.HDFSReaders.lambda$static$56(HDFSReaders.java:9)
    at fr.ca.cat.lib.fwkjobfactory.readers.Reader.lambda$read$54(Reader.java:20)
    at java.util.Optional.map(Optional.java:215)
    at fr.ca.cat.lib.fwkjobfactory.readers.Reader.read(Reader.java:20)
    at fr.ca.cat.lib.fwkjobfactory.strategies.read_strategies.SimpleReadStrategy.read(SimpleReadStrategy.java:23)
    at fr.ca.cat.lib.fwkjobfactory.strategies.SimpleJobStrategy.run(SimpleJobStrategy.java:53)
    at fr.ca.cat.lib.fwkjobfactory.runners.JobFactoryRunner.run(JobFactoryRunner.java:68)
    at fr.ca.cat.lib.fwkjobfactory.JobFactoryApp.main(JobFactoryApp.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:646)
Caused by: org.apache.spark.sql.AnalysisException: java.lang.ExceptionInInitializerError: null;
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
    at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:194)
    at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:105)
    at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:93)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
    at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:35)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:289)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1072)
    ... 20 more
Caused by: java.lang.ExceptionInInitializerError
    at org.apache.commons.logging.LogAdapter$Log4jAdapter.createLog(LogAdapter.java:122)
    at org.apache.commons.logging.LogAdapter.createLog(LogAdapter.java:89)
    at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:67)
    at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:59)
    at org.apache.hadoop.hive.ql.session.SessionState.<clinit>(SessionState.java:99)
    at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:136)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:268)
    at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:362)
    at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:266)
    at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:66)
    at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:65)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
    ... 29 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
    at org.apache.logging.log4j.core.appender.mom.kafka.DefaultKafkaProducerFactory.newKafkaProducer(DefaultKafkaProducerFactory.java:40)
    at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.startup(KafkaManager.java:131)
    at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.start(KafkaAppender.java:175)
    at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:265)
    at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:547)
    at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:619)
    at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:636)
    at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:231)
    at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153)
    at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45)
    at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
    at org.apache.commons.logging.LogAdapter$Log4jLog.<clinit>(LogAdapter.java:155)
    ... 48 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:203)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:305)
    ... 61 more
20/01/06 11:12:01 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':)

С org. apache .kafka.common.serialization.ByteArraySerializer не является экземпляром org. apache .kafka.common.serialization.Serializer, мне интересно, если исполнители пропустили что-то для правильной работы, поэтому я попытался добавить две строки в моей командной строке:

 --conf spark.driver.extraClassPath='kafka-clients-0.10.0.1.jar'\
 --conf spark.executor.extraClassPath='kafka-clients-0.10.0.1.jar'\

А также

 --conf spark.driver.extraJavaOptions='-DconfigLoaderType="classpath" -Dconfiguration=file:log4j2.yml'\

Но это ничего не меняет. Я пытался найти разницу между классами ByteArraySerializer и Serializer, у меня есть только один kafkaClient и одна искра Sql -kafka, которые могут конфликтовать, но я думаю, что они в порядке.

Я где-то читал, что, Можно использовать свойство spark под названием spark.driver.userClassPathFirst , но использование этого хуже, потому что мое приложение вообще не работает. Вот вторая трассировка стека:

log4j:ERROR A "org.apache.log4j.ConsoleAppender" object is not assignable to a "org.apache.log4j.Appender" variable.
log4j:ERROR The class "org.apache.log4j.Appender" was loaded by
log4j:ERROR [sun.misc.Launcher$AppClassLoader@5d22bbb7] whereas object of type
log4j:ERROR "org.apache.log4j.ConsoleAppender" was loaded by [org.apache.spark.util.ChildFirstURLClassLoader@9225652].
log4j:ERROR Could not instantiate appender named "console".
(More if you think it could help..)

Большинство наших jar-файлов предоставляются через командную строку:

kafka-clients-0.10.0.1.jar
spark-sql-kafka-0-10_2.11

И log4j включены в проект:

org.apache.logging.log4j:log4j-api:2.11.0
org.apache.logging.log4j:log4j-core:2.11.0
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...