У меня была похожая проблема с конвейером потока данных. При попытке запустить его в потоке данных я получил:
java.lang.NullPointerException
org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.harnessToWindmillTimestamp(WindmillTimeUtils.java:58)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:400)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Они проблема была RabbitMqIO
использует метку времени из сообщений, поступающих из RabbitMq, например, для водяных знаков. Оказалось, что в сообщениях от RabbitMq в моем случае не была установлена временная метка (она не установлена по умолчанию в RabbitMq), и она была нулевой. Я исправил это, подготовив патч для занятий в Apache Beam. Я сделал изменение в конструкторе RabbitMqMessage
. Теперь это выглядит так:
public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
this.routingKey = routingKey;
body = delivery.getBody();
contentType = delivery.getProperties().getContentType();
contentEncoding = delivery.getProperties().getContentEncoding();
headers = delivery.getProperties().getHeaders();
deliveryMode = delivery.getProperties().getDeliveryMode();
priority = delivery.getProperties().getPriority();
correlationId = delivery.getProperties().getCorrelationId();
replyTo = delivery.getProperties().getReplyTo();
expiration = delivery.getProperties().getExpiration();
messageId = delivery.getProperties().getMessageId();
/*
*** IMPORTANT ***
Sometimes timestamp in RabbitMq message properties is 'null'. `RabbitMqIO` uses that value as
watermark, when it is `null` it causes exceptions, 'null' has to be replaced with some value in this case current time
*/
// timestamp = delivery.getProperties().getTimestamp();
timestamp = delivery.getProperties().getTimestamp() == null ? new Date() : delivery.getProperties().getTimestamp();
type = delivery.getProperties().getType();
userId = delivery.getProperties().getUserId();
appId = delivery.getProperties().getAppId();
clusterId = delivery.getProperties().getClusterId();
}
и мне пришлось изменить метод advance()
в RabbitMqIO
, чтобы не использовать свойство timestamp
, которое может быть нулевым:
@Override
public boolean advance() throws IOException {
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);
if (delivery == null) {
return false;
}
if (source.spec.useCorrelationId()) {
String correlationId = delivery.getProperties().getCorrelationId();
if (correlationId == null) {
throw new IOException(
"RabbitMqIO.Read uses message correlation ID, but received "
+ "message has a null correlation ID");
}
currentRecordId = correlationId.getBytes(StandardCharsets.UTF_8);
}
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
checkpointMark.sessionIds.add(deliveryTag);
current = new RabbitMqMessage(source.spec.routingKey(), delivery);
/*
*** IMPORTANT ***
Sometimes timestamp in RabbitMq messages is 'null' stream in Dataflow fails because
watermark is based on that value, 'null' has to be replaced with some value. `RabbitMqMessage` was changed
to use `new Date()` in this situation and now timestamp can be taken from it
*/
//currentTimestamp = new Instant(delivery.getProperties().getTimestamp());
currentTimestamp = new Instant(current.getTimestamp());
if (currentTimestamp.isBefore(checkpointMark.oldestTimestamp)) {
checkpointMark.oldestTimestamp = currentTimestamp;
}
} catch (Exception e) {
throw new IOException(e);
}
return true;
}
После запуска моего конвейера я снова получил это исключение в другом месте. На этот раз это было вызвано не установленным значением по умолчанию для свойства oldestTimestamp
в RabbitMQCheckpointMark
. Я сделал следующее изменение, и теперь RabbitMQCheckpointMark
выглядит так:
private static class RabbitMQCheckpointMark
implements UnboundedSource.CheckpointMark, Serializable {
transient Channel channel;
/*
*** IMPORTANT *** it should be initialized with some value because without it runner (e.g Dataflow) fails with 'NullPointerException'
Example error:
java.lang.NullPointerException
org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.harnessToWindmillTimestamp(WindmillTimeUtils.java:58)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:400)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
*/
Instant oldestTimestamp = new Instant(Long.MIN_VALUE);
final List<Long> sessionIds = new ArrayList<>();
@Override
public void finalizeCheckpoint() throws IOException {
for (Long sessionId : sessionIds) {
channel.basicAck(sessionId, false);
}
channel.txCommit();
oldestTimestamp = Instant.now();
sessionIds.clear();
}
}
Все эти изменения исправили мой конвейер, и теперь он работает как положено. Я надеюсь, что вы найдете это полезным.