Я реализую функцию RichParallelSourceFunction, которая читает файлы через SFTP. RichParallelSourceFunction наследует cancel () от SourceFunction и close () от RichFunction (). Насколько я понимаю, и cancel (), и close () вызываются до того, как источник будет отключен. Поэтому в обоих из них я должен добавить логику для остановки бесконечного цикла, который читает файлы.
Когда я устанавливаю параллелизм источника на 1 и запускаю задание Flink из IDE, среда выполнения Flink вызывает stop () сразу после вызова start () и все задание останавливается. Я этого не ожидал.
Когда я устанавливаю параллелизм источника на 1 и запускаю задание Flink в кластере, задание выполняется как обычно.
Если я оставлю параллельность источника по умолчанию (в моем случае 4), задание будет работать как обычно.
Использование Flink 1.7.
public class SftpSource<TYPE_OF_RECORD>
extends RichParallelSourceFunction<TYPE_OF_RECORD>
{
private final SftpConnection mConnection;
private boolean mSourceIsRunning;
@Override
public void open(Configuration parameters) throws Exception
{
mConnection.open();
}
@Override
public void close()
{
mSourceIsRunning = false;
}
@Override
public void run(SourceContext<TYPE_OF_RECORD> aContext)
{
while (mSourceIsRunning)
{
synchronized ( aContext.getCheckpointLock() )
{
// use mConnection
// aContext.collect() ...
}
try
{
Thread.sleep(1000);
}
catch (InterruptedException ie)
{
mLogger.warn("Thread error: {}", ie.getMessage() );
}
}
mConnection.close();
}
@Override
public void cancel()
{
mSourceIsRunning = false;
}
}
Так что у меня есть обходные пути, и вопрос больше в теории. Почему close () вызывается, если параллелизм равен 1 и задание запускается из среды IDE (т. Е. Из командной строки)?
Кроме того, close () и cancel () делают то же самое в RichParallelSourceFunction?