Junit не может удалить @TempDir с файлом, созданным Spark Structured Streaming - PullRequest
5 голосов
/ 24 мая 2019

Я создал интеграционный тест для своего конвейера, чтобы проверить, генерируется ли правильный CSV-файл:

class CsvBatchSinkTest {

    @RegisterExtension
    static SparkExtension spark = new SparkExtension();

    @TempDir
    static Path directory;

    //this checks if the file is already available
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        return Files.walk(directory.toPath()).anyMatch(f -> f.toString().endsWith(suffix));
    }

    //this gets content of file
    static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
        return Files.readAllLines(
                Files.walk(file.toPath())
                        .filter(f -> f.toString().endsWith(suffix))
                        .findFirst()
                        .orElseThrow(AssertionException::new));
    }

    @Test
    @DisplayName("When correct dataset is sent to sink, then correct csv file should be generated.")
    void testWrite() throws IOException, InterruptedException {

        File file = new File(directory.toFile(), "output");


        List<Row> data =
                asList(RowFactory.create("value1", "value2"), RowFactory.create("value3", "value4"));

        Dataset<Row> dataset =
                spark.session().createDataFrame(data, CommonTestSchemas.SCHEMA_2_STRING_FIELDS);

         dataset.coalesce(1)
                .write()
                .option("header", "true")
                .option("delimiter", ";")
                .csv(file.getAbsolutePath());

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> isFileWithSuffixAvailable(file, ".csv"));

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(
                        () ->
                                assertThat(extractFileWithSuffixContent(file, ".csv"))
                                        .containsExactlyInAnyOrder("field1;field2", "value1;value2", "value3;value4"));
    }
}

Реальный код выглядит немного иначе, это просто воспроизводимый пример.

Расширение Spark просто запускает локальное искрение перед каждым тестом, а закрытие - после.

Тест проходит, но затем, когда junit пытается очистить @TempDir, выдается следующее исключение:

Не удалось удалить временный каталог C: \ Users \ RK03GJ \ AppData \ Local \ Temp \ junit596680345801656194.Следующие пути не могут быть удалены

enter image description here

Можно ли как-то исправить эту ошибку?Я пытался дождаться, пока искра перестанет использовать awaility, но мне это не помогло.

Может быть, я могу как-то проигнорировать эту ошибку?

1 Ответ

2 голосов
/ 24 мая 2019

Быстрое предположение: вам нужно закрыть поток, возвращаемый Files.walk . Цитата из документов:

Если требуется своевременное удаление ресурсов файловой системы, следует использовать конструкцию try-with-resources, чтобы гарантировать, что метод потока close вызывается после завершения потоковых операций.

- https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#walk-java.nio.file.Path-java.nio.file.FileVisitOption...-

...