Обработать JSON как единый объект в потоке Akka, а не повторять его - PullRequest
0 голосов
/ 16 июня 2020

Я пытаюсь связать потребителя с производителем, используя потоки Akka.

while (true) {
            JsonNode msg = producer.getNextDataEnvelope();
            if (msg == null) {
                break;
            }
            System.out.println(msg.toString());
            final Source<JsonNode, NotUsed> source = Source.from(getJSONMessage(msg));
            final Sink<JsonNode, CompletionStage<Done>> sink =
              Sink.foreach(receivedMsg -> consumer.sendJson((ObjectNode) receivedMsg));

            final RunnableGraph<CompletionStage<Done>> runnable = source.toMat(sink, Keep.right());
            final CompletionStage<Done> producerConsumer = runnable.run(system);
            Thread.sleep(1);
        }

        private static ObjectNode getJSONMessage(JsonNode message) {
          JsonNode pipelineMsg = message.get(KEYNAME);
          return (ObjectNode)pipelineMsg;
        }

json, созданный после getJSONMessage, выглядит примерно так:

{
  a: {
  },
  b: {
  }
}

When this JSON goes to consumer it is processing it as 

a: {}

Сначала, а затем

b: {
}

Как мне обработать всю полезную нагрузку JSON сразу в akka-stream, а не повторять ее по JSON полезной нагрузке.

1 Ответ

0 голосов
/ 17 июня 2020

Метод Source.from создает Akka Streams Source из объекта Iterable. Поскольку ObjectNode реализует Iterable<JsonNode>, путем итерации по его дочерним элементам ваш поток будет иметь один элемент для каждого значения в объекте.

Вы можете создать Source с одним элементом, используя Source.single, но в этом случае нет никакой пользы от использования Akka Streams в этом методе. Было бы гораздо проще передать узел непосредственно в consumer.sendJson, как в этом примере:

while (true) {
    JsonNode msg = producer.getNextDataEnvelope();
    if (msg == null) {
        break;
    }
    System.out.println(msg.toString());
    ObjectNode receivedMsg = getJSONMessage(msg);
    consumer.sendJson(receivedMsg);
}

private static ObjectNode getJSONMessage(JsonNode message) {
    JsonNode pipelineMsg = message.get(KEYNAME);
    return (ObjectNode)pipelineMsg;
}

С другой стороны, если вы действительно хотите воспользоваться преимуществами Akka Streams, это будет иметь больше смысла чтобы превратить producer в Source<JsonNode>, consumer в Sink или Flow, и выполнить всю обработку в потоке:

producer
     .map(dataEnvelope -> getJSONMessage(dataEnvelope))
     .runWith(consumer, system);
...