Как я могу сделать внешние методы прерываемыми? - PullRequest
30 голосов
/ 28 декабря 2010

Проблема

Я запускаю несколько вызовов какого-либо внешнего метода через ExecutorService . Я хотел бы иметь возможность прерывать эти методы, но, к сожалению, они сами не проверяют флаг прерывания. Можно ли каким-либо образом заставить эти методы вызывать исключение?

Мне известно, что выбрасывание исключения из произвольного местоположения потенциально опасно , в моем конкретном случае я готов воспользоваться этим шансом и готов разобраться с последствиями.

Подробнее

Под «внешним методом» я подразумеваю некоторые методы, которые приходят из внешней библиотеки, и я не могу изменить ее код (ну, я могу, но это сделает его кошмаром обслуживания, когда будет выпущена новая версия).

Внешние методы вычислительно дороги, не связаны с вводом-выводом, поэтому они не реагируют на регулярные прерывания, и я не могу принудительно закрыть канал, сокет или что-то еще. Как я упоминал ранее, они также не проверяют флаг прерывания.

Код концептуально что-то вроде:

// my code
public void myMethod() {
    Object o = externalMethod(x);
}

// External code
public class ExternalLibrary {
    public Object externalMethod(Object) {
        innerMethod1();
        innerMethod1();
        innerMethod1();
    }

    private void innerMethod1() {
        innerMethod2();
        // computationally intensive operations
    }

    private void innerMethod2() {
        // computationally intensive operations
    }
}

Что я пробовал

Thread.stop() теоретически будет делать то, что я хочу, но не только не рекомендуется, но и доступно только для реальных потоков, в то время как я работаю с задачами исполнителя (которые могут также делиться потоками с будущие задачи, например при работе в пуле потоков). Тем не менее, если лучшего решения не будет найдено, я преобразую свой код в устаревшие потоки и использую этот метод.

Еще один вариант, который я пробовал, - пометить myMethod() и подобные методы специальной аннотацией «Interruptable», а затем использовать AspectJ (в котором я, по общему признанию, новичок) для перехвата всех вызовов методов - что-то вроде:

@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))")
public void checkInterrupt(JoinPoint thisJoinPoint) {
    if (Thread.interrupted()) throw new ForcefulInterruption();
}

Но withincode не является рекурсивным для методов, вызываемых соответствующими методами, поэтому мне придется редактировать эту аннотацию во внешнем коде.

Наконец, это похоже на мой предыдущий вопрос - хотя заметное отличие состоит в том, что теперь я имею дело с внешней библиотекой.

Ответы [ 8 ]

5 голосов
/ 29 декабря 2010

Мне приходят в голову следующие странные идеи:

  • Использование библиотеки модификации байт-кода , такой как Javassist, для введения проверок прерываний в различных точках внутри байт-кода,Просто в начале методов может быть недостаточно, поскольку вы упоминаете, что эти внешние методы не являются рекурсивными, поэтому вы можете принудительно остановить их в любой момент.Выполнение этого на уровне байтового кода также сделало бы его очень отзывчивым, например, даже если внешний код выполняется внутри цикла или что-то еще, можно было бы ввести проверки прерываний.Однако это приведет к дополнительным издержкам, поэтому общая производительность будет ниже.
  • Запуск отдельных процессов (например, отдельных виртуальных машин) для внешнего кода.Прерывание процессов может быть намного проще для кода, чем другие решения.Недостатком является то, что вам потребуется какой-то канал связи между внешним кодом и вашим кодом, например, IPC, сокеты и т. Д. Вторым недостатком является то, что вам нужно гораздо больше ресурсов (ЦП, память) для запуска новых виртуальных машин, и это может бытьдля конкретной среды.Это будет работать, если вы запустите пару задач, используя внешний код, но не сотни задач.Кроме того, производительность пострадает, но само вычисление будет так же быстро, как оригинал.Процессы могут быть принудительно остановлены с помощью java.lang.Process.destroy ()
  • С помощью специального SecurityManager , который выполняет проверку прерываний для каждого из методов checkXXX.Если внешний код каким-то образом вызывает привилегированные методы, вам может быть достаточно прервать работу в этих местах.Примером может служить java.lang.SecurityManager.checkPropertyAccess (String), если внешний код периодически читает системное свойство.
4 голосов
/ 29 декабря 2010

Это решение не простое, но оно может работать: используя Javassist или CGLIB, вы можете вставить код в начало каждого внутреннего метода (те, которые, вероятно, вызываются методом main run ()), чтобы проверить, Поток жив или какой-то другой флаг (если это какой-то другой флаг, вам также нужно добавить его вместе с методом для его установки).

