моя искра работы (java искры) убивается при запуске на кластере, но она отлично работает на локальном - PullRequest
0 голосов
/ 06 июля 2019

Я попытался запустить свой Java-код для искры на кластере пряжи, чтобы проверить эффективность, как на локальном уровне, при обработке больших данных это было медленнее, но после создания рабочего процесса и создания банок вышеупомянутого кода при отправке задания через некоторое времямой спаркджоб убивается, но когда я запускаю jar на локальном компьютере, он работает без ошибок.

Я использовал немного простого Java-метода в коде, где я беру два параметра как два пути и один из путивызывает ошибку в hdfs, показывая, что путь не найден (java.nio.file.NoSuchFileException)

ниже - это место, где происходит исключение

f.repartition(1).write()
.mode(SaveMode.Overwrite)
.format("com.databricks.spark.csv")
         .option("header", "true")
         .option("delimiter", "|")
         .save("E:\\data\\Dump\\praga\\showtest");


String inputFile ="E:\\data\\Dump\\praga\\showtest", outFile="E:\\data\\Dump\\praga\\showtest\\showout2.csv";


inputFile+="\\part-00000";

call(inputFile, outFile);

вышеупомянутый метод чистоjava-код и не содержит никаких методов spark, вызывает ли это выполнение задания?

метод ниже

public static void call(String a, String b) {

    ArrayList <Do> result = new ArrayList <Do> ();
    try {
        BufferedReader br = new BufferedReader(new FileReader(a));
        String line = "";
        Boolean setDate = false;
        while ((line = br.readLine()) != null) {
            Do do1 = new Do();

            System.err.println(line);
            java.lang.String[] lineArray = line.split("\\|");
            int index = 0;
            setDate = false;
            String openDate = null,
            closedDate = null;
            while (index < lineArray.length) {
                //System.out.println("processing data "+index);
                if (index == 0) {
                    do1.setTelephone(lineArray[0]);
                    do1.setOpen(lineArray[1]);
                    do1.setUpdate(lineArray[1]);
                    List < Do > do2 = result.stream().filter(customer - >lineArray[0].equals(customer.getTelephone())).collect(Collectors.toList());
                    if (!do2.isEmpty()) {
                        int indexOf = do2.size() - 1;
                        int inc = 0;
                        for (Do item: do2) {

                            if (indexOf == inc) {
                                ////
                                some conditions and getter setter methods using pojo classes

                            }
                            //System.out.println(do1);
                            if (!setDate) {
                                result.add(do1);
                            }
                        }
                        System.err.println("result size" + result.size());
                        System.out.println("result " + result);
                        br.close();

                        FileWriter writer;
                        //System.err.println("list "+result.size());
                        writer = new FileWriter(b); //here the output path is writted

                        //True = Append to file, false = Overwrite
                        //remove the header column from the list
                        result.remove(0);
                        //
                        // // Write CSV
                        String[] lineArray = headerColumn.split("\\|");
                        int index = 0;
                        while (index < lineArray.length) {
                            writer.write(lineArray[index]);
                            writer.write("|");
                            index++;
                        }
                        writer.write("\r\n");

                        for (Do do1: result) {
                            //for all columns conditions checked and thwy are wriiten as csv file format in the outpath
                        }

                        writer.write("\r\n");

                    }

                    System.out.println("Write success!");
                    System.err.println("result size" + result.size());
                    writer.close();

при запуске вышеуказанного кода в локальном файле CSV успешно записывается впуть к выходному файлу и код выполняются успешно

, но при запуске на кластере его убивают за hв браузере и показывает, что исключение пути не найдено с указанием пути вывода

до тех пор, пока эта часть кода не будет успешно выполнена в кластере пряжи оттенка браузера

f.repartition(1).write()
 .mode(SaveMode.Overwrite)
 .format("com.databricks.spark.csv")
          .option("header", "true")
          .option("delimiter", "|")
          .save("E:\\data\\Dump\\praga\\showtest");

в hdf, папка showtest будет создана и частичноФайлы 00000 также успешно создаются, но после того, как эта часть становится исключением, есть ли способ это исправить?

...