Java [Peer-To-Peer]: запускаемый неожиданно останавливается / блокируется - PullRequest
0 голосов
/ 21 марта 2019

Я занимаюсь разработкой простой распределенной книги . Я хочу иметь возможность запускать узлы на разных портах, которые могут взаимодействовать друг с другом. Тогда у каждой программы будет файл, в который она будет записывать вновь обнаруженные узлы.

Сначала в этот файл жестко запрограммированы только самые надежные узлы. Вот что процедурно происходит:

1) Я запускаю новый узел, который запускает HTTP-сервер (я использую com.sun.HttpServer). На сервере есть обработчик GetAddress, который прослушивает запросы, которые идут на указанный URI. Затем он получает IP и PORT (который указан в параметрах запроса URI), получает семафор для файла known_nodes.txt и записывает вновь полученный адрес партнера в этот файл, если он еще не существует, и отправьте содержимое вновь обновленного файла в виде списка json обратно запрашивающей стороне.

2) В моем классе Node (который, как упоминалось ранее, запускается HTTPServer в отдельном потоке), я создаю ScheduledExecutorService и даю ему возможность запуска каждые несколько секунд, задачей которого будет подключение к URL-адресам. присутствует в файле known_nodes.txt и запросить у них known_nodes. Если мы получили узлы, которых ранее не было в нашем файле known_nodes, мы перезаписываем наш файл.

NOW! Если я запускаю узел и пытаюсь запросить его из браузера, все идет по плану - мы получаем запрос, записываем его в наш файл, затем наш исполняемый файл попытается подключиться по адресу, указанному в запросе. Если мы поймали SocketTimeoutException , мы удалим адрес из нашего файла known_nodes.txt.

Проблема возникает, когда я запускаю два узла, работающие, скажем, на портах 8001 и 8002. Обратите внимание, что у каждого узла есть свой собственный файл known_nodes. Что происходит, так это то, что один из узлов прекратит выполнение задачи DiscoverAddresses, а другой - нет. Таким образом, один узел перестал получать запросы.

NB! Узел, который остановит свою запланированную задачу, ВСЕ ЕЩЕ отправит как минимум ОДИН запрос на обнаружение, а затем умрет / заблокируется (?).

Вот код для запускаемой задачи:

    @Override
public void run() {
    log.info("still running ");
    PeerAddressesHolder inactiveNodes = new PeerAddressesHolder();
    ApplicationConfiguration appConf = ApplicationConfiguration.getInstance();

    for (PeerAddress peerAddress : knownNodes.getAddresses()) {
        if (isSameNode(peerAddress)) {
            continue;
        }

        String urlString = String.format("http://%s:%s%s?myport=%d", peerAddress.getIP(), peerAddress.getPort(), Constants.GET_ADDRESS, myPort);
        try {
            StringBuilder result = new StringBuilder();
            URL url = new URL(urlString);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();

            conn.setConnectTimeout(5000);
            conn.setRequestMethod("GET");

            try (InputStream connInputStream = conn.getInputStream();
                 InputStreamReader ir = new InputStreamReader(connInputStream);
                 BufferedReader br = new BufferedReader(ir)){

                String line;
                while ((line = br.readLine()) != null) {
                    result.append(line).append('\n');
                }
            } catch (Exception e) {
                log.warn("Couldn't read from connection input stream",e);
            }


            PeerAddressesHolder peerAddressesHolder = gson.fromJson(result.toString(), PeerAddressesHolder.class);

            boolean fetchedNew = false;
            for (PeerAddress fetchedAddress : peerAddressesHolder.getAddresses()) {
                if (!isValidAddress(peerAddress)) {
                    log.warn("Peer has sent us a null-address. It will be ignored.");
                    return;
                }
                if (!knownNodes.contains(fetchedAddress)) {
                    knownNodes.addAddress(fetchedAddress);
                    fetchedNew = true;
                }
            }

            if (fetchedNew) {
                FileUtils.writeToFile(appConf.getKnownNodesFilePath(), gson.toJson(knownNodes), false);
            }


        } catch (SocketTimeoutException e) {
            if (appConf.getMostReliableNodes().contains(peerAddress)) {
                log.warn("Most reliable node not available: " + peerAddress);
            } else {
                inactiveNodes.addAddress(peerAddress);
                log.warn("Connection timeout from " + peerAddress + ". It will be removed.");

            }

        } catch (Exception e) {
            log.warn("Couldn't discover new addresses." + e);
        }
    }

    try {
        knownNodes.removeAll(inactiveNodes.getAddresses());
        FileUtils.writeToFile(appConf.getKnownNodesFilePath(), gson.toJson(knownNodes), false);
    } catch (IOException ioe) {
        log.warn("Couldn't write to file after deleting dead node", ioe);
    }
}

А вот как я начинаю это при создании Узла.

public NetworkNode(int port) {
    this.appConf = ApplicationConfiguration.getInstance();
    this.port = port;

    log.info("Starting a new node on port " + port);
    try {
        this.knownNodes = FileUtils.createPeerAddressesList(appConf.getKnownNodesFilePath());
    } catch (Exception e) {
        log.error("Error while trying to construct a list of peer addresses from file content on path: " + appConf.getKnownNodesFilePath());
    }

    scheduledExecutorService = Executors.newScheduledThreadPool(4);
    scheduledExecutorService.scheduleAtFixedRate(new DiscoverAddressesTask(knownNodes, this.port), 3, 4, TimeUnit.SECONDS);

Все методы, связанные с чтением / записью файлов, выполняются с использованием конструкции try-with-resources, поэтому моя первоначальная идея, что работоспособность останавливается из-за некоторых незакрытых потоков, вероятно, недопустима.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...