Alpakka s3 `multipartUpload` не загружает файлы - PullRequest
0 голосов
/ 21 сентября 2018

У меня вопрос по поводу интеграции alpakka_kafka+alpakka_s3.Alpakka s3 multipartUpload не загружает файлы, когда я использую исходники alpakka kafka.

kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
    bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
    bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

Однако, как только я добавил .take(100) после kafkaSource.Все работало нормально.

kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~>     bcast.in
    bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
    bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

Любая помощь будет принята с благодарностью.Заранее спасибо!

Вот полный фрагмент кода:

// Source
val kafkaSource: Source[(CommittableOffset, Array[Byte]), Consumer.Control] = {
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(prefixedTopics))
      .map(committableMessage => (committableMessage.committableOffset, committableMessage.record.value))
      .watchTermination() { (mat, f: Future[Done]) =>
        f.foreach { _ =>
          log.debug("consumer source shutdown, consumerId={}, group={}, topics={}", consumerId, group,     prefixedTopics.mkString(", "))
        }

        mat
      }
  }

// Flow
val commitFlow: Flow[CommittableOffset, Done, NotUsed] = {
    Flow[CommittableOffset]
      .groupedWithin(batchingSize, batchingInterval)
      .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
      .mapAsync(parallelism = 3) { msg =>
        log.debug("committing offset, msg={}", msg)

        msg.commitScaladsl().map { result =>
          log.debug("committed offset, msg={}", msg)
          result
        }
      }
  }

private val kafkaMsgToByteStringFlow = Flow[KafkaMessage[Any]].map(x => ByteString(x.msg + "\n"))

private val kafkaMsgToOffsetFlow = {
    implicit val askTimeout: Timeout = Timeout(5.seconds)
    Flow[KafkaMessage[Any]].mapAsync(parallelism = 5) { elem =>
      Future(elem.offset)
    }
  }


// Sink

val s3Sink = {
      val BUCKET = "test-data"
      s3Client.multipartUpload(BUCKET, s"tmp/data.txt")



// Doesnt' work..... ( no files are showing up on the S3)
kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
        bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
        bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

// This one works...
kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
        bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
        bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

Ответы [ 2 ]

0 голосов
/ 12 ноября 2018
 private def running: Receive = {
    case Subscribe(subscriberId) =>

      val kafkaSubscriber = new KafkaSubscriber(
        serviceName = "akka_kafka_subscriber",
        group = kafkaConfig.group,
        topics = kafkaConfig.subscriberTopics,
        system = system,
        configurationProperties = Seq(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
      )

      RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        val bcast = builder.add(Broadcast[KafkaMessage[Any]](2))

        kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> kafkaSubscriber.filterTypeFlow[Any] ~> bcast.in
        bcast.out(0) ~> kafkaMsgToStringFlow
          .groupedWithin(BATCH_SIZE, BATCH_DURATION)
          .map(group => group.foldLeft(new StringBuilder()) { (batch, elem) => batch.append(elem) })
          .mapAsync(parallelism = 3) { data =>
            self ? ReadyForUpload(ByteString(data.toString()),UUID.randomUUID().toString,subscriberId)
          } ~> Sink.ignore
        bcast.out(1) ~> kafkaMsgToOffsetFlow ~> kafkaSubscriber.commitFlow ~> Sink.ignore
        ClosedShape
      }).withAttributes(ActorAttributes.supervisionStrategy(decider)).run()
      sender ! "subscription started"

    case ready: ReadyForUpload=>
      println("==========================Got ReadyForUpload: " + ready.fileName)
      val BUCKET = "S3_BUCKET"
      Source.single(ready.data).runWith(s3Client.multipartUpload(BUCKET, s"tmp/${ready.fileName}_${ready.subscriberId}.txt"))
      sender() ! "Done"
0 голосов
/ 25 сентября 2018

На самом деле, он загружает.Проблема в том, что вам нужно отправить запрос на завершение на s3, чтобы завершить загрузку, и именно тогда ваш файл будет доступен внутри корзины.Бьюсь об заклад, так как источник kafka без take(n) никогда не прекращает производить данные в нисходящем направлении, приемник никогда не отправляет запрос завершения на s3, потому что поток на самом деле никогда не завершается, поэтому приемник всегда ожидает загрузки дополнительных данных перед завершением запроса.

Нет способа сделать то, что вы хотите, просто загрузив все в один файл, поэтому мой совет: сгруппируйте ваши сообщения kafkaSource и отправьте сжатый массив [байт] в приемник.Трюк заключается в том, что вам нужно создать один приемник для каждого файла, а не использовать только один приемник.

...