У меня есть сообщение для чтения кода от kafka, как показано ниже:
def main(args: Array[String]): Unit = {
System.setProperty("java.security.auth.login.config", "someValue")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val consumerProperties = new Properties()
consumerProperties.setProperty("security.protocol", "SASL_PLAINTEXT")
consumerProperties.setProperty("sasl.mechanism", "PLAIN")
val kafkaConsumer = new FlinkKafkaConsumer011[ObjectNode](consumerProperties.getProperty("topic"), new JsonNodeDeserializationSchema, consumerProperties)
val stream = env.addSource(kafkaConsumer)
}
Когда источник пытается прочитать сообщение от Apache Kafka, функция org.apache.kafka.common.security.JaasContext.defaultContext загрузит свойство "java.security.auth.login.config".
Но свойство устанавливается только в JobManager, и когда мое задание запускается, свойство не может правильно загружаться в TaskManager, поэтому источник не будет работать.
Я пытался установить дополнительные JVM_OPTS, например "-Dxxx = yyy", но кластер flink развернут в автономном режиме, переменную среды нельзя менять очень часто.
Есть ли способ установить свойство в TaskManager?