Я предлагаю Javassist / CGLIB вместо расширения класса с помощью кода, потому что вы упоминаете, что он внешний, и вы не хотите изменять исходный код, и он может измениться в будущем. Поэтому добавление проверок прерываний во время выполнения будет работать для текущей версии, а также для будущих версий, даже если изменятся внутренние имена методов (или их параметры, возвращаемые значения и т. Д.). Вам просто нужно взять класс и добавить проверки прерываний в начале каждого метода, который не является методом run ().

2 голосов
/ 09 ноября 2012

Вы писали:

Другой вариант, который я пробовал, - пометить myMethod() и аналогичные методы специальной аннотацией "Interruptable", а затем использовать AspectJ (в которой я, по общему признанию, новичок)для перехвата всех вызовов методов там - что-то вроде:

@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))")
public void checkInterrupt(JoinPoint thisJoinPoint) {
    if (Thread.interrupted()) throw new ForcefulInterruption();
}

Но withincode не является рекурсивным для методов, вызываемых соответствующими методами, поэтому мне придется отредактировать эту аннотацию во внешнем коде.

Идея AspectJ хороша, но вам нужно

  • использовать cflow() или cflowbelow() для рекурсивного соответствия определенному потоку управления (например, что-то вроде @Before("cflow(execution(@Interruptable * *(..)))")).
  • Убедитесь, что вы также создаете свою внешнюю библиотеку, а не только свой собственный код.Это можно сделать, используя двоичное переплетение, инструментируя классы файла JAR и переупаковывая их в новый файл JAR, или применяя LTW (переплетение времени загрузки) во время запуска приложения (т.е. во время загрузки класса).

Возможно, вам даже не понадобится аннотация маркера, если ваша внешняя библиотека имеет имя пакета, которое вы можете указать within().AspectJ действительно мощный, и часто есть несколько способов решить проблему.Я бы порекомендовал использовать его, потому что он был сделан для таких усилий, как ваш.

2 голосов
/ 05 апреля 2011

Возможен вариант:

  1. Сделать виртуальную машину подключенной к себе с помощью JDI.
  2. Найдите поток, в котором выполняется ваша задача.Это не тривиально, но, поскольку у вас есть доступ ко всем фреймам стека, это, безусловно, выполнимо.(Если вы поместите уникальное поле id в объекты задачи, вы сможете определить поток, который его выполняет.)
  3. Остановить поток асинхронно.

Хотя я не думаю, что остановленный поток может серьезно помешать исполнителям (в конце концов, они должны быть отказоустойчивыми), существует альтернативное решение, которое не предусматривает остановку потока.

При условии, что ваши задачи ничего не изменяют в других частях системы (что вполне справедливо, иначе вы бы не пытались их сбить), вы можете использовать JDI для удаления ненужных кадров стека ивыйдите из задачи в обычном режиме.

public class StoppableTask implements Runnable {

private boolean stopped;
private Runnable targetTask;
private volatile Thread runner;
private String id;

public StoppableTask(TestTask targetTask) {
    this.targetTask = targetTask;
    this.id = UUID.randomUUID().toString();
}

@Override
public void run() {
    if( !stopped ) {
        runner = Thread.currentThread();
        targetTask.run();
    } else {
        System.out.println( "Task "+id+" stopped.");
    }
}

public Thread getRunner() {
    return runner;
}

public String getId() {
    return id;
}
}

Это исполняемый файл, который оборачивает все ваши остальные выполняемые объекты.Он хранит ссылку на исполняющий поток (будет важно позже) и идентификатор, чтобы мы могли найти его с помощью вызова JDI.

public class Main {

public static void main(String[] args) throws IOException, IllegalConnectorArgumentsException, InterruptedException, IncompatibleThreadStateException, InvalidTypeException, ClassNotLoadedException {
    //connect to the virtual machine
    VirtualMachineManager manager = Bootstrap.virtualMachineManager();
    VirtualMachine vm = null;
    for( AttachingConnector con : manager.attachingConnectors() ) {
        if( con instanceof SocketAttachingConnector ) {
            SocketAttachingConnector smac = (SocketAttachingConnector)con;
            Map<String,? extends Connector.Argument> arg = smac.defaultArguments();
            arg.get( "port" ).setValue( "8000");
            arg.get( "hostname" ).setValue( "localhost" );
            vm = smac.attach( arg );
        }
    }

    //start the test task
    ExecutorService service = Executors.newCachedThreadPool();
    StoppableTask task = new StoppableTask( new TestTask() );
    service.execute( task );
    Thread.sleep( 1000 );

    // iterate over all the threads
    for( ThreadReference thread : vm.allThreads() ) {
        //iterate over all the objects referencing the thread
        //could take a long time, limiting the number of referring
        //objects scanned is possible though, as not many objects will
        //reference our runner thread
        for( ObjectReference ob : thread.referringObjects( 0 ) ) {
            //this cast is safe, as no primitive values can reference a thread
            ReferenceType obType = (ReferenceType)ob.type();
            //if thread is referenced by a stoppable task
            if( obType.name().equals( StoppableTask.class.getName() ) ) {

                StringReference taskId = (StringReference)ob.getValue( obType.fieldByName( "id" ));

                if( task.getId().equals( taskId.value() ) ) {
                    //task with matching id found
                    System.out.println( "Task "+task.getId()+" found.");

                    //suspend thread
                    thread.suspend();

                    Iterator<StackFrame> it = thread.frames().iterator();
                    while( it.hasNext() ) {
                        StackFrame frame = it.next();
                        //find stack frame containing StoppableTask.run()
                        if( ob.equals( frame.thisObject() ) ) {
                            //pop all frames up to the frame below run()
                            thread.popFrames( it.next() );
                            //set stopped to true
                            ob.setValue( obType.fieldByName( "stopped") , vm.mirrorOf( true ) );
                            break;
                        }
                    }
                    //resume thread
                    thread.resume();

                }

            }
        }
    }

}
}

