Не удалось остановить службу Hadoop IPC - PullRequest
0 голосов
/ 05 ноября 2011

Я использую Hadoop IPC для создания службы генерации последовательных номеров, но не могу остановить сервер при выходе из программы. Кто-нибудь может мне помочь?

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

import dk.aau.cs.cloudetl.common.CEConstants;
import dk.aau.cs.cloudetl.hadoop.fs.FSUtil;

public class SequenceServer extends Thread implements  ClientProtocol {

    Map<String, Integer> seqMap = new HashMap<String, Integer>();
    Configuration conf;
    Server server;
    Path seqFile;
    volatile private boolean running = true;        

    public SequenceServer(Configuration conf) {
        try {
            this.conf = conf;
            this.seqFile = new Path(CEConstants.META_DIR + Path.SEPARATOR
                    + "cloudETL.seq");

            InetAddress addr = InetAddress.getLocalHost();
            server = RPC.getServer(this, addr.getHostName(),CEConstants.SEQ_SERVER_PORT, 5, true, conf);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            readSeqsFromHDFS();
            server.start();


            System.out.println("=============Start==============");
            while(running){
                sleep(5000);
            }
            System.out.println("=============END==============");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }



    private void readSeqsFromHDFS() {
        try {
            FileSystem fs = FileSystem.getLocal(conf);
            if (fs.exists(seqFile)) {
                SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                        seqFile, conf);
                Text key = new Text();
                IntWritable value = new IntWritable();
                while (reader.next(key, value)) {
                    String name = key.toString();
                    int seq = value.get();
                    seqMap.put(name, seq);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void writeSeqsToHDFS() {
        try {
            FileSystem fs = FileSystem.getLocal(conf);
            Path tmp = new Path(seqFile.getParent() + Path.SEPARATOR
                    + "tmp.seq");
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                    tmp, Text.class, IntWritable.class);
            for (Entry<String, Integer> entry : seqMap.entrySet()) {
                String name = entry.getKey();
                int seq = entry.getValue();
                writer.append(new Text(name), new IntWritable(seq));
            }
            writer.close();

            FSUtil.replaceFile(new File(tmp.toString()),
                    new File(seqFile.toString()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    synchronized public void stopServer() {
        try {
            System.out.println(server.getNumOpenConnections() );
            server.stop();

            writeSeqsToHDFS();
            running = false;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return versionID;
    }

    @Override
    synchronized public IntWritable nextSeq(Text name) {
        String seqName = name.toString();
        if (!seqMap.containsKey(seqName)) {
            seqMap.put(seqName, new Integer(CEConstants.SEQ_INCR_DELTA));
            return new IntWritable(0);
        } else {
            int ret = seqMap.get(seqName);
            seqMap.put(seqName, ret + CEConstants.SEQ_INCR_DELTA);
            return new IntWritable(ret);
        }
    }

    public static void main(String[] args) {
        SequenceServer server = new SequenceServer(new Configuration());
        server.start();

        server.stopServer();
    }
}

У меня есть другая клиентская программа для получения уникального номера. Я не буду публиковать здесь.


Спасибо за ваш ответ. Я знаю проблему, как вы сказали. Тем не менее, моя текущая проблема не в состоянии остановить сервер RPC. Поскольку RPC-сервер работает в режиме демона, даже если я запускаю stop (), он все равно не может выйти. Вы можете попробовать это:

import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class Test {
    Server server;

    public Test() {
        try {
            InetAddress addr = InetAddress.getLocalHost();
            server = RPC.getServer(this, addr.getHostName(),16000, 5, true, new Configuration());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start(){
        server.start();
    }

    public void stop(){
        server.stop();
    }

    public static void main(String[] args) {
        Test test = new Test();
        test.start();
        test.stop();
    }
}

Спасибо! Но это все равно не работает. Не могли бы вы попробовать мой пример. Вы просто копируете и сохраняете как Test.java, затем запускаете его. Вы увидите, что он не может выйти из основного потока.

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

interface MyProtocal extends org.apache.hadoop.ipc.VersionedProtocol {
    public static final long versionID = 1L;

    IntWritable nextSeq(Text name);
}

public class Test implements MyProtocal {
    Server server;

    public Test() {
        try {
            InetAddress addr = InetAddress.getLocalHost();
            server = RPC.getServer(this, addr.getHostName(), 16000, 5, true,
                    new Configuration());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        server.start();
    }

    public void stop() {
        server.stop();
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return versionID;
    }

    @Override
    public IntWritable nextSeq(Text name) {
        return new IntWritable(999);
    }

    static class SEQ {
        MyProtocal client;

        public SEQ() {
            InetAddress addr;
            try {
                addr = InetAddress.getLocalHost();
                client = (MyProtocal) RPC.waitForProxy(MyProtocal.class,
                        MyProtocal.versionID,
                        new InetSocketAddress(addr.getHostName(), 16000),
                        new Configuration());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void print() {
            System.out.println(client.nextSeq(new Text("aa")).get());
        }

        public void stop() {
            RPC.stopProxy(client);
        }
    }

    public static void main(String[] args) {
        Test server = new Test();
        server.start();

        SEQ seq = new SEQ();
        seq.print();
        seq.stop();
        server.stop();
    }
}

1 Ответ

0 голосов
/ 06 ноября 2011

Ваш дизайн сломан. Зачем вам нужно выполнять вашу программу в отдельном потоке?

Он уже работает в основном потоке, а сервер RPC также работает в отдельном потоке.

Мое предложение будет удалить ваши собственные потоки и просто вызвать метод run без цикла while() и остановить сервер после него.

Общее примечание : внедрить Runnable вместо расширения от Thread

Если вам нужно придерживаться вашего бесполезного потока, тогда вызовите ваш основной метод server.join() и вызовите stopServer () с конца вашего run метода.

...