У меня есть конвейер pyspark, который должен экспортировать таблицу как файл csv в hdfs и на сервер sftp (данные будут взяты командой crm после).
Для экспорта в HDFS это очень образец, и он работает как шарм, но для экспорта данных в файл sftp я сделал следующее:
def export_to_sftp():
dataframe.coalesce(1).options(codec=compression).write.mode("overwrite").option('encoding',encoding).csv(file_to_hdfs, header=True, nullValue='', sep=';')
copyToLocalFile(file_to_hdfs,local_machine_file) # copy from HDFS to LOCAL using hadoop API
cnopts = pysftp.CnOpts()
hostkeys = None
if cnopts.hostkeys.lookup(server) is not None:
hostkeys = cnopts.hostkeys
cnopts.hostkeys = None
try:
with pysftp.Connection(host=server, username=login,
password=password, cnopts=cnopts) as sftp:
if hostkeys is not None:
hostkeys.add(server, sftp.remote_server_key.get_name(), sftp.remote_server_key)
hostkeys.save(pysftp.helpers.known_hosts())
sftp.put(local_machine_file, sftp_path)
except Exception as e:
log.exception(e)
finally:
log.info("Cleaning up)
shutil.rmtree(local_tmp)
Этот метод отлично работает, когда файлы не слишком большие, но для некоторых таблиц он не работает, как в моем локальном linux машина У меня недостаточно места на диске,
Так можно ли использовать pysftp для копирования удаленного файла из hdfs в sftp в потоковом методе без копирования на локальный компьютер?