Как асинхронно обрабатывать карту объектов, поступающих из Kafka в задании, которое выполняется с интервалами - PullRequest
1 голос
/ 12 июля 2020

Пример: Kafka Consumer получает несколько записей об объекте User каждую секунду. входящий Пользователь также имеет параметр «timeInterval».

User: {name: "xyz", age: 23, timeInterval: 30}

на основе «timeInterval». Мне нужно динамически запускать задание, которое выполняется непрерывно в течение указанного интервала. У меня могут быть разные интервалы с объектами User, такими как 10s, 15s 30s.

Первоначальный дизайн:

структура данных:

Map<Integer, HashMap<Integer, ArrayList<User>>> intervalUserMapMap;

В работе , Мне нужно обработать пользователей, сгруппированных по возрасту. Я добавлю всех пользователей на карту на основе интервала и внутреннюю карту на основе возраста.

Поскольку может быть несколько заданий, выполняемых в разные временные интервалы, мне нужно запускать в разных потоках.

Проблема:

Пока выполняются задания, Kafka продолжает подталкивать многих пользователей к программе. HashMaps внутри "intervalUserMapMap" будет обновлен. Это может вызвать проблемы: одна и та же карта используется двумя потоками.

Я могу использовать ConcurrentHashMap, но я боюсь, что если больше ключей будет хешировано в том же ведре, которое будет заблокировано, тогда карта не будет быть обновленным, поскольку потоки будут работать непрерывно в течение всего времени существования приложения.

НУЖНА ПОМОЩЬ

  1. Правильно ли мой дизайн. Если это можно улучшить, предложите.
  2. Если дизайн в порядке, то как мне обновить карту в этой многопоточной модели.

Изменить: Код, который я пробовал

@Component
public class UsersConsumer {

    private Map<Integer, HashMap<Integer, List<User>>> intervalAgeMapMap = new HashMap<>();

    @KafkaListener(groupId = "users", topics = "users", containerFactory = "usersKafkaListenerContainerFactory")
    public void listenGroupUsers(User user) {
        System.out.println(user);
        HashMap<Integer, List<User>> ageUserMap = null;
        if (intervalAgeMapMap.containsKey(user.getInterval())) {
            ageUserMap = intervalAgeMapMap.get(user.getInterval());
            if (ageUserMap.containsKey(user.getAge())) {
                List<User> userList = ageUserMap.get(user.getAge());
                userList.add(user);
            } else {
                List<User> userList = new ArrayList<User>();
                userList.add(user);
                ageUserMap.put(user.getAge(), userList);
            }
        } else {
            ageUserMap = new HashMap<>();
            List<User> userList = new ArrayList<User>();
            userList.add(user);
            ageUserMap.put(user.getAge(), userList);
            intervalAgeMapMap.put(user.getInterval(), ageUserMap);
            CompletableFuture.runAsync(() -> {
                processUsers(user.getInterval());
            });
        }
    }

    public void processUsers(int interval) {
        while (true) {
            HashMap<Integer, List<User>> ageUserListMap = intervalAgeMapMap.get(interval);
            for (Entry<Integer, List<User>> ageUserListSet : ageUserListMap.entrySet()) {
                List<User> userList = ageUserListSet.getValue();
                // process Users
            }
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Заранее спасибо.

1 Ответ

1 голос
/ 13 июля 2020

Для выполнения какой-либо пакетной работы с Kafka вам не следует "засыпать" поток потребителей, потому что он будет регулярно перебалансировать группу, создавая нагрузку на брокеров.

Итак, нет причин использовать аннотации Spring Kafka Вот. Вам необходимо вручную открывать и закрывать потребителя по вашему установленному расписанию

Вы также можете постоянно sh все задания помещать в приоритетную очередь, если нет ограничений на порядок / обработка идемпотентна

...