Получить количество разделов для топи c перед созданием прямого потока с использованием kafka и spark streaming? - PullRequest
0 голосов
/ 17 марта 2020

У меня есть следующий код, который создает прямой поток, используя разъем Kafka для искры.

public abstract class MessageConsumer<T> 
{
    public JavaInputDStream<ConsumerRecord<String, T>> createConsumer(final JavaStreamingContext jsc, 
        final Collection<String> topics, final String servers)
    {
        return KafkaUtils.createDirectStream(
            jsc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, T>Subscribe(topics,
                ConsumerUtils.getKafkaParams(servers, getGroupId(), getDeserializerClassName())));
    }

    protected abstract String getDeserializerClassName();

    protected abstract String getGroupId();
}

Это работает нормально, но теперь я хочу изменить логи c, чтобы потребитель получал данные из указанного c раздела topi c, вместо того, чтобы позволить Кафке решить, какой раздел для потребления. Я делаю это с помощью того же алгоритма, который стандартный разделитель kafka использует для определения того, в какой раздел отправить сообщение, на основе ключа DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;. Затем я просто назначаю своего потребителя этому разделу. Чтобы это работало, мне нужно знать общее количество разделов, доступных для topi c. Однако я не знаю, как получить эту информацию с помощью API потоковой передачи kafka / spark.

Мне удалось настроить ее для работы с другими частями моего приложения, которые не используют Spark, но мне неясно о том, как этого добиться при использовании Spark. Единственный способ добиться этого - создать другого потребителя до создания прямого потока и использовать его для получения общего количества разделов, а затем закрыть этого потребителя. Ниже приведен код этой реализации:

public abstract class MessageConsumer<T> 
{
    public JavaInputDStream<ConsumerRecord<String, T>> createConsumer(final JavaStreamingContext jsc, 
        final String topic, final String servers, final String groundStation)
    {
        final Properties props = ConsumerUtils.getKafkaParams(servers, getGroupId(), getDeserializerClassName());
        final Consumer<String, T> tempConsumer = new KafkaConsumer<>(props);
        final int numPartitions = tempConsumer.partitionsFor(topic).size();
        final int partition = calculateKafkaPartition(groundStation.getBytes(), numPartitions);
        final TopicPartition topicPartition = new TopicPartition(topic, partition);
        tempConsumer.close();

        return KafkaUtils.createDirectStream(
            jsc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, T>Assign(Collections.singletonList(topicPartition),
                ConsumerUtils.getKafkaParams(servers, getGroupId(), getDeserializerClassName())));
    }

    protected abstract String getDeserializerClassName();

    protected abstract String getGroupId();

    private static int calculateKafkaPartition(final byte[] keyBytes, final int numberOfPartitions)
    {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numberOfPartitions;
    }
}

Мне это не кажется правильным, наверняка есть лучший способ сделать это?

1 Ответ

1 голос
/ 17 марта 2020

Вы бы использовали AdminClient Кафки для описания топи c. Нет Spark API для такой информации

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...