И для справки, вызов "library", с которым я тестировал:

public class TestTask implements Runnable {

    @Override
    public void run() {
        long l = 0;
        while( true ) {
            l++;
            if( l % 1000000L == 0 )
                System.out.print( ".");
        }

    }
}

Вы можете попробовать его, запустив класс Main с параметром командной строки -agentlib:jdwp=transport=dt_socket,server=y,address=localhost:8000,timeout=5000,suspend=n.Работает с двумя оговорками.Во-первых, если выполняется нативный код (thisObject кадра - ноль), вам придется подождать, пока он не закончится.Во-вторых, блоки finally не вызываются, поэтому различные ресурсы могут быть утечки.

1 голос
/ 09 апреля 2011

Я взломал уродливое решение моей проблемы. Это не красиво, но в моем случае это работает, поэтому я публикую это здесь на случай, если это кому-нибудь еще поможет.

То, что я сделал, было профиль библиотечные части моего приложения, надеясь, что я смогу выделить небольшую группу методов, которые вызываются неоднократно - например, некоторые get методы или equals() или что-то подобное эти строки; и тогда я мог бы вставить следующий сегмент кода туда:

if (Thread.interrupted()) {
    // Not really necessary, but could help if the library does check it itself in some other place:
    Thread.currentThread().interrupt();
    // Wrapping the checked InterruptedException because the signature doesn't declare it:
    throw new RuntimeException(new InterruptedException());
}

Либо вставьте его вручную, отредактировав код библиотеки, либо автоматически, написав соответствующий аспект. Обратите внимание, что если библиотека пытается поймать и проглотить RuntimeException, выброшенное исключение можно заменить чем-то другим, что библиотека не пытается перехватить.

К счастью для меня, используя VisualVM , я смог найти одиночный метод, вызываемый очень много раз при конкретном использовании библиотеки. После добавления указанного выше сегмента кода он теперь корректно реагирует на прерывания.

Это, конечно, не поддерживаемо, плюс ничто действительно не гарантирует, что библиотека будет неоднократно вызывать этот метод в других сценариях; но это сработало для меня, и поскольку относительно легко профилировать другие приложения и вставлять туда проверки, я считаю это общим, хотя и некрасивым решением.

0 голосов
/ 07 ноября 2012

Насколько мне известно, есть два способа использования аспекта

  • AspecJ
  • Spring AOP

AspectJ - это настраиваемый компилятор, который перехватывает методы, которыекомпилируется (что означает, что он не может получить «внешние методы». Spring AOP (по умолчанию) использует проксирование классов для перехвата методов во время выполнения (поэтому он может перехватывать «внешние методы»), но проблема в Spring AOP заключается в том, что он не может использовать проксиуже прокси-классы (которые AspectJ может делать, потому что он не поддерживает прокси-классы). Я думаю, что AspectJ мог бы помочь вам в этом случае.

0 голосов
/ 05 апреля 2011

Как сказал mhaller , лучший вариант - запустить новый процесс.Поскольку ваши задания не являются такими совместными, у вас никогда не будет гарантий на завершение потока.

Хорошим решением вашей проблемы будет использование библиотеки, которая поддерживает произвольную паузу / остановку 'lightweight threads 'например Akka вместо службы исполнителя, хотя это может быть немного излишним.

Хотя Я никогда не использовал Akka и не может подтвердить, что он работает, как вы ожидаете, в документации указано, что есть stop () метод для остановки актеров.

0 голосов
/ 28 декабря 2010

Если внутренние методы имеют похожие имена, вы можете использовать определение pointcut в xml (spring / AspectJ) вместо аннотаций, поэтому модификация кода внешней библиотеки не требуется.

...