, если это поможет, вот программа, которую я использовал для проверки вашего случая.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class StackOverflow20190326_2 {
public static void main(String args[]) {
SparkSession spark = SparkSession.builder().appName("StackOverflow20190326").master("local").getOrCreate();
// generate a dummy 2-liner dataset
Dataset<Row> ds = spark.sql("select 1 as idx, 'this is line 1' as value union select 2 as idx, 'This is the second line' as value");
test(ds);
spark.stop();
}
private static void test(Dataset<Row> dataFrame) {
JavaRDD<String> javaRDD = dataFrame.toJSON().toJavaRDD();
if (javaRDD.take(1).size() > 0) {
System.out.println("jsonString:#######");
javaRDD.foreachPartition(jsonString -> {
System.out.println("#######" + jsonString);
while (jsonString.hasNext()) {
final String str = jsonString.next();
if (str != null && !str.equals("")) {
System.out.println("jsonString:" + str);
}
}
});
}
}
}
Вывод выглядит следующим образом:
jsonString:#######
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(empty iterator)
#######IteratorWrapper(non-empty iterator)
jsonString:{"idx":1,"value":"this is line 1"}
#######IteratorWrapper(non-empty iterator)
jsonString:{"idx":2,"value":"This is the second line"}
Как видите, существует много пустых разделов, но две исходные строки хорошо выводятся.
Я использую spark 2.4, как вы можете видеть из pom.xml maven:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.borgoltz.test</groupId>
<artifactId>spark-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>2.4.0</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
</project>
Последнее, но не менее важное
Работаете ли вы в локальном режиме?Потому что в противном случае замыкание в вызове .foreachPartition()
может быть вызвано на удаленных исполнителях, поэтому println
будут выводиться на другие машины, на которых работает драйвер ... Простой способ проверить это проверить журналы наисполнителей или заменить System.out.println записью в HDFS, например ...
HTH!