Kafka Streams - Перегрузка памяти потребителем - PullRequest
0 голосов
/ 07 января 2019

Я планирую приложение Spring + Kafka Streams, которое обрабатывает входящие сообщения и сохраняет обновленное внутреннее состояние в результате этих сообщений. Прогнозируется, что это состояние достигнет ~ 500 МБ на уникальный ключ (вероятно, ~ 10 000 уникальных ключей распределены по разделам 2 КБ).

Это состояние обычно должно храниться в памяти для эффективной работы моего приложения, но даже на диске я все еще сталкиваюсь с аналогичной проблемой (хотя и на более позднем этапе масштабирования).

Я планирую развернуть это приложение в динамически масштабируемой среде, такой как AWS, и установлю минимальное количество экземпляров, но я опасаюсь двух ситуаций:

  • При первом запуске (где, возможно, только 1 потребитель запускается первым) он не сможет обработать присвоение всех разделов, поскольку состояние в памяти переполнит доступную память экземпляров.
  • После крупного сбоя (выход из зоны доступности AWS) может оказаться, что 33% потребителей выбраны из группы, а дополнительная загрузка памяти в оставшихся экземплярах может фактически убрать всех, кто остается.

Как люди защищают своих потребителей от использования большего количества разделов, чем они могут обработать, чтобы они не переполняли доступную память / диск?

1 Ответ

0 голосов
/ 07 января 2019

См. Кафка документации .

С 0,11 ...

enter image description here

EDIT

Для вашего второго варианта использования (и он также работает для первого), возможно, вы могли бы реализовать пользовательский PartitionAssignor, который ограничивает количество разделов, назначаемых каждому экземпляру.

Я не пробовал; Я не знаю, как брокер будет реагировать на наличие неназначенных разделов.

EDIT2

Кажется, это работает нормально; но YMMV ...

public class NoMoreThanFiveAssignor extends RoundRobinAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {

        Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);
        assignments.forEach((memberId, assigned) -> {
            if (assigned.size() > 5) {
                System.out.println("Reducing assignments from " + assigned.size() + " to 5 for " + memberId);
                assignments.put(memberId, 
                        assigned.stream()
                            .limit(5)
                            .collect(Collectors.toList()));
            }
        });
        return assignments;
    }

}

и

@SpringBootApplication
public class So54072362Application {

    public static void main(String[] args) {
        SpringApplication.run(So54072362Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so54072362", 15, (short) 1);
    }

    @KafkaListener(id = "so54072362", topics = "so54072362")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println(record);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            for (int i = 0; i < 15; i++) {
                template.send("so54072362", i, "foo", "bar");
            }
        };
    }

}

и

spring.kafka.consumer.properties.partition.assignment.strategy=com.example.NoMoreThanFiveAssignor
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest

и

Reducing assignments from 15 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
2019-01-07 15:24:28.288  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 7
2019-01-07 15:24:28.289  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:28.296  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.304  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
Reducing assignments from 8 to 5 for consumer-2-c9a6928a-520c-4646-9dd9-4da14636744b
Reducing assignments from 7 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
2019-01-07 15:24:46.310  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 8
2019-01-07 15:24:46.311  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:46.315  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
2019-01-07 15:24:58.330  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 9
2019-01-07 15:24:58.332  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]
2019-01-07 15:24:58.336  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]

Конечно, это приводит к зависанию неназначенных разделов, но похоже, что это то, что вы хотите, пока регион снова не подключится.

...