Недавно я проверяю код кафки и тестирую.Я обнаружил странный случай: я печатаю байтовый буфер на записи SocketServer processCompletedReceives, а также печатаю значение на точке журнала sotre следующим образом: запись SocketServer
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = RequestHeader.parse(receive.payload)
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
channel.principal, listenerName, securityProtocol)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
if(header.apiKey() == ApiKeys.PRODUCE){
LogHelper.log("produce request: %v" + java.util.Arrays.toString(receive.payload.array()))
}
...
точка лога
validRecords.records().asScala.foreach { record =>
LogHelper.log("buffer info: value " + java.util.Arrays.toString(record.value().array()))
}
но результат печати другой.и record.value () - это не то, что я передал в клиентском значении, например:
public void run() {
int messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
результат печати - это не не String messageStr = "Message_" + messageNo;
, что произошло в случае.