какой-то Scala Actor переходит в состояние ожидания, когда одновременно работают 8-10 актеров - PullRequest
2 голосов
/ 27 января 2011

В моей модели около 8-9 актеров Scala. Каждый актер имеет свою очередь на RabbitMQ Server

в методе действия каждого Актера. Он постоянно выводится в очередь как

def act {
    this ! 1
    loop {
      react {
        case 1 => processMessage(QManager.getMessage); this ! 1
      }
    }
  } 

Я метод кролика MQ QManager getMessage

def getMessage: MyObject = {
    getConnection
    val durable = true
    channel.exchangeDeclare(EXCHANGE, "direct", durable)
    channel.queueDeclare(QUEUE, durable, false, false, null)
    channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY)
    consumer = new QueueingConsumer(channel)
    channel basicConsume (QUEUE, false, consumer)

    var obj = new MyObject
    try {
      val delivery = consumer.nextDelivery
      val msg = new java.io.ObjectInputStream(
        new java.io.ByteArrayInputStream(delivery.getBody)).readObject()
      obj = msg.asInstanceOf[MyObject]
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
    } catch {
      case e: Exception =>logger.error("error in Get Message", e);endConnection
    }
    endConnection
    obj
  }

Все 9 актеров имеют свой собственный тип объекта и собственный QManager

в GetMessage я использую Rabbitmq QueueConsumer

 val delivery = consumer.nextDelivery

и метод nextDelivery возвращает объект, когда он находится в очереди этот метод переводит актера в состояние ожидания

когда я запускаю все 8 актеров, только 4 из них работают нормально, другие не указаны. Я проверил каждого актера, работающего независимо друг от друга, они отлично работают при запуске Alone

Проблема возникает, когда я начинаю больше, чем 4 актера

Есть ли какие-либо проблемы с потоками актеров скалы?

Ответы [ 2 ]

5 голосов
/ 27 января 2011

Отказ от ответственности: я являюсь ПО Akka

Как говорит Рекс, вы заняты ожиданием, перебирая потоки в общем пуле потоков.

Я не знаю,у вас есть возможность протестировать Akka, но у нас есть поддержка потребителей (и производителей) AMQP в качестве участников: Akka-AMQP

Создание сообщений AMQP:

    val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
    val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer"))
producer ! Message("Some simple sting data".getBytes, "some.routing.key")

Использование сообщений AMQP:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = {
  case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload))
}}), None, Some(exchangeParameters)))

Другой вариант - использовать Akka-Camel для приема и создания сообщений AMQP с актерами

5 голосов
/ 27 января 2011

Все ваши актеры все время бегают; они никогда не делают перерыв. Поскольку актеры разделены между собой общим потоком, это означает, что счастливые актеры-победители бегают все время, а неудачникам не хватает времени вообще. Если вы хотите иметь сущность, которая все время берет для себя весь поток, обычно лучше использовать java Thread или, по крайней мере, использовать receive вместо react. Вы также можете увеличить размер пула актеров, чтобы он соответствовал количеству актеров, но, как правило, если у вас очень большое количество актеров, которые работают постоянно, вам следует более тщательно продумать, как вы структурируете свою программу.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...