Я пытаюсь запустить этот небольшой пример spark-xml, и он завершается с исключением, когда я запускаю его через spark-submit.
Пример репозитория: https://github.com/punithmailme/spark-xml-new
Команда: ./dse spark-submit --class MainDriver /Users/praj3/Desktop/projects/spark/main/build/libs/main.jar
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.Builder;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
 * Minimal spark-xml reproduction: builds a two-row DataFrame in memory and writes it out as XML
 * through the {@code com.databricks.spark.xml} data source.
 */
public class MainDriver {

  /**
   * Creates a local Spark context, turns two {@link Book} instances into rows with the nullable
   * string columns {@code Name} and {@code Test}, and saves them as a single-partition XML
   * output under the path {@code new}, overwriting any previous output.
   *
   * @param args command-line arguments; not used
   */
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("Unit Test");
    // NOTE(review): a master set directly on SparkConf takes precedence over the one passed to
    // spark-submit, so this job always runs with local[2] - confirm that this is intended.
    sparkConf.setMaster("local[2]");

    // JavaSparkContext is Closeable: try-with-resources stops the context even when the write
    // below throws, instead of leaking it until the JVM exits.
    try (JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf)) {
      SQLContext sqlCon = new SQLContext(javaSparkContext);

      final JavaRDD<Book> parallelize = javaSparkContext
          .parallelize(Arrays.asList(Book.builder().name("2341").test("34").build(),
              Book.builder().name("2341").test("34").build()));

      // The order of the values in each Row must match the order of the schema fields below.
      final JavaRDD<Row> map = parallelize.map(book -> RowFactory.create(
          book.getName(),
          book.getTest()
      ));

      List<StructField> fields = new ArrayList<>();
      fields.add(new StructField("Name", DataTypes.StringType, true, Metadata.empty()));
      fields.add(new StructField("Test", DataTypes.StringType, true, Metadata.empty()));
      final StructType structType = DataTypes.createStructType(fields);

      final Dataset<Row> dataFrame = sqlCon.createDataFrame(map, structType);

      dataFrame
          .repartition(1) // a single partition, so a single writer task produces the output
          .write()
          .format("com.databricks.spark.xml")
          .mode(SaveMode.Overwrite)
          .option("rootTag", "n:Brands")
          .option("rowTag", "n:Brand")
          .save("new");
    }
  }
}
/**
 * Immutable, serializable value object with two string attributes.
 * Accessors, {@code equals}/{@code hashCode}/{@code toString} and the builder are generated by
 * Lombok ({@code @Data}, {@code @Builder}).
 */
@Data
@Builder
class Book implements Serializable {
  /**
   * Explicit serialization version, so that the serialized form does not depend on the
   * compiler-computed default. Lombok skips static fields, so the generated members are unchanged.
   */
  private static final long serialVersionUID = 1L;

  /** Value written to the {@code Name} column. */
  private final String name;

  /** Value written to the {@code Test} column. */
  private final String test;
}
Исключение:
Caused by: javax.xml.stream.XMLStreamException: Trying to output second root, <n:Brand>
at com.ctc.wstx.sw.BaseStreamWriter.throwOutputError(BaseStreamWriter.java:1537) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
at com.ctc.wstx.sw.BaseStreamWriter.throwOutputError(BaseStreamWriter.java:1544) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
at com.ctc.wstx.sw.BaseStreamWriter.reportNwfStructure(BaseStreamWriter.java:1572) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
at com.ctc.wstx.sw.BaseNsStreamWriter.checkStartElement(BaseNsStreamWriter.java:469) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
at com.ctc.wstx.sw.BaseNsStreamWriter.writeStartElement(BaseNsStreamWriter.java:290) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
at com.sun.xml.internal.txw2.output.DelegatingXMLStreamWriter.writeStartElement(DelegatingXMLStreamWriter.java:45) ~[na:1.8.0_144]
at com.sun.xml.internal.txw2.output.IndentingXMLStreamWriter.writeStartElement(IndentingXMLStreamWriter.java:148) ~[na:1.8.0_144]
at com.databricks.spark.xml.parsers.StaxXmlGenerator$.apply(StaxXmlGenerator.scala:128) ~[main.jar:na]
at com.databricks.spark.xml.util.XmlFile$$anonfun$1$$anon$1.next(XmlFile.scala:108) ~[main.jar:na]
at com.databricks.spark.xml.util.XmlFile$$anonfun$1$$anon$1.next(XmlFile.scala:96) ~[main.jar:na]
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) ~[scala-library-2.11.11.jar:na]
Окружение и зависимости
DataStax Enterprise 5.1.8 на Mac с указанными ниже зависимостями
[group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.1.2'],
[group: 'org.projectlombok', name: 'lombok', version: lombokVersion],
[group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.7'],
[group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.5'],
[group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.5'],
[group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.5'],
)
group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.1.2') {
exclude group: 'org.slf4j', module: 'slf4j-log4j12' //because of log4j and slf conflict
}
[group: 'com.databricks', name: 'spark-csv_2.11', version: '1.5.0'],
[group: 'com.databricks', name: 'spark-xml_2.11', version: '0.4.1']
)
Компоненты DSE 5.1.8
- Apache Cassandra ™ 3.11.1.2261
- Apache Solr ™ 6.0.1.0.2224
- Apache Spark ™ 2.0.2.17
- DSE Java Driver 1.2.6
- Spark Jobserver 0.6.2.237
Когда я запускаю это напрямую через метод main как отдельное приложение, всё работает; не работает только при запуске через spark-submit!