Как отправить работу в кластер Flink с помощью кода Java? - PullRequest
2 голосов
/ 29 января 2020

Я уже загрузил толстый файл jar, содержащий код моего приложения, в папку / lib всех узлов моего кластера Flink. Я пытаюсь запустить задание Flink из отдельного java приложения, но не могу найти хороший способ сделать это.

Ближайшее решение, которое я нашел в настоящее время, - это API-интерфейс Monitoring Rest. который имеет API задания выполнения . Однако это позволяет только запускать задания, отправленные с помощью функции загрузки заданий.

Я видел ClusterClient. java в модуле flink-client, но не смог увидеть ни одного примера. о том, как я мог бы использовать это.

Любые примеры того, как кто-то успешно отправил работу через код java, будет принята с благодарностью!

1 Ответ

2 голосов
/ 29 января 2020

Вы можете использовать RestClusterClient для запуска PackagedProgram, который указывает на ваше задание Flink. Если ваша работа принимает некоторые аргументы, вы можете передать их.

Вот пример автономного кластера, работающего на localhost:8081:

// import org.apache.flink.api.common.JobSubmissionResult;
// import org.apache.flink.client.deployment.StandaloneClusterId;
// import org.apache.flink.client.program.PackagedProgram;
// import org.apache.flink.client.program.rest.RestClusterClient;
// import org.apache.flink.configuration.Configuration;
// import org.apache.flink.configuration.JobManagerOptions;
// import org.apache.flink.configuration.RestOptions;

String clusterHost = "localhost";
int clusterPort = 8081;

Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, clusterHost);
config.setInteger(RestOptions.PORT, clusterPort);

String jarFilePath = "/opt/flink/examples/streaming/SocketWindowWordCount.jar";
String[] args = new String[]{ "--port", "9000" };
PackagedProgram packagedProgram = new PackagedProgram(new File(jarFilePath), args);

RestClusterClient<StandaloneClusterId> client =
         new RestClusterClient<StandaloneClusterId>(config, StandaloneClusterId.getInstance());

int parallelism = 1;
JobSubmissionResult result = client.run(packagedProgram,  parallelism);
...