У меня есть кодовый фрагмент в куске кода с использованием Kafka Streams, который постоянно повторяется, я делаю карту, затем группирую по ключу и затем уменьшаю. Это выглядит так:
KTable<ProjectKey, EventConfigurationIdsWithDeletedState> eventConfigurationsByProjectTable = eventConfigurationStream
.map((key, value) -> {
Map<String, Boolean> eventConfigurationUpdates = new HashMap<>();
eventConfigurationUpdates.put(key.getEventConfigurationId(), value != null);
ProjectKey projectKey = ProjectKey.newBuilder().setId(key.getProjectId()).build();
EventConfigurationIdsWithDeletedState eventConfigurationIdsWithDeletedState = EventConfigurationIdsWithDeletedState.newBuilder().setEventConfigurations(eventConfigurationUpdates).build();
return KeyValue.pair(projectKey, eventConfigurationIdsWithDeletedState);
})
.groupByKey()
.reduce((aggValue, newValue) -> {
Map<String, Boolean> newEventConfigurations = newValue.getEventConfigurations();
Map<String, Boolean> aggEventConfigurations = aggValue.getEventConfigurations();
Map.Entry<String, Boolean> newEntry = newEventConfigurations.entrySet().iterator().next();
if (newEntry.getValue())
aggEventConfigurations.putAll(newEventConfigurations);
else
aggEventConfigurations.remove(newEntry.getKey());
if (aggEventConfigurations.size() == 0)
return null;
return aggValue;
});
(с eventConfigurationStream типа KStream<EventConfigurationKey, EventConfiguration>
)
Еще один пример, который следует этому шаблону. Обратите внимание, что здесь тоже есть фильтр, но это не всегда так:
KTable<ProjectKey, NotificationSettingsTransition> globalNotificationSettingsPerProjectTable = notificationSettingTable.toStream()
.filter((key, value) -> {
return key.getEventConfigurationId() == null;
})
.map((key, value) -> {
ProjectKey projectKey = ProjectKey.newBuilder().setId(key.getProjectId()).build();
Map<String, NotificationSetting> notificationSettingsMap = new HashMap<>();
notificationSettingsMap.put(getAsCompoundKeyString(key), value);
NotificationSettingsTransition notificationSettingTransition = NotificationSettingsTransition
.newBuilder()
.setNotificationSettingCompoundKeyLastUpdate(getAsCompoundKey(key))
.setNotificationSettingLastUpdate(value)
.setEventConfigurationIds(new ArrayList<>())
.setNotificationSettingsMap(notificationSettingsMap)
.build();
return KeyValue.pair(projectKey, notificationSettingTransition);
})
.groupByKey()
.reduce((aggValue, newValue) -> {
Map<String, NotificationSetting> notificationSettingMap = aggValue.getNotificationSettingsMap();
String compoundKeyAsString = getAsString(newValue.getNotificationSettingCompoundKeyLastUpdate());
if (newValue.getNotificationSettingLastUpdate() != null)
notificationSettingMap.put(compoundKeyAsString, newValue.getNotificationSettingLastUpdate());
else
notificationSettingMap.remove(compoundKeyAsString);
aggValue.setNotificationSettingCompoundKeyLastUpdate(newValue.getNotificationSettingCompoundKeyLastUpdate());
aggValue.setNotificationSettingLastUpdate(newValue.getNotificationSettingLastUpdate());
aggValue.setNotificationSettingsMap(notificationSettingMap);
return aggValue;
});
(с уведомлениемSettingsTable типа KTable<NotificationSettingKey, NotificationSetting> notificationSettingTable
, но также немедленно преобразующимся в KStream.)
Как я могу извлечь это в функцию, где я передаю функцию для кода карты и для кода сокращения, но не должен повторять шаблон .map().groupByKey().reduce()
? Уважая, что возвращаемые типы различны и зависят от кода в функции карты и должны оставаться типизированными. В идеале в Java 8, но потенциально возможны более высокие версии. Думаю, у меня есть хорошая идея, как это сделать, когда внутренние типы KeyValuePair
в коде карты не изменятся, но я не уверен, как это сделать сейчас.