Как добиться мьютекса по методу в Котлине и расставить приоритеты одного потока перед другим? - PullRequest
0 голосов
/ 26 февраля 2019

У меня есть две темы кафки my_priorized_topic и my_not_so_priorized_topic.Я хочу иметь мьютекс на EventProcessor.doLogic и всегда устанавливать приоритеты для сообщений дескриптора от my_prioritized_topic перед сообщениями от my_not_so_prioritized_topic

Может кто-нибудь дать мне несколько советов, как решить эту проблему с Kotlin, может быть, с сопрограммами?

class EventProcessor {
  fun doLogic(message: String) {
    ... // code which cannot be parallelized
  }
}
class KafkaConsumers(private val eventProcessor: EventProcessor) {
  @KafkaConsumer(topic = "my_priorized_topic")
  fun consumeFromPriorizedTopic(message: String) {
    eventProcessor.doLogic(message)
  }

  @KafkaConsumer(topic = "my_not_so_priorized_topic")
  fun consumeFromNotSoPrioritizedTopic(message: String) {
    eventProcessor.doLogic(message)  
  }
}

1 Ответ

0 голосов
/ 01 марта 2019

Вы можете создать два Channel для ваших задач с высоким и низким приоритетом.Затем, чтобы получить события из каналов, используйте выражение сопрограммы ' select и поместите канал задачи с высоким приоритетом первым.

Пример (строка является четным):

fun process(value: String) {
    // do what you want with the event
}

suspend fun selectFromHighAndLow(highPriorityChannel: ReceiveChannel<String>, lowPriorityChannel: ReceiveChannel<String>): String =
        select<String> {
            highPriorityChannel.onReceive { value ->
                value
            }
            lowPriorityChannel.onReceive { value ->
                value
            }
        }


val highPriorityChannel = Channel<String>()
val lowPriorityChannel = Channel<String>()
while (true) {
    process(selectFromHighAndLow(highPriorityChannel, lowPriorityChannel))
}

Чтобы отправить материал на эти каналы, вы можете использовать channel.send(event).

...