Не умеет итератор через javaRDD - PullRequest
0 голосов
/ 26 марта 2019

Я пытаюсь перебрать RDD и применить некоторую логику к каждой строке и отправить ее в API.

Но СДР не входит в цикл while.

if (dataFrame.toJSON().toJavaRDD().take(1).size() > 0) {

    System.out.println("jsonString:#######");

    // System.out.println(dataFrame.toJSON().toJavaRDD().take(1));

    dataFrame.toJSON().toJavaRDD().foreachPartition(new VoidFunction<Iterator<String>>() {
      private static final long serialVersionUID = 1L;               
   @Override
    public void call(Iterator < String > jsonString) throws Exception {
      System.out.println("#######");

      while (jsonString.hasNext()) {
        final String str = jsonString.next();
        if (str != null && !str.equals("")) {

          System.out.println("jsonString:" + jsonString);


        }

      }

    }
  });
}

Ответы [ 2 ]

2 голосов
/ 27 марта 2019

, если это поможет, вот программа, которую я использовал для проверки вашего случая.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StackOverflow20190326_2 {
    public static void main(String args[]) {
        SparkSession spark = SparkSession.builder().appName("StackOverflow20190326").master("local").getOrCreate();

        // generate a dummy 2-liner dataset
        Dataset<Row> ds = spark.sql("select 1 as idx, 'this is line 1' as value union select 2 as idx, 'This is the second line' as value");

        test(ds);

        spark.stop();

    }

    private static void test(Dataset<Row> dataFrame) {

        JavaRDD<String> javaRDD = dataFrame.toJSON().toJavaRDD();
        if (javaRDD.take(1).size() > 0) {

            System.out.println("jsonString:#######");

            javaRDD.foreachPartition(jsonString -> {
                System.out.println("#######" + jsonString);

                while (jsonString.hasNext()) {
                    final String str = jsonString.next();
                    if (str != null && !str.equals("")) {

                        System.out.println("jsonString:" + str);

                    }

                }

            });
        }
    }
}

Вывод выглядит следующим образом:


    jsonString:#######
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(empty iterator)
    #######IteratorWrapper(non-empty iterator)
    jsonString:{"idx":1,"value":"this is line 1"}
    #######IteratorWrapper(non-empty iterator)
    jsonString:{"idx":2,"value":"This is the second line"}

Как видите, существует много пустых разделов, но две исходные строки хорошо выводятся.

Я использую spark 2.4, как вы можете видеть из pom.xml maven:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.borgoltz.test</groupId>
    <artifactId>spark-client</artifactId>
    <version>0.0.1-SNAPSHOT</version>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-parent_2.12</artifactId>
        <version>2.4.0</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>

</project>

Последнее, но не менее важное

Работаете ли вы в локальном режиме?Потому что в противном случае замыкание в вызове .foreachPartition() может быть вызвано на удаленных исполнителях, поэтому println будут выводиться на другие машины, на которых работает драйвер ... Простой способ проверить это проверить журналы наисполнителей или заменить System.out.println записью в HDFS, например ...

HTH!

0 голосов
/ 30 апреля 2019

Это работает для меня

if (dataFrame.take(1).length > 0) {
    Iterator<String> itt = dataFrame.toJSON().toJavaRDD().collect().iterator();
    while(itt.hasNext()) { 
        String field = itt.next();
        JSONObject jsonResponse = new JSONObject(field);
        System.out.println("jsonString:" + jsonResponse );

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...