Потребитель Spring Kafka намного медленнее в пакетном режиме по сравнению с облачным потоком Spring - PullRequest
0 голосов
/ 06 июля 2019

Spring Kafka (2.2.2.RELEASE), при получении записей партиями мы видим проблему с производительностью.

Используя облачный поток Spring, мы можем потреблять около 80 тыс. Об / мин, то же самое с Spring Kafka, использующим партии (с размером партии 50 или 500), мы можем потреблять ~ 8 тыс. Об / мин.

Есть ли какие-либо проблемы с производительностью в Spring Kafka с пакетами или у нас отсутствует какая-либо конфигурация, которая может это точно настроить?

Пружинный код Кафка: ConcurrentKafkaConsumerListener:

@EnableKafka
@Configuration
public class ConcurrentKafkaConsumerListenerConfiguration {
  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SomeResourceDeserialiser.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

    return factory;
  }

Потребитель:

@Slf4j
@Component
@Profile(Profiles.SOME)
public class SomeKafkaConsumer {

  @Value("${li.enable-api}")
  private boolean shouldMakeAPICall;

  @KafkaListener(topics = "SomeTopic", id = "batch-listener-test-0001")
  public void receiveData(@Payload List<SomeResource> someResources,
      Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    acknowledgment.acknowledge();
}
}

Deserialiser:

public class SomeResourceDeserialiser implements Deserializer<SomeResource> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
  }

  @Override
  public SomeResource deserialize(String topic, byte[] data) {
    if (data != null) {
      try {
        ObjectMapper objectMapper = new ObjectMapper();
      objectMapper.registerModule(new JavaTimeModule());
      objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
      objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        return objectMapper.readValue(data, SomeResource.class);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    return null;
  }

  @Override
  public void close() {

  }
}

Код весеннего облачного потока:

application.yml

spring:
  cloud:
    stream:
      kafka:
        bindings:
          some-consumer-channel:
            consumer:
              autoCommitOffset: false
        binder:
          brokers: broker:9092,localhost:29092
          autoCreateTopics: false
          autoAddPartitions: false
      bindings:
        some-consumer-channel:
          destination: topics

Потребитель:

@EnableBinding(SomeStream.class)
@Slf4j
@AllArgsConstructor
@Component
@Profile(Profiles.Some)
public class SomeKafkaConsumer {

  @StreamListener(SomeStream.SOME_CONSUMER_CHANNEL)
  public void receiveData(Message<SomeResource> someResource) {
}
}

SomeStream:

public interface SomeStream {

  @java.lang.SuppressWarnings("squid:S1214")
  String SOME_CONSUMER_CHANNEL = "some-consumer-channel";

  @Input(SOME_CONSUMER_CHANNEL)
  SubscribableChannel someConsumerChannel();
}

SomeResource:

@Builder
@AllArgsConstructor
@Getter
@Setter
@EqualsAndHashCode(callSuper = false)
@Slf4j
public class SomeResource {

  @NotNull
  @NotEmpty
  private String someId;

  @NotNull
  @NotEmpty
  private String someotherId;

  @NotNull
  @NotEmpty
  private String state;

  @NotNull
  @NotEmpty
  private String someanotherId;

  @NotNull
  @NotEmpty
  private String market;

  @NotNull
  @NotEmpty
  private int quantity;

  @NotNull
  @NotEmpty
  private ZonedDateTime lastUpdatedTimestamp;

  @NotNull
  @NotEmpty
  private String traceId;

  public SomeResource() {
    // Default constructor
  }
}
...