Метод Source.from
создает Akka Streams Source
из объекта Iterable
. Поскольку ObjectNode
реализует Iterable<JsonNode>
, путем итерации по его дочерним элементам ваш поток будет иметь один элемент для каждого значения в объекте.
Вы можете создать Source
с одним элементом, используя Source.single
, но в этом случае нет никакой пользы от использования Akka Streams в этом методе. Было бы гораздо проще передать узел непосредственно в consumer.sendJson
, как в этом примере:
while (true) {
JsonNode msg = producer.getNextDataEnvelope();
if (msg == null) {
break;
}
System.out.println(msg.toString());
ObjectNode receivedMsg = getJSONMessage(msg);
consumer.sendJson(receivedMsg);
}
private static ObjectNode getJSONMessage(JsonNode message) {
JsonNode pipelineMsg = message.get(KEYNAME);
return (ObjectNode)pipelineMsg;
}
С другой стороны, если вы действительно хотите воспользоваться преимуществами Akka Streams, это будет иметь больше смысла чтобы превратить producer
в Source<JsonNode>
, consumer
в Sink
или Flow
, и выполнить всю обработку в потоке:
producer
.map(dataEnvelope -> getJSONMessage(dataEnvelope))
.runWith(consumer, system);