Пример: Kafka Consumer получает несколько записей об объекте User каждую секунду. входящий Пользователь также имеет параметр «timeInterval».
User: {name: "xyz", age: 23, timeInterval: 30}
на основе «timeInterval». Мне нужно динамически запускать задание, которое выполняется непрерывно в течение указанного интервала. У меня могут быть разные интервалы с объектами User, такими как 10s, 15s 30s.
Первоначальный дизайн:
структура данных:
Map<Integer, HashMap<Integer, ArrayList<User>>> intervalUserMapMap;
В работе , Мне нужно обработать пользователей, сгруппированных по возрасту. Я добавлю всех пользователей на карту на основе интервала и внутреннюю карту на основе возраста.
Поскольку может быть несколько заданий, выполняемых в разные временные интервалы, мне нужно запускать в разных потоках.
Проблема:
Пока выполняются задания, Kafka продолжает подталкивать многих пользователей к программе. HashMaps внутри "intervalUserMapMap" будет обновлен. Это может вызвать проблемы: одна и та же карта используется двумя потоками.
Я могу использовать ConcurrentHashMap
, но я боюсь, что если больше ключей будет хешировано в том же ведре, которое будет заблокировано, тогда карта не будет быть обновленным, поскольку потоки будут работать непрерывно в течение всего времени существования приложения.
НУЖНА ПОМОЩЬ
- Правильно ли мой дизайн. Если это можно улучшить, предложите.
- Если дизайн в порядке, то как мне обновить карту в этой многопоточной модели.
Изменить: Код, который я пробовал
@Component
public class UsersConsumer {
private Map<Integer, HashMap<Integer, List<User>>> intervalAgeMapMap = new HashMap<>();
@KafkaListener(groupId = "users", topics = "users", containerFactory = "usersKafkaListenerContainerFactory")
public void listenGroupUsers(User user) {
System.out.println(user);
HashMap<Integer, List<User>> ageUserMap = null;
if (intervalAgeMapMap.containsKey(user.getInterval())) {
ageUserMap = intervalAgeMapMap.get(user.getInterval());
if (ageUserMap.containsKey(user.getAge())) {
List<User> userList = ageUserMap.get(user.getAge());
userList.add(user);
} else {
List<User> userList = new ArrayList<User>();
userList.add(user);
ageUserMap.put(user.getAge(), userList);
}
} else {
ageUserMap = new HashMap<>();
List<User> userList = new ArrayList<User>();
userList.add(user);
ageUserMap.put(user.getAge(), userList);
intervalAgeMapMap.put(user.getInterval(), ageUserMap);
CompletableFuture.runAsync(() -> {
processUsers(user.getInterval());
});
}
}
public void processUsers(int interval) {
while (true) {
HashMap<Integer, List<User>> ageUserListMap = intervalAgeMapMap.get(interval);
for (Entry<Integer, List<User>> ageUserListSet : ageUserListMap.entrySet()) {
List<User> userList = ageUserListSet.getValue();
// process Users
}
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Заранее спасибо.