Java, как избежать использования Thread.sleep () в цикле - PullRequest
0 голосов
/ 28 января 2019

С моего основного я начинаю два потока, называемых производителем и потребителем.Оба содержат while(true) цикл.Производственным циклом является UDP-сервер, следовательно, он не требует спящего режима.Моя проблема в потребительском цикле.Потребительский цикл удаляет объекты из связанной очереди и передает их функции для дальнейшей обработки.Исходя из того, что было исследовано, не рекомендуется использовать спящий поток в цикле, так как иногда O / S не освобождается в конце установленного времени.Если я удаляю спящий поток, когда приложение идеально, оно тянет процессор до 20-30%.

class Producer implements Runnable {
    private DatagramSocket dsocket;
    FError fer = new FError();

    int port =1548;
    ConcurrentLinkedQueue<String> queue;

    Producer(ConcurrentLinkedQueue<String> queue){
        this.queue = queue; 
    }

    @Override
    public void run() {

        try {

            // Create a socket to listen on the port.
            dsocket = new DatagramSocket(port);
            // Create a buffer to read datagrams into.
            byte[] buffer = new byte[30000];
            // Create a packet to receive data into the buffer
            DatagramPacket packet = new DatagramPacket(buffer,
            buffer.length);

            while (true) {
                try {

                   // Wait to receive a datagram
                    dsocket.receive(packet);
                    //Convert the contents to a string,
                    String msg = new String(buffer, 0, packet.getLength());

                    int ltr = msg.length();
                     // System.out.println("MSG =" + msg);

                    if(ltr>4)
                    {

                        SimpleDateFormat sdfDate = new SimpleDateFormat  ("yyyy-MM-dd HH:mm:ss");//dd/MM/yyyy

                        Date now = new Date();
                        String strDate = sdfDate.format(now);

                        //System.out.println(strDate);

                        queue.add(msg + "&" + strDate);

                     // System.out.println("MSG =" + msg);
                    }

                  // Reset the length of the packet before reusing it.
                   packet.setLength(buffer.length);

                } catch (IOException e) {
                    fer.felog("svr class", "producer", "producer thread",e.getClass().getName() + ": " + e.getMessage());
                    dsocket.close();
                    break; 
                }
            }

        } catch (SocketException e) {
          fer.felog("svr class", "producer","Another App using the udp port " + port, e.getClass().getName() + ": " + e.getMessage()); 

        }

    }

}

class Consumer implements Runnable {

    String str;  
    ConcurrentLinkedQueue<String> queue;

    Consumer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;  
    }

    @Override
    public void run() {

        while (true) {
            try {

                while ((str = queue.poll()) != null) {

                    call(str);  // do further processing

                   }
            } catch (IOException e) {
                ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
                break;
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException ex) {

                ferpt.felog("svr class", "consumer","sleep", ex.getClass().getName() + ": " + ex.getMessage());
            }

        }

    }

}

Ответы [ 3 ]

0 голосов
/ 28 января 2019

Разрешить потребителю wait() объект, к которому у обоих есть доступ, и разрешить производителю notify() прослушивать этот объект при появлении новых сообщений.Тогда потребитель должен удалить все сообщения, а не одно, как в примере.

0 голосов
/ 28 января 2019

Правильное решение вашей проблемы - использовать очередь блокировки.Это дает вам несколько преимуществ:

  • не тратит время ожидания процессора
  • может иметь ограниченную емкость - представьте, что у вас быстрый производитель, но медленный потребитель -> если очередь неограниченный по размеру, ваше приложение может легко достичь условия OutOfMemory

Вот небольшая демонстрация, с которой вы можете поиграть:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProdConsTest {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        final Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    System.out.println("Producing: " + i);
                    queue.put(i);

                    //Adjust production speed by modifying the sleep time
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    //someone signaled us to terminate
                    break;
                }
            }
        };

        final Runnable consumer = () -> {
            while (true) {
                final Integer integer;
                try {
                    //Uncomment to simulate slow consumer:
                    //Thread.sleep(1000);

                    integer = queue.take();
                } catch (InterruptedException e) {
                    //someone signaled us to terminate
                    break;
                }
                System.out.println("Consumed: " + integer);
            }
        };


        final Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        final Thread producerThread = new Thread(producer);
        producerThread.start();

        producerThread.join();
        consumerThread.interrupt();
        consumerThread.join();
    }
}

Теперь раскомментируйте sleep() вПотребитель и наблюдать, что происходит с приложением.Если бы вы использовали решение на основе таймера, такое как предложенное ScheduledExecutorService, или вы были заняты ожиданием, то при быстром продюсере очередь росла бы бесконтрольно и в конечном итоге приводила к сбою приложения

0 голосов
/ 28 января 2019

Вместо Consumer extend Runnable вы можете изменить свой код так, чтобы он включал ScheduledExecutorService, который запускает опрос очереди каждые полсекунды вместо того, чтобы поток спал.Примером этого будет

public void schedule() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleAtFixedRate(() -> {
        String str;
        try {
            while ((str = queue.poll()) != null) {
                call(str);  // do further processing
            }
        } catch (IOException e) {
            ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
        }
    }, 0, 500, TimeUnit.MILLISECONDS);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...