Я пытаюсь воспроизвести решение старой школы в Котлине для классической проблемы Consumer-Producer с несколькими потоками и общим пространством памяти. В Java я бы использовал синхронизированные методы для доступа к общему пространству. Однако в Kotlin кажется, что @Synchronized
аннотированный метод выбрасывает IllegalMonitorStateException
. Я ожидал, что аннотированные методы должны вести себя точно так же, как в Java , но, похоже, это не так. Я решил проблему с функцией synchronized(){}
, но все еще озадачен тем, что @Synchronized
не работает. Почему это?
В следующем коде Producer «создает» новое значение, увеличивая счетчик (Long) внутри SynchronizedBox
, и Consumer читает это значение, а затем выводит его на консоль.
Kotlin MessageBox, который не работает
package concurrency.producer_consumer
class MessageBox(var message: Long = 0): SynchronizedBox {
private val lock = Object()
private var empty = true
@Synchronized
override fun syncIncrement() {
while (!empty) {
lock.wait()
}
message++
empty = false
lock.notifyAll()
}
@Synchronized
override fun readValue(): Long {
while (empty) {
lock.wait()
}
val readValue = message
empty = true
lock.notifyAll()
return readValue
}
}
Вариант Java, который работает:
package concurrency.producer_consumer;
public class JBox implements SynchronizedBox {
private long value = 0;
private boolean empty = true;
@Override
public synchronized void syncIncrement() {
while (!empty) {
try {
wait();
} catch (InterruptedException e) {
}
}
value++;
empty = false;
notifyAll();
}
@Override
public synchronized long readValue() {
while (empty) {
try {
wait();
} catch (InterruptedException e) {}
}
empty = true;
return value;
}
}
Версия Kotlin, которая действительно работает:
package concurrency.producer_consumer
class MessageBox(var message: Long = 0): SynchronizedBox {
private val lock = Object()
private var empty = true
override fun syncIncrement() {
synchronized(lock) {
while (!empty) {
lock.wait()
}
message++
empty = false
lock.notifyAll()
}
}
override fun readValue(): Long {
synchronized(lock) {
while (empty) {
lock.wait()
}
empty = true
lock.notifyAll()
return message
}
}
}
Остальной код:
Потребитель :
пакет concurrency.producer_consumer
class Consumer(private val messageBox: SynchronizedBox): Runnable {
override fun run() {
println("consumer thread: ${Thread.currentThread().id}: started")
while (true) {
println("consumer: ${messageBox.readValue()}")
Thread.sleep(1_000)
}
}
}
Производитель
class Producer(private val messageBox: SynchronizedBox): Runnable {
override fun run() {
println("producer thread: ${Thread.currentThread().id}: started")
while (true) {
messageBox.syncIncrement()
Thread.sleep(1_000)
}
}
}
Интерфейс
package concurrency.producer_consumer
interface SynchronizedBox {
fun syncIncrement()
fun readValue(): Long
}
Launcher
package concurrency.producer_consumer
fun main() {
val box: SynchronizedBox = MessageBox()
val producer1 = Producer(box)
val consumer = Consumer(box)
val threadP1 = Thread(producer1)
val threadC = Thread(consumer)
threadP1.start()
threadC.start()
}