что в кафке значение поля DefaultRecord - PullRequest
0 голосов
/ 19 февраля 2019

Недавно я проверяю код кафки и тестирую.Я обнаружил странный случай: я печатаю байтовый буфер на записи SocketServer processCompletedReceives, а также печатаю значение на точке журнала sotre следующим образом: запись SocketServer

  private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
  try {
    openOrClosingChannel(receive.source) match {
      case Some(channel) =>
        val header = RequestHeader.parse(receive.payload)
        val connectionId = receive.source
        val context = new RequestContext(header, connectionId, channel.socketAddress,
          channel.principal, listenerName, securityProtocol)
        val req = new RequestChannel.Request(processor = id, context = context,
          startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)

        if(header.apiKey() == ApiKeys.PRODUCE){
          LogHelper.log("produce request: %v" + java.util.Arrays.toString(receive.payload.array()))
        }

...

точка лога

validRecords.records().asScala.foreach { record =>
    LogHelper.log("buffer info: value " + java.util.Arrays.toString(record.value().array()))
}

но результат печати другой.и record.value () - это не то, что я передал в клиентском значении, например:

    public void run() {
    int messageNo = 1;
    while (true) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        if (isAsync) { // Send asynchronously
            producer.send(new ProducerRecord<>(topic,
                messageNo,
                messageStr), new DemoCallBack(startTime, messageNo, messageStr));
        } else { // Send synchronously
            try {
                producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr)).get();
                System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        ++messageNo;
    }
}

результат печати - это не не String messageStr = "Message_" + messageNo;, что произошло в случае.

1 Ответ

0 голосов
/ 22 февраля 2019

сделано.Я пишу код следующим образом:

public class KVExtractor {
    private static final Logger logger = LoggerFactory.getLogger(KVExtractor.class);

    public static Map.Entry<byte[], byte[]> extract(Record record) {
        if (record.hasKey() && record.hasValue()) {
            byte[] key = new byte[record.key().limit()];
            record.key().get(key);

            byte[] value = new byte[record.value().limit()];
            record.value().get(value);

            System.out.println("key : " + new String(key) + " value: " +  new String(value));
            return new AbstractMap.SimpleEntry<byte[], byte[]>(key, value);
        }else if(record.hasValue()){
            // illegal impl
            byte[] data = new byte[record.value().limit()];
            record.value().get(data);
            System.out.println("no key but with value : " + new String(data));
        }
        return null;
    }
}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...