Ошибка присвоения свойств процесса Spark 1.6 - PullRequest
0 голосов
/ 17 октября 2018

Я сталкиваюсь со следующей проблемой, которую не могу понять.В резюме шеллскрипт запускает основную Java-программу следующим образом:

spark-submit   \
--master yarn \
--deploy-mode cluster   \
--files $FILES   \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-spark-debug.properties"   \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-spark.properties"   \
--conf "spark.ui.showConsoleProgress=false" \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.eventLog.enabled=false" \
--conf "spark.yarn.driver.memoryOverhead=384" \
--conf "spark.yarn.executor.memoryOverhead=1024" \
--conf "spark.executor.instances=1" \
--jars $JARS   \
--driver-memory 1G \
--executor-memory 1G \
--class com.foo.bar.business.MyMainClass   \
--executor-cores 2 \
--name "CLASS process name"   \
${javaDir}/my-busines-jarfile.jar 0

Имеет:

$FILES = /my/path/to/other/properties/files
$JARS = /my/path/to/jar/files

com.foo.bar.business.MyMainClass имеет следующий код:

final Properties properties = PropertyLoader.loadPropertyResource("my-property-file.properties");

//Property loading this way
public static Properties loadPropertyResource(String resource) {
    ClassLoader cl = PropertyLoader.class.getClassLoader();
    if (cl == null) {
        cl = ClassLoader.getSystemClassLoader();
    }

    if (cl == null) {
        LOGGER.error("Unable to access class loader.");
        throw new RuntimeException("Unable to access class loader.");
    }

    URL resourceURL = cl.getResource(resource);

    Properties props;
    if (resourceURL != null) {
        InputStream in = null;

        try {
            in = resourceURL.openStream();
            props = new Properties();
            props.load(in);
        } catch (IOException e) {
            LOGGER.error("Unable to access specified resource: '" + resource + "'");
            LOGGER.error("Error message is:" + e.getLocalizedMessage());
            throw new RuntimeException("Unable to access resource:." + resource, e);
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                LOGGER.warn("Error closing stream ", e.getLocalizedMessage());
            }
        }
    } else {
        LOGGER.error("Unable to access resource: " + resource);
        throw new RuntimeException("Unable to access resource: " + resource);
    }

    return props;
}

После загрузки свойства я печатаю каждое отдельное свойство и значение для целей отладки:

if(null != properties) {
    StringBuffer sb = new StringBuffer();
    for(String key : properties.stringPropertyNames()) {
      String value = properties.getProperty(key);
      sb.append(key + " => " + value).append(" | ");
    }

    LOGGER.debug("properties loaded --> " + sb.toString());
} else {
    LOGGER.error("Properties are null!!! (??????)");
}

Затем я инициализирую sparkContext:

JavaStreamingContext jssc =
    JavaStreamingContext.getOrCreate(checkPointLocation, new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        try {
          return createContext(properties);
        } catch (Exception e) {
          LOGGER.error("Error at creation of JDD " + e.getMessage());
          LOGGER.error(e.toString());
        }
        return null;
      }
    });
LOGGER.info("Spark context created");
jssc.start();
jssc.awaitTermination();

И "createContext (свойства) "метод:

MyBusinessLogicClass mblc = new MyBusinessLogicClass(properties);
mblc.doLogic(inputDataFromKafka);

В MyBusinessLogicClass (который реализует сериализуемый) есть" Свойства свойств ";(не частный, не временный, не окончательный), который инициализируется в конструкторе с помощью "this.properties = properties;", просто как круговой.

Затем в процессе doLogic я печатаю те же свойства, что и раньше.

Результат совершенно другой: в главном классе (который загружается из скрипта .sh) 15 свойств и в BusinessLogicClass havin 7.

Кроме того, любой конструктор журнала slf4j не являетсяв консоль пряжи выводятся только логи от doLogic и далее.

protected static final Logger LOGGER = LoggerFactory.getLogger(MyClass.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(MyBusinessLogicClass.class);

Мой вопрос: почему это все возможно?почему основной класс получает каждое свойство, а подкласс - нет?Это может быть оператор «this.properties = properties»?

Один вывод одного выполнения выглядит так:

2018-10-17 18:43:56,141 [Driver] DEBUG com.foo.bar.business.MyMainClass  - 
properties loaded --> 
    kafka.input.topic.pattern => APP.kafka.topic.pattern 
 |  top.group.value => 5 
 |  hdfs.temp.data.path => temp 
 |  hdfs.final.data.path => reporting/final 
 |  kafka.max.input.messages.per.partition => 1 
 |  input.minutes.delay => 2 
 |  app.num.workers => 1 
 |  streaming.period => 120 
 |  auto.timeout.hours => 1 
 |  max.study.var.filter => 30 
 |  kafka.output.topic => APP.kafka.ouput 
 |  hdfs.checkpoint.path => checkpoint

 2018-10-17 18:44:02,407 [streaming-job-executor-0] DEBUG com.foo.bar.business.MyBusinessLogicClass - 
 properties loaded --> 
    streaming.period => 120 
 |  kafka.max.input.messages.per.partition => 1 
 |  max.study.var.filter => 30 
 |  top.group.value => 5 
 |  num.workers => 1 
 |  kafka.input.topic.pattern => APP.kafka.topic.pattern 
 |  hdfs.checkpoint.path => checkpoint
...