Не нулевое значение из одного потока, равно нулю в другом - PullRequest
0 голосов
/ 02 мая 2018

Я работаю на многопользовательском TCP-сервере на Java. Поскольку сервер должен выполнять множество одновременных операций, я создал поток, используемый для отправки данных моему клиенту через сокет. Это определяется следующим образом:

public class MessageSender implements Runnable{

    private OutputStream output;
    private volatile ArrayList<byte[]> messagesToSend;

    public MessageSender(OutputStream _output)
    {
        output = _output;
        messagesToSend = new ArrayList<byte[]>(0);
    }

    public void run()
    {
        while (true)
        {
           if (messagesToSend.size() > 0)
           {
               sendMessage(messagesToSend.get(0));
               messagesToSend.remove(0);
           }
        }
    }

    public void setMessageToSend(Message message)
    {
        messagesToSend.add(message.getMessageHandler().getBytes());
    }

    public void sendMessage(byte[] bytes)
    {
        if (bytes == null)
        {
            System.out.println("bytes = null");
            return ;
        }
        try
        {
            output.write(bytes, 0, bytes.length);
        }
        catch (IOException e)
        {
            System.out.println(e);
        }
    }
}

Класс Message, который вы можете увидеть в setMessageToSend(), - мой класс. Я также создал другие классы (Channel_test, Channel_status), соответствующие определенным сообщениям, которые наследуются от класса Message.

У меня есть другой поток, в котором я создаю Channel_status, как показано ниже:

eventBus.consumer(msg + port , message -> {
    sender.setMessageToSend(new Channel_status(eParameters));
    channel_test = new Channel_test(eParameters);
    System.out.println("Created Message");
});

Это обработчик Vertx, который вызывается только один раз при получении определенного сообщения от клиента. Это называется, и channel_status создается как ожидалось. В этом же потоке у меня есть таймер Vertx, в котором я отправляю массив байтов, содержащийся в моем классе channel_status, в MessageSender.

vertx.setPeriodic(duration, id -> {
    System.out.println(Arrays.toString(channel_test.getMessageHandler().getBytes()));
    sender.setMessageToSend(channel_test);
});

Как видите, я печатаю его значение, чтобы убедиться, что оно не равно нулю. И это никогда не будет нулевым в таймере. Но когда MessageSender получает его, иногда он становится нулевым (иногда я имею в виду примерно 1/50 раз). Зачем? Это проблема параллелизма?

Спасибо.

Ответы [ 2 ]

0 голосов
/ 02 мая 2018

Создание ArrayList<byte[]> volatile не гарантирует, что при добавлении нового элемента в thread1, thread2 сможет его увидеть.

Вы должны использовать список, поддерживающий одновременность , например:

messagesToSend  = Collections.synchronizedList(new ArrayList<byte[]>(0));

Это позволяет убедиться, что при добавлении нового элемента с помощью setMessageToSend новый элемент будет виден в методе run.

Обновление:

Collections.synchronizedList все еще не может гарантировать потокобезопасность в этом случае. Поскольку в методе run эти три операции не являются атомарными.

if (messagesToSend.size() > 0)
{
    sendMessage(messagesToSend.get(0));  
    messagesToSend.remove(0);
}

A BlockingQueue может решить эту проблему:

private BlockingQueue<byte[]> messagesToSend;

public MessageSender(OutputStream _output) {
    output = _output;
    messagesToSend = new LinkedBlockingQueue<>();
}

public void run() {
    while (true) {
        try {
            sendMessage(messagesToSend.take());  // this will get blocked untill the queue is not empty
        } catch (InterruptedException e) {

        }
    }
}

public void setMessageToSend(Message message) {
    try {
        messagesToSend.put(message.getMessageHandler().getBytes()); // put new element in the queue, check if run method is blocked, wake it up
    } catch (InterruptedException e) {

    }
}
0 голосов
/ 02 мая 2018

Измените свой run метод на

public void run() {
    while (true) {
        synchronized(messagesToSend) {
           while (messagesToSend.isEmpty()) {
               messagesToSend.wait();
           }
        }
        sendMessage(messagesToSend.get(0));
        messagesToSend.remove(0);
   }

}

А setMessageToSend() как

public void setMessageToSend(Message message) {
    messagesToSend.add(message.getMessageHandler().getBytes());
    messagesToSend.notify();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...