У меня есть следующий код, который создает прямой поток, используя разъем Kafka для искры.
public abstract class MessageConsumer<T>
{
public JavaInputDStream<ConsumerRecord<String, T>> createConsumer(final JavaStreamingContext jsc,
final Collection<String> topics, final String servers)
{
return KafkaUtils.createDirectStream(
jsc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, T>Subscribe(topics,
ConsumerUtils.getKafkaParams(servers, getGroupId(), getDeserializerClassName())));
}
protected abstract String getDeserializerClassName();
protected abstract String getGroupId();
}
Это работает нормально, но теперь я хочу изменить логи c, чтобы потребитель получал данные из указанного c раздела topi c, вместо того, чтобы позволить Кафке решить, какой раздел для потребления. Я делаю это с помощью того же алгоритма, который стандартный разделитель kafka использует для определения того, в какой раздел отправить сообщение, на основе ключа DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
. Затем я просто назначаю своего потребителя этому разделу. Чтобы это работало, мне нужно знать общее количество разделов, доступных для topi c. Однако я не знаю, как получить эту информацию с помощью API потоковой передачи kafka / spark.
Мне удалось настроить ее для работы с другими частями моего приложения, которые не используют Spark, но мне неясно о том, как этого добиться при использовании Spark. Единственный способ добиться этого - создать другого потребителя до создания прямого потока и использовать его для получения общего количества разделов, а затем закрыть этого потребителя. Ниже приведен код этой реализации:
public abstract class MessageConsumer<T>
{
public JavaInputDStream<ConsumerRecord<String, T>> createConsumer(final JavaStreamingContext jsc,
final String topic, final String servers, final String groundStation)
{
final Properties props = ConsumerUtils.getKafkaParams(servers, getGroupId(), getDeserializerClassName());
final Consumer<String, T> tempConsumer = new KafkaConsumer<>(props);
final int numPartitions = tempConsumer.partitionsFor(topic).size();
final int partition = calculateKafkaPartition(groundStation.getBytes(), numPartitions);
final TopicPartition topicPartition = new TopicPartition(topic, partition);
tempConsumer.close();
return KafkaUtils.createDirectStream(
jsc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, T>Assign(Collections.singletonList(topicPartition),
ConsumerUtils.getKafkaParams(servers, getGroupId(), getDeserializerClassName())));
}
protected abstract String getDeserializerClassName();
protected abstract String getGroupId();
private static int calculateKafkaPartition(final byte[] keyBytes, final int numberOfPartitions)
{
return Utils.toPositive(Utils.murmur2(keyBytes)) % numberOfPartitions;
}
}
Мне это не кажется правильным, наверняка есть лучший способ сделать это?