Я использую Spark Structured Stream для чтения сообщения от Kafka. Сообщения содержат путь к zip-файлу, содержащему JSON-файлы, я беру JSON-файлы и записываю их в HDFS с помощью следующего кода:
Сначала,Я создаю readStream для kafka.
Dataset<Row> dsRow = spark.readStream()
.format("kafka")
.options(/*KAFKA OPTIONS - Servers etc*/)
.load();
Затем я создаю writeStream для ForeachQuery
dsRow.writeStream()
.queryName("ForEachQuery")
.outputMode(OutputMode.Update())
.foreach(writer)
.start()
.awaitTermination();
И мой писатель выглядит так:
ForeachWriter<Row> writer = new ForeachWriter() {
@Override
public void process(Row arg0) {
//Here I'm extracting the ZIP content (JSON files) to HDFS.
}
@Override
public boolean open(long arg0, long arg1) { return true; }
@Override
public void close(long arg0, long arg1) { }
}
Иногда я получаю следующее исключение:
Нет аренды /user/myuser/path/filefromzip.json (индекс 1815182): файл не существует.Держатель DFSClient_NOMAPREDUCE_1932232_180 не имеет открытых файлов.
Я знаю, что это произошло, потому что я работаю параллельно, но как я могу это решить?Может это из-за обработки дубликатов?Если да, как я могу предотвратить использование метода open
?
Спасибо.