У меня есть две задачи, которые я выполняю в параллельных потоках, и я стараюсь понять, почему не работает функциональность Watch
.Пожалуйста, дайте мне знать, если у вас есть какие-либо идеи.
Задача 1: Получить статус модулей и показать текущее состояние.
Задача 2: Следите за новыми событиями.Это то, что я пытаюсь понять лучше.
Каждая из этих задач выполняется каждые 30 секунд с scheduledAtFixedRate()
.
Ожидаемое поведение:
Задача 1: Я должен получить список всех модулей с их текущим статусом (это работает).
Задача 2: Я должен ожидать список новых событий по мере их возникновения.
Наблюдаемое поведение:
Задача 1: Работает нормально,Я получаю обновленное состояние модулей каждые 30 секунд.
Задача 2: Он выводит события из первого запроса, но, похоже, он не обновляет новые события.
Код: Задание 1:
@Component
@Scope(value = org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_SINGLETON)
public class Task1 implements Runnable {
private ScheduledExecutorService scheduledExecutorService;
private CommandInvoker commandInvoker;
private static final int INITIAL_DELAY = 15;
private static final int POLLING_INTERVAL = 30;
@Autowired
public Task1 (CommandInvoker commandInvoker,
ScheduledExecutorService scheduledExecutorService) {
this.commandInvoker = commandInvoker;
this.scheduledExecutorService = scheduledExecutorService;
this.scheduledExecutorService.scheduleAtFixedRate(this, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.SECONDS);
}
@Override
public void run() {
System.out.println("===== STARTING TASK 1 POD HEALTH CHECK =======");
commandInvoker.getPodStatus();
}
}
Задание 2:
@Component
@Scope(value = org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_SINGLETON)
public class Task2 implements Runnable {
private ScheduledExecutorService scheduledExecutorService;
private CommandInvoker commandInvoker;
@Autowired
public Task2(CommandInvoker commandInvoker,
ScheduledExecutorService scheduledExecutorService) {
this.commandInvoker = commandInvoker;
this.scheduledExecutorService = scheduledExecutorService;
this.scheduledExecutorService.scheduleAtFixedRate(this, 30, 30, TimeUnit.SECONDS);
}
@Override
public void run() {
System.out.println("===== STARTING TASK 2 EVENT WATCH UPDATE =======");
commandInvoker.getWatchUpdates();
}
}
CommandInvoker:
@Component
public class CommandInvoker {
public void getPodStatus() {
try {
CoreV1Api api = new CoreV1Api();
V1PodList list = api.listPodForAllNamespaces(null,
null, null, null, null, null, null, null, null);
for( V1Pod pod : list.getItems() ) {
// THIS WORKS //
}
} catch ( ApiException e) {
throw new WhateverException ("Failed to handle watchlist event", e);
}
}
public void getWatchUpdates() {
CoreV1Api api = new CoreV1Api();
try {
Watch<V1Event> watch = Watch.createWatch(
apiClient,
api.listEventForAllNamespacesCall(null, null, null, null,
null, null, null, null, true, null, null),
new TypeToken<Watch.Response<V1Event>>() {}.getType());
watch.forEach( response -> {
V1Event event = response.object;
// THIS ONLY DUMPS EVENTS FROM FIRST CALL BUT NEVER GETS EXECUTED AGAIN
});
// I NEVER REACH HERE BUT I DON'T GET ANY UPDATES
} catch ( ApiException e) {
throw new K8ServerException("Failed to handle watchlist event", e);
}
}
}