Google Pub / Sub Subscriber не получает сообщения - PullRequest
0 голосов
/ 28 мая 2018

Во-первых, у меня нет опыта работы с Akka, поэтому я очень плохо отлаживаю это самостоятельно.Я попробовал пример из здесь , и публикация сообщений работает (что означает работу учетных данных), но сообщения не отправляются.Учетной записи службы предоставляются все разрешения.

Мой код выглядит следующим образом, что в основном точно так же, как в примере:

package com.example.google.pubsub

import java.io.FileInputStream
import java.security.spec.PKCS8EncodedKeySpec
import java.security.{KeyFactory, PrivateKey}
import java.util.Base64

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.GooglePubSub
import akka.stream.alpakka.googlecloud.pubsub._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.{Done, NotUsed}
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential

import scala.concurrent.duration._
import scala.concurrent.Future

object SubscriberMain extends App {
  println("#### SUBSCRIBER ####")

  val privateKey: PrivateKey = {
    import java.io.FileInputStream
    val credential = GoogleCredential.fromStream(new FileInputStream("mycredentials.json"))
    val privateKey = credential.getServiceAccountPrivateKey
    privateKey
  }
  val clientEmail = "main-19@weirdproject.iam.gserviceaccount.com"
  val projectId = "weirdproject"
  val apiKey = "xxxx"
  val subscription = "somesubscription"

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val subscriptionSource: Source[ReceivedMessage, NotUsed] =
    GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)

  val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
    GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)

  subscriptionSource
    .map { message =>
      val data = message.message.data
      println(s"received a message: $data")
      message.ackId
    }
    .groupedWithin(1000, 1.minute)
    .map(AcknowledgeRequest.apply)
    .to(ackSink)

}

Я обнаружил, что akka.stream.alpakka.googlecloud.pubsub.GooglePubSubSource.createLogic никогда не выполняетсяКажется, это причина, по которой сообщения не выбираются.

1 Ответ

0 голосов
/ 29 мая 2018

У вас есть определение потока, но вы его не запускаете.Вызовите run():

subscriptionSource
  .map { message =>
    val data = message.message.data
    println(s"received a message: $data")
    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
  .run() // <---

В качестве альтернативы используйте runWith(), это удобный метод, который возвращает материализованное значение Sink:

val result: Future[Done] =
  subscriptionSource
    .map { message =>
      val data = message.message.data
      println(s"received a message: $data")
      message.ackId
    }
    .groupedWithin(1000, 1.minute)
    .map(AcknowledgeRequest.apply)
    .runWith(ackSink)

Подробнее об определении итекущие потоки найдены здесь .

...