Iterate over different columns using withColumn in Java Spark - PullRequest
2 votes
/ 28 January 2020

I have to modify a Dataset<Row> according to some rules specified in a List<Row>. I want to iterate over the columns of the Dataset<Row> using Dataset.withColumn(...), as shown in the following example:

// import necessary libraries, e.g.:
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.when;

SparkSession spark = SparkSession
                .builder()
                .appName("appname")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

Dataset<Row> dfToModify = spark.read().table("TableToModify");

List<Row> ListWithInfo = new ArrayList<>();

ListWithInfo.add(0,RowFactory.create("field1", "input1", "output1", "conditionAux1"));
ListWithInfo.add(1,RowFactory.create("field1", "input1", "output1", "conditionAux2"));
ListWithInfo.add(2,RowFactory.create("field1", "input2", "output3", "conditionAux3"));
ListWithInfo.add(3,RowFactory.create("field2", "input3", "output4", "conditionAux4"));
.
.
.

// apply each rule: where `field` equals `input` and conditionAuxField equals `conditionAux`,
// replace the value with `output`; otherwise keep the original value
for (Row row : ListWithInfo) {

            String field = row.getString(0);
            String input = row.getString(1);
            String output = row.getString(2);
            String conditionAux = row.getString(3);

            dfToModify = dfToModify.withColumn(field,
                                    when(dfToModify.col(field).equalTo(input)
                                    .and(dfToModify.col("conditionAuxField").equalTo(conditionAux))
                                    ,output)
                                    .otherwise(dfToModify.col(field)));

        }

The code works as expected, but when there are many entries in the List, the program does not finish, and this output keeps appearing on screen:

20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1653
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1650
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1635
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1641
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1645
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1646
20/01/27 17:48:18 INFO storage.BlockManagerInfo: Removed broadcast_113_piece0 on **************** in memory (size: 14.5 KB, free: 3.0 GB)
20/01/27 17:48:18 INFO storage.BlockManagerInfo: Removed broadcast_113_piece0 on ***************** in memory (size: 14.5 KB, free: 3.0 GB)
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1639
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1649
20/01/27 17:48:18 INFO spark.ContextCleaner: Cleaned accumulator 1651
20/01/27 17:49:18 INFO spark.ExecutorAllocationManager: Request to remove executorIds: 6
20/01/27 17:49:18 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 6
20/01/27 17:49:18 INFO cluster.YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 6
20/01/27 17:49:18 INFO spark.ExecutorAllocationManager: Removing executor 6 because it has been idle for 60 seconds (new desired total will be 0)
20/01/27 17:49:19 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:19 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 6.
20/01/27 17:49:19 INFO scheduler.DAGScheduler: Executor lost: 6 (epoch 0)
20/01/27 17:49:19 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:19 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 6 from BlockManagerMaster.
20/01/27 17:49:19 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(6, *********************, 43387, None)
20/01/27 17:49:19 INFO storage.BlockManagerMaster: Removed 6 successfully in removeExecutor
20/01/27 17:49:19 INFO cluster.YarnScheduler: Executor 6 on **************** killed by driver.
20/01/27 17:49:19 INFO spark.ExecutorAllocationManager: Existing executor 6 has been removed (new total is 0)
20/01/27 17:49:20 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:21 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
20/01/27 17:49:22 INFO yarn.SparkRackResolver: Got an error when resolving hostNames. Falling back to /default-rack for all
.
.
.
.

Is there any way to make this more efficient in Java Spark (without using a for loop or anything similar)?

1 Answer

0 votes
/ 3 February 2020

In the end I used the withColumns method of the Dataset<Row> object. This method takes two arguments:

.withColumns(Seq<String> ColumnsNames, Seq<Column> ColumnsValues);

And the column names in the Seq<String> must not contain duplicates.
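For illustration, a minimal call from Java might look like the sketch below (the DataFrame df and the column names col1/col2 are hypothetical; the Java lists are converted to Scala Seq with JavaConversions, just as in the full code further down):

List<String> names = Arrays.asList("col1", "col2");   // names must be distinct
List<Column> values = Arrays.asList(
        when(df.col("col1").equalTo("a"), "x").otherwise(df.col("col1")),
        when(df.col("col2").equalTo("b"), "y").otherwise(df.col("col2")));

Dataset<Row> result = df.withColumns(
        JavaConversions.asScalaBuffer(names).seq(),
        JavaConversions.asScalaBuffer(values).seq());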

The complete code looks like this:


SparkSession spark = SparkSession
                .builder()
                .appName("appname")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

Dataset<Row> dfToModify = spark.read().table("TableToModify");

List<Row> ListWithInfo = new ArrayList<>();

ListWithInfo.add(0,RowFactory.create("field1", "input1", "output1", "conditionAux1"));
ListWithInfo.add(1,RowFactory.create("field1", "input1", "output1", "conditionAux2"));
ListWithInfo.add(2,RowFactory.create("field1", "input2", "output3", "conditionAux3"));
ListWithInfo.add(3,RowFactory.create("field2", "input3", "output4", "conditionAux4"));
.
.
.
// initialize values for fields and conditions
String field_ant = ListWithInfo.get(0).getString(0).toLowerCase();
String first_input = ListWithInfo.get(0).getString(1);
String first_output = ListWithInfo.get(0).getString(2);
String first_conditionAux = ListWithInfo.get(0).getString(3);
Column whenColumn = when(dfToModify.col(field_ant).equalTo(first_input)
                .and(dfToModify.col("conditionAuxField").equalTo(lit(first_conditionAux)))
                ,first_output);

// lists with the names of the fields and the conditions        
List<Column> whenColumnList = new ArrayList<>();
List<String> fieldsNameList = new ArrayList<>();

for (Row row : ListWithInfo.subList(1,ListWithInfo.size())) {

            String field = row.getString(0);
            String input = row.getString(1);
            String output = row.getString(2);
            String conditionAux = row.getString(3);

           if (field.equals(field_ant)) {
                // if field equals field_ant, the new condition is added to the previous one
                whenColumn = whenColumn.when(dfToModify.col(field).equalTo(input)
                        .and(dfToModify.col("conditionAuxField").equalTo(lit(conditionAux)))
                        ,output);
            } else {
                // if field is different from the previous one:
                // close the conditions for this field
                whenColumn = whenColumn.otherwise(dfToModify.col(field_ant));

                // add to the lists the field(String) and the conditions (columns)
                whenColumnList.add(whenColumn);
                fieldsNameList.add(field_ant);

                // and initialize the conditions for the new field
                whenColumn = when(dfToModify.col(field).equalTo(input)
                                .and(dfToModify.col("conditionAuxField").equalTo(lit(conditionAux)))
                        ,output);
            }

            field_ant = field;

        }

// add last values
whenColumnList.add(whenColumn);
fieldsNameList.add(field_ant);

// transform list to Seq
Seq<Column> whenColumnSeq = JavaConversions.asScalaBuffer(whenColumnList).seq();
Seq<String> fieldsNameSeq = JavaConversions.asScalaBuffer(fieldsNameList).seq();

Dataset<Row>  dfModified = dfToModify.withColumns(fieldsNameSeq, whenColumnSeq);

...
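To verify the combined transformation and actually run the job, something like this could be appended at the end (the output table name is hypothetical):

// print the query plan of the combined transformation
dfModified.explain();

// trigger execution, e.g. by writing the result out (table name is hypothetical)
dfModified.write().mode("overwrite").saveAsTable("TableToModify_output");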