Как извлечь функцию, где я могу передать функции в качестве параметров для этих цепных лямбд в Java 8? - PullRequest
0 голосов
/ 10 января 2019

У меня есть кодовый фрагмент в куске кода с использованием Kafka Streams, который постоянно повторяется, я делаю карту, затем группирую по ключу и затем уменьшаю. Это выглядит так:

KTable<ProjectKey, EventConfigurationIdsWithDeletedState> eventConfigurationsByProjectTable = eventConfigurationStream
        .map((key, value) -> {
            Map<String, Boolean> eventConfigurationUpdates = new HashMap<>();
            eventConfigurationUpdates.put(key.getEventConfigurationId(), value != null);
            ProjectKey projectKey = ProjectKey.newBuilder().setId(key.getProjectId()).build();
            EventConfigurationIdsWithDeletedState eventConfigurationIdsWithDeletedState = EventConfigurationIdsWithDeletedState.newBuilder().setEventConfigurations(eventConfigurationUpdates).build();
            return KeyValue.pair(projectKey, eventConfigurationIdsWithDeletedState);
        })
        .groupByKey()
        .reduce((aggValue, newValue) -> {
            Map<String, Boolean> newEventConfigurations = newValue.getEventConfigurations();
            Map<String, Boolean> aggEventConfigurations = aggValue.getEventConfigurations();
            Map.Entry<String, Boolean> newEntry = newEventConfigurations.entrySet().iterator().next();
            if (newEntry.getValue())
                aggEventConfigurations.putAll(newEventConfigurations);
            else
                aggEventConfigurations.remove(newEntry.getKey());
            if (aggEventConfigurations.size() == 0)
                return null;
            return aggValue;
        });

(с eventConfigurationStream типа KStream<EventConfigurationKey, EventConfiguration>)

Еще один пример, который следует этому шаблону. Обратите внимание, что здесь тоже есть фильтр, но это не всегда так:

KTable<ProjectKey, NotificationSettingsTransition> globalNotificationSettingsPerProjectTable = notificationSettingTable.toStream()
        .filter((key, value) -> {
            return key.getEventConfigurationId() == null;
        })
        .map((key, value) -> {
            ProjectKey projectKey = ProjectKey.newBuilder().setId(key.getProjectId()).build();
            Map<String, NotificationSetting> notificationSettingsMap = new HashMap<>();
            notificationSettingsMap.put(getAsCompoundKeyString(key), value);
            NotificationSettingsTransition notificationSettingTransition = NotificationSettingsTransition
                    .newBuilder()
                    .setNotificationSettingCompoundKeyLastUpdate(getAsCompoundKey(key))
                    .setNotificationSettingLastUpdate(value)
                    .setEventConfigurationIds(new ArrayList<>())
                    .setNotificationSettingsMap(notificationSettingsMap)
                    .build();

            return KeyValue.pair(projectKey, notificationSettingTransition);
        })
        .groupByKey()
        .reduce((aggValue, newValue) -> {
            Map<String, NotificationSetting> notificationSettingMap = aggValue.getNotificationSettingsMap();
            String compoundKeyAsString = getAsString(newValue.getNotificationSettingCompoundKeyLastUpdate());
            if (newValue.getNotificationSettingLastUpdate() != null)
                notificationSettingMap.put(compoundKeyAsString, newValue.getNotificationSettingLastUpdate());
            else
                notificationSettingMap.remove(compoundKeyAsString);
            aggValue.setNotificationSettingCompoundKeyLastUpdate(newValue.getNotificationSettingCompoundKeyLastUpdate());
            aggValue.setNotificationSettingLastUpdate(newValue.getNotificationSettingLastUpdate());
            aggValue.setNotificationSettingsMap(notificationSettingMap);
            return aggValue;
        });

(с уведомлениемSettingsTable типа KTable<NotificationSettingKey, NotificationSetting> notificationSettingTable, но также немедленно преобразующимся в KStream.)

Как я могу извлечь это в функцию, где я передаю функцию для кода карты и для кода сокращения, но не должен повторять шаблон .map().groupByKey().reduce()? Уважая, что возвращаемые типы различны и зависят от кода в функции карты и должны оставаться типизированными. В идеале в Java 8, но потенциально возможны более высокие версии. Думаю, у меня есть хорошая идея, как это сделать, когда внутренние типы KeyValuePair в коде карты не изменятся, но я не уверен, как это сделать сейчас.

1 Ответ

0 голосов
/ 10 января 2019

Вы можете параметризовать вашу функцию так, чтобы она принимала две универсальные функции, где типы будут выведены (или установлены явно, если это невозможно) при вызове функции.

Для ввода в map требуется BiFunction<K, V, T>, а для reduce требуется BiFunction<U, U, U>, где:

  • K - это тип key в функции map.
  • V - это тип value в функции map.
  • T - это тип возврата функции map.
  • U - это тип агрегатора, значений и типа возврата функции reduce.

Глядя на KStream и KGroupedStream, вы можете получить более подробную информацию о типе для дальнейшего ограничения функций.

Это сделает вашу пользовательскую функцию примерно такой:

<K, V, T, U> U mapGroupReduce(final KStream<K, V> stream, final BiFunction<K, V, T> mapper, final BiFunction<U, U, U> reducer) {
    return stream.map(mapper).groupByKey().reduce(reducer);
}

Затем вы можете назвать это так:

mapGroupReduce(yourStream,
    (key, value) -> new KeyValue(k, v)),
    (acc, value) -> acc);

В вашем случае вместо BiFunction s вам нужно использовать:

  • KeyValueMapper<K, V, KeyValue<T, U>> для картографа
  • Reducer<U> для редуктора.

Однако, действительно ли это намного лучше, чем просто писать stream.map(M).groupByKey().reduce(R) каждый раз? Более подробная версия является более явной, и, учитывая относительные размеры картографа и редуктора, вы не сильно экономите на этом.

...