Flink RichParallelSourceFunction - закрыть () против отмены () - PullRequest
0 голосов
/ 26 июня 2019

Я реализую функцию RichParallelSourceFunction, которая читает файлы через SFTP. RichParallelSourceFunction наследует cancel () от SourceFunction и close () от RichFunction (). Насколько я понимаю, и cancel (), и close () вызываются до того, как источник будет отключен. Поэтому в обоих из них я должен добавить логику для остановки бесконечного цикла, который читает файлы.

Когда я устанавливаю параллелизм источника на 1 и запускаю задание Flink из IDE, среда выполнения Flink вызывает stop () сразу после вызова start () и все задание останавливается. Я этого не ожидал.

Когда я устанавливаю параллелизм источника на 1 и запускаю задание Flink в кластере, задание выполняется как обычно. Если я оставлю параллельность источника по умолчанию (в моем случае 4), задание будет работать как обычно.

Использование Flink 1.7.


public class SftpSource<TYPE_OF_RECORD>
    extends RichParallelSourceFunction<TYPE_OF_RECORD>
{
    private final SftpConnection mConnection;
    private boolean mSourceIsRunning;

    @Override 
    public void open(Configuration parameters) throws Exception
    {
        mConnection.open();
    }

    @Override 
    public void close()
    {
        mSourceIsRunning = false;
    }


    @Override
    public void run(SourceContext<TYPE_OF_RECORD> aContext)
    {
        while (mSourceIsRunning)
        {
            synchronized ( aContext.getCheckpointLock() )
            {
                // use mConnection
                // aContext.collect() ...
            }

            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException ie)
            {
                mLogger.warn("Thread error: {}", ie.getMessage() );
            }
        }

        mConnection.close();
    }


    @Override
    public void cancel()
    {
        mSourceIsRunning = false;
    }
}

Так что у меня есть обходные пути, и вопрос больше в теории. Почему close () вызывается, если параллелизм равен 1 и задание запускается из среды IDE (т. Е. Из командной строки)? Кроме того, close () и cancel () делают то же самое в RichParallelSourceFunction?

Ответы [ 3 ]

1 голос
/ 27 июня 2019

Почему close () вызывается, если параллелизм равен 1 и задание запускается из IDE.

close вызывается после последнего вызова основных методов работы (например, map или join). Этот метод может быть использован для очистки работы. Он будет вызываться независимо от числа, определенного в параллелизме.

Кроме того, close () и cancel () делают то же самое в RichParallelSourceFunction?

Это не одно и то же, взгляните на то, как это описано.

Cancels the source. Most sources will have a while loop inside the run(SourceContext) method. The implementation needs to ensure that the source will break out of that loop after this method is called.

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html#cancel--

Следующая ссылка может помочь вам понять жизненный цикл задачи: https://ci.apache.org/projects/flink/flink-docs-stable/internals/task_lifecycle.html#operator-lifecycle-in-a-nutshell

1 голос
/ 27 июня 2019

Я думаю, что javadocs более чем самоочевидны:

Gracefully Stopping Functions
Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction} interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the state and the emitted elements in a consistent state.

- SourceFunction.cancel

Cancels the source. Most sources will have a while loop inside the run(SourceContext) method. The implementation needs to ensure that the source will break out of that loop after this method is called.
A typical pattern is to have an "volatile boolean isRunning" flag that is set to false in this method. That flag is checked in the loop condition.

When a source is canceled, the executing thread will also be interrupted (via Thread.interrupt()). The interruption happens strictly after this method has been called, so any interruption handler can rely on the fact that this method has completed. It is good practice to make any flags altered by this method "volatile", in order to guarantee the visibility of the effects of this method to any interruption handler.

- SourceContext.close

This method is called by the system to shut down the context.

Примечание , вы можете отменить SourceFunction, но остановить SourceContext

0 голосов
/ 01 июля 2019

Я нашел ошибку в моем коде. Вот исправление

public void open(Configuration parameters) throws Exception
{
    mConnection.open();
    mSourceIsRunning = true;
}

Теперь close () не вызывается, пока я не решу остановить рабочий процесс, в этом случае сначала вызывается cancel (), а затем close (). Мне все еще интересно, как параллелизм повлиял на поведение.

...