Log4j не работает в методе foreachRDD потокового воспроизведения - PullRequest
0 голосов
/ 09 января 2019

Когда я использовал sparkstreaming spark2.4 для потребления kafka, я обнаружил, что мои журналы вне метода foreachRDD были напечатаны, но журналы внутри foreachRDD не были напечатаны. Используемый API-интерфейс log4j имеет версию 1.2.

Я попытался добавить
spark.executor.extraJavaOptions = -Dlog4j.configuration = log4j.properties
spark.driver.extraJavaOptions = -Dlog4j.configuration = log4j.properties

в файл конфигурации spark-defaults.properties, и в начале я записал неправильный путь при выводе информации об ошибке уровня журнала и пути файла конфигурации журнала Итак, настройки spark.executor.extraJavaOptions и spark.driver.extraJavaOptions вступили в силу.

Ответы [ 2 ]

0 голосов
/ 10 января 2019
<code>
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/vdir/mnt/disk2/hadoop/yarn/local/usercache/root/filecache/494/__spark_libs__3795396964941241866.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    19/01/10 14:17:16 ERROR KafkaSparkStreamingKafkaTests: receive+++++++++++++++++++++++++++++++
</code>

    My code:
<code>
    1.if (args[3].equals("consumer1")) {
                logger.error("receive+++++++++++++++++++++++++++++++");
                SparkSQLService sparkSQLService = new SparkSQLService();
                consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
                sparkSQLService.sparkForwardedToKafka(sparkConf,
                        CONSUMER_TOPIC,
                        PRODUCER_TOPIC,
                        new HashMap<String, Object>((Map) consumerProperties));
    ......
    2.public void sparkForwardedToKafka(SparkConf sparkConf, String consumerTopic, String producerTopic, Map<String, Object> kafkaConsumerParamsMap) {
            sparkConf.registerKryoClasses(new Class[]{SparkSQLService.class, FlatMapFunction.class, JavaPairInputDStream.class, Logger.class});
            JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.milliseconds(DURATION_SECONDS));
            Collection<String> topics = Arrays.asList(consumerTopic);
            JavaInputDStream<ConsumerRecord<String, String>> streams =
                    KafkaUtils.createDirectStream(
                            javaStreamingContext,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.Subscribe(topics, kafkaConsumerParamsMap)
                    );
            if (producerTopic != null) {
                JavaPairDStream<Long, String> messages = streams.mapToPair(record -> new Tuple2<>(record.timestamp(), record.value()));
     messages.foreachRDD(rdd ->
                        {
                            rdd.foreachPartition(partition -> {
                                partition.forEachRemaining(tuple2 -> {
                                    LOGGER.error("****"+tuple2._1+"|"+tuple2._2);
                                    KafkaService.getInstance().send(producerTopic, TaskContext.get().partitionId(), tuple2._1, null, tuple2._2);
                                });
                            });
                        }
                );

</code>

И мой регистратор объявил: private static final Logger LOGGER = LoggerFactory.getLogger (SparkSQLService.class);

0 голосов
/ 09 января 2019

Журнал внутри и снаружи foreach блок выполняется на разных машинах, один на драйвере, а другой на исполнителе. Так что, если вы хотите увидеть журнал внутри блока foreach, вы можете посетить пряжу для других журналов.

...