Как отправить пакетные данные с производителем Spring Kafka - PullRequest
1 голос
/ 21 октября 2019

В настоящее время у меня есть такой код:

KafkaTemplate<String, String> kafkaTemplate;

List<Pet> myData;

for(Pet p: myData) {
  String json = objectWriter.writeValueAsString(p)
  kafkaTemplate.send(topic, json)
}

, поэтому каждый элемент списка отправляется один за другим. Как мне отправить весь список сразу?

Ответы [ 2 ]

1 голос
/ 22 октября 2019

Таким образом, прямой способ отправки массовых сообщений на kafka напрямую с помощью KafkaTemplate или KafkaProducer отсутствует. Они не используют какой-либо метод, который принимает List объектов и отправляет их индивидуально в разные разделы.

Как производитель kafka отправляет сообщения в kafka?

KafkaProducer

Производитель Kafka создает пакет записей и затем отправляет все эти записи сразу, для получения дополнительной информации

Производитель состоит из пулабуферного пространства, содержащего записи, которые еще не были переданы на сервер, а также фоновый поток ввода-вывода, который отвечает за преобразование этих записей в запросы и их передачу в кластер.

Отправка () метод является асинхронным. При вызове он добавляет запись в буфер ожидающих отправки записей и сразу же возвращает. Это позволяет производителю объединять отдельные записи для повышения эффективности.

Асинхронная отправка

Пакетирование является одним из основных факторов эффективности, и дляПри включении пакетирования производитель Kafka будет пытаться накапливать данные в памяти и отправлять большие пакеты в одном запросе. Пакетная обработка может быть сконфигурирована так, чтобы накапливать не более фиксированного количества сообщений и ожидать не более определенной фиксированной задержки (скажем, 64 КБ или 10 мс). Это позволяет аккумулировать больше байтов для отправки и несколько больших операций ввода-вывода на серверах. Эта буферизация настраивается и дает механизм для компенсации небольшого дополнительного времени ожидания для лучшей пропускной способности.

Поскольку вы используете spring-kafka, вы можете отправлять List<Objects>, но здесь вы отправляете JSONArray из JSONObject вместо каждого JSONObject для раздела темы

public KafkaTemplate<String, List<Object>> createTemplate() {

        Map<String, Object> senderProps = senderProps();
ProducerFactory<Integer, String> pf =
          new DefaultKafkaProducerFactory<String, List<Object>>(senderProps);
        KafkaTemplate<String, List<Object>> template = new KafkaTemplate<>(pf);
return template;

 }

 public Map<String, Object> producerProps() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
       return props;

 }

KafkaTemplate<String, List<Object>> kafkaTemplate;
0 голосов
/ 22 октября 2019

Используя KafkaTemplate, на который уже ответил Deadpool, существует другой подход, состоящий в том, чтобы просто искать объект в байтовом массиве и отправлять весь объект.

Это не лучший подход, так как его отвергнутая лучшая практика Kafka - распределять и распараллеливать как можно больше. ,Поэтому в общем случае распространяйте сообщения и позволяйте производителю использовать буфер пула и разделы для распараллеливания. Но иногда нам может понадобиться конкретное использование ........

// Вы можете использовать любой список объектов, я просто использую String, но может быть улучшен для любого Object, поскольку String - ничтоно объект..Если вы используете Pojo, его можно преобразовать в строку JSON и передать в виде списка строк json.

public byte[] searlizedByteArray(List<String> listObject) throws IOException {

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutput out = null;
            byte[] inByteArray = null;
            try {
                out = new ObjectOutputStream(bos);
                out.writeObject(listObject);
                out.flush();
                inByteArray = bos.toByteArray();
            } finally {
                if (bos != null)
                    bos.close();

            }
            return inByteArray;
        }

Обнулить массив байтов [] в список объектов

public List<String> desearlizedByteArray(byte[] byteArray) throws IOException, ClassNotFoundException {
    ByteArrayInputStream bis = new ByteArrayInputStream(byteArray);
    ObjectInput in = null;
    List<String> returnList=null;
    try {
      in = new ObjectInputStream(bis);
      List<String> o = (List<String>) in.readObject(); 

     for (String string : o) {
        System.out.println("==="+o);
    }

    } finally {
      try {
        if (in != null) {
          in.close();
        }
      } catch (IOException ex) {
        // ignore close exception
      }
    }
    return returnList;

}

Обратите внимание, что мы используем VALUE_SERIALIZER_CLASS_CONFIG в качестве ByteArraySerializer

public void publishMessage() throws Exception {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
//You might to increase buffer memory and request size in case of large size,
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "");
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "");

        Producer<String, byte[]> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(
                properties);
        try {
            List asl=new ArrayList<String>();
            asl.add("test1");
            asl.add("test2");

            byte[] byteArray = searlizedByteArray(asl);
            ProducerRecord producerRecord = new ProducerRecord<String, byte[]>("testlist", null,
                    byteArray);

            producer.send(producerRecord).get();

        } finally {
            if (producer != null) {
                producer.flush();
                producer.close();
            }

        }

    }

Наконец, в потребительском ByteArrayDeserializer будет работать

public void consumeMessage() {

    Properties properties = new Properties();

    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    properties.setProperty("key.deserializer", StringDeserializer.class.getName());
    properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
    properties.setProperty("group.id", "grp_consumer");
    properties.put("auto.offset.reset", "earliest");


    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(properties);
    consumer.subscribe(Arrays.asList("testlist"));

    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
        for (ConsumerRecord<String, byte[]> record : records) {


        }
    }



}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...