Я сталкиваюсь со следующей проблемой, которую не могу понять.В резюме шеллскрипт запускает основную Java-программу следующим образом:
spark-submit \
--master yarn \
--deploy-mode cluster \
--files $FILES \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-spark-debug.properties" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-spark.properties" \
--conf "spark.ui.showConsoleProgress=false" \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.eventLog.enabled=false" \
--conf "spark.yarn.driver.memoryOverhead=384" \
--conf "spark.yarn.executor.memoryOverhead=1024" \
--conf "spark.executor.instances=1" \
--jars $JARS \
--driver-memory 1G \
--executor-memory 1G \
--class com.foo.bar.business.MyMainClass \
--executor-cores 2 \
--name "CLASS process name" \
${javaDir}/my-busines-jarfile.jar 0
Имеет:
$FILES = /my/path/to/other/properties/files
$JARS = /my/path/to/jar/files
com.foo.bar.business.MyMainClass имеет следующий код:
final Properties properties = PropertyLoader.loadPropertyResource("my-property-file.properties");
//Property loading this way
public static Properties loadPropertyResource(String resource) {
ClassLoader cl = PropertyLoader.class.getClassLoader();
if (cl == null) {
cl = ClassLoader.getSystemClassLoader();
}
if (cl == null) {
LOGGER.error("Unable to access class loader.");
throw new RuntimeException("Unable to access class loader.");
}
URL resourceURL = cl.getResource(resource);
Properties props;
if (resourceURL != null) {
InputStream in = null;
try {
in = resourceURL.openStream();
props = new Properties();
props.load(in);
} catch (IOException e) {
LOGGER.error("Unable to access specified resource: '" + resource + "'");
LOGGER.error("Error message is:" + e.getLocalizedMessage());
throw new RuntimeException("Unable to access resource:." + resource, e);
} finally {
try {
in.close();
} catch (IOException e) {
LOGGER.warn("Error closing stream ", e.getLocalizedMessage());
}
}
} else {
LOGGER.error("Unable to access resource: " + resource);
throw new RuntimeException("Unable to access resource: " + resource);
}
return props;
}
После загрузки свойства я печатаю каждое отдельное свойство и значение для целей отладки:
if(null != properties) {
StringBuffer sb = new StringBuffer();
for(String key : properties.stringPropertyNames()) {
String value = properties.getProperty(key);
sb.append(key + " => " + value).append(" | ");
}
LOGGER.debug("properties loaded --> " + sb.toString());
} else {
LOGGER.error("Properties are null!!! (??????)");
}
Затем я инициализирую sparkContext:
JavaStreamingContext jssc =
JavaStreamingContext.getOrCreate(checkPointLocation, new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
try {
return createContext(properties);
} catch (Exception e) {
LOGGER.error("Error at creation of JDD " + e.getMessage());
LOGGER.error(e.toString());
}
return null;
}
});
LOGGER.info("Spark context created");
jssc.start();
jssc.awaitTermination();
И "createContext (свойства) "метод:
MyBusinessLogicClass mblc = new MyBusinessLogicClass(properties);
mblc.doLogic(inputDataFromKafka);
В MyBusinessLogicClass (который реализует сериализуемый) есть" Свойства свойств ";(не частный, не временный, не окончательный), который инициализируется в конструкторе с помощью "this.properties = properties;", просто как круговой.
Затем в процессе doLogic я печатаю те же свойства, что и раньше.
Результат совершенно другой: в главном классе (который загружается из скрипта .sh) 15 свойств и в BusinessLogicClass havin 7.
Кроме того, любой конструктор журнала slf4j не являетсяв консоль пряжи выводятся только логи от doLogic и далее.
protected static final Logger LOGGER = LoggerFactory.getLogger(MyClass.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(MyBusinessLogicClass.class);
Мой вопрос: почему это все возможно?почему основной класс получает каждое свойство, а подкласс - нет?Это может быть оператор «this.properties = properties»?
Один вывод одного выполнения выглядит так:
2018-10-17 18:43:56,141 [Driver] DEBUG com.foo.bar.business.MyMainClass -
properties loaded -->
kafka.input.topic.pattern => APP.kafka.topic.pattern
| top.group.value => 5
| hdfs.temp.data.path => temp
| hdfs.final.data.path => reporting/final
| kafka.max.input.messages.per.partition => 1
| input.minutes.delay => 2
| app.num.workers => 1
| streaming.period => 120
| auto.timeout.hours => 1
| max.study.var.filter => 30
| kafka.output.topic => APP.kafka.ouput
| hdfs.checkpoint.path => checkpoint
2018-10-17 18:44:02,407 [streaming-job-executor-0] DEBUG com.foo.bar.business.MyBusinessLogicClass -
properties loaded -->
streaming.period => 120
| kafka.max.input.messages.per.partition => 1
| max.study.var.filter => 30
| top.group.value => 5
| num.workers => 1
| kafka.input.topic.pattern => APP.kafka.topic.pattern
| hdfs.checkpoint.path => checkpoint