Потоки Akka Могу ли я сохранить состояние во внешнем объекте? - PullRequest
0 голосов
/ 27 мая 2020

У меня есть издатель с потоком сообщений. И я должен отправлять потребителям только сообщения, основанные на подписчиках потребителей.

На моем локальном компьютере работает 2 ступени без нагрузки, но при нагрузочном тестировании не работает, многие сообщения идут не так.

У меня две версии проблемы:

  1. Я новичок в потоке akka и не вижу в do c немного тот же случай, объединяющий 2 потока
  2. Неправильно использую scala варианты протектора

Что это может быть?

import akka._
import akka.http.scaladsl.model.http2.PeerClosedStreamException
import akka.stream._
import akka.stream.scaladsl._

import scala.collection.mutable
import scala.concurrent.Future
import scala.util.matching.Regex

class RtkBrokerImpl(materializer: Materializer) extends MessageBroker {

  private implicit val mat: Materializer = materializer

  val mutableArray = mutable.Map[String, Source[ConsumeRequest, NotUsed]]()

  val (inboundHub: Sink[ProduceRequest, NotUsed], outboundHub: Source[ConsumeResponse, NotUsed]) =
    MergeHub.source[ProduceRequest]
      .async
      .mapAsyncUnordered(100)(x => Future.successful(ConsumeResponse(x.key, x.payload)))
      .toMat(BroadcastHub.sink)(Keep.both)
      .run()

  val all: RunnableGraph[Source[ProduceRequest, NotUsed]] =
    MergeHub.source[ProduceRequest]
      .toMat(BroadcastHub.sink)(Keep.right)

  val queue = Sink.queue[ProduceRequest](100)


  override def produce(in: Source[ProduceRequest, NotUsed]): Future[ProduceResponse] = {
    in.to(inboundHub).run()
    Future.never
  }

  override def consume(in: Source[ConsumeRequest, NotUsed]): Source[ConsumeResponse, NotUsed] = {
    val patternRepo = new PatternsRepository
    in.runForeach { req =>
      req.action match {
        case ConsumeRequest.Action.SUBSCRIBE => patternRepo.putPatterns(req.keys)
        case ConsumeRequest.Action.UNSUBSCRIBE => patternRepo.dropPatterns(req.keys)
      }
    }

    outboundHub.async.filter(res => patternRepo.checkKey(res.key))
  }

}

class PatternsRepository {

  import RtkBrokerImpl._

  import scala.collection.JavaConverters._

  val concurrentSet: mutable.Set[String] = java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala

  def getPatterns():  mutable.Set[String] = {
    concurrentSet
  }

  def checkKey(key: String): Boolean = {
    concurrentSet.contains(key) ||
      concurrentSet.exists(x => isInPattern(key, x))
  }

  def dropPatterns(string: Seq[String]) = {
    string.foreach(concurrentSet.remove)
  }

  def putPatterns(string: Seq[String]) = {
    concurrentSet.addAll(string)
  }
}

object RtkBrokerImpl {

  def isInPattern(key: String, pattern: String): Boolean = {
    if (pattern.exists(x => x == '#' || x == '*')) {
      val splittedPatten = pattern.split(".")
      val splittedKey = key.split(".")
      if (!splittedPatten.contains("#")) {
        if (splittedKey.size != splittedKey.size)
          return false
        splittedPatten.zip(splittedKey)
          .forall { case (key, pat) => if (pat == "*") true else key == pat }
      } else {
        val regExp = pattern.replaceAll(".", "\\.")
          .replaceAll("\\*", "[A-Za-z0-9]+")
          .replaceAll("#", "\\S+")
        new Regex(regExp).matches(key)
      }
    } else {
      key == pattern
    }
  }

}
...