Flink: Как установить свойства системы в TaskManager? - PullRequest
1 голос
/ 10 апреля 2019

У меня есть сообщение для чтения кода от kafka, как показано ниже:

def main(args: Array[String]): Unit = {
    System.setProperty("java.security.auth.login.config", "someValue")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val consumerProperties = new Properties()
    consumerProperties.setProperty("security.protocol", "SASL_PLAINTEXT")
    consumerProperties.setProperty("sasl.mechanism", "PLAIN")
    val kafkaConsumer = new FlinkKafkaConsumer011[ObjectNode](consumerProperties.getProperty("topic"), new JsonNodeDeserializationSchema, consumerProperties)

    val stream = env.addSource(kafkaConsumer)
}

Когда источник пытается прочитать сообщение от Apache Kafka, функция org.apache.kafka.common.security.JaasContext.defaultContext загрузит свойство "java.security.auth.login.config".

Но свойство устанавливается только в JobManager, и когда мое задание запускается, свойство не может правильно загружаться в TaskManager, поэтому источник не будет работать.

Я пытался установить дополнительные JVM_OPTS, например "-Dxxx = yyy", но кластер flink развернут в автономном режиме, переменную среды нельзя менять очень часто.

Есть ли способ установить свойство в TaskManager?

1 Ответ

0 голосов
/ 10 апреля 2019

Файл bin/config.sh автономного кластера Flink содержит свойство с именем DEFAULT_ENV_JAVA_OPTS.

Также, если вы export $JVM_ARGS="your parameters" файл bin/config.sh загрузите его, используя следующие строки:

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi
...