Я загружаю данные neo4j в фрейм данных spark, используя neo4j-spark разъем .Я могу получить его успешно, так как я могу показать данные.Затем я регистрирую фрейм данных с помощью метода createOrReplaceTempView () .Затем я пытаюсь запустить на нем spark sql, но это дает исключение, говорящее
org.apache.spark.sql.AnalysisException: Table or view not found: neo4jtable;
Вот как выглядит весь мой код:
import java.text.ParseException;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.neo4j.spark.Neo4JavaSparkContext;
import org.neo4j.spark.Neo4j;
import scala.collection.immutable.HashMap;
public class Neo4jDF {
private static Neo4JavaSparkContext neo4jjsc;
private static SparkConf sConf;
private static JavaSparkContext jsc;
private static SparkContext sc;
private static SparkSession ss;
private static Dataset<Row> neo4jdf;
static String neo4jip = "ll.mm.nn.oo";
public static void main(String[] args) throws AnalysisException, ParseException
{
setSparkConf();
setJavaSparkContext();
setNeo4jJavaSparkContext();
setSparkContext();
setSparkSession();
neo4jdf = loadNeo4jDataframe();
neo4jdf.createOrReplaceTempView("neo4jtable");
neo4jdf.show(false); //this prints correctly
Dataset<Row> neo4jdfsqled = ss.sql("SELECT * from neo4jtable");
neo4jdfsqled.show(false); //this throws exception
}
private static Dataset<Row> loadNeo4jDataframe(String pAutosysBoxCaption)
{
Neo4j neo4j = new Neo4j(jsc.sc());
HashMap<String, Object> a = new HashMap<String, Object>();
Dataset<Row> rdd = neo4j.cypher("cypher query deleted for irrelevance", a).loadDataFrame();
return rdd;
}
private static void setSparkConf()
{
sConf = new SparkConf().setAppName("GetNeo4jToRddDemo");
sConf.set("spark.neo4j.bolt.url", "bolt://" + neo4jip + ":7687");
sConf.set("spark.neo4j.bolt.user", "neo4j");
sConf.set("spark.neo4j.bolt.password", "admin");
sConf.setMaster("local");
sConf.set("spark.testing.memory", "471859200");
sConf.set("spark.sql.warehouse.dir", "file:///D:/Mahesh/workspaces/spark-warehouse");
}
private static void setJavaSparkContext()
{
jsc = new JavaSparkContext(sConf);
}
private static void setSparkContext()
{
sc = JavaSparkContext.toSparkContext(jsc);
}
private static void setSparkSession()
{
ss = new SparkSession(sc);
}
private static void setNeo4jJavaSparkContext()
{
neo4jjsc = Neo4JavaSparkContext.neo4jContext(jsc);
}
}
Я чувствую, что проблема может заключаться в том, как всепеременные среды искры созданы.Сначала я создал SparkConf sConf
.
Из sConf
, я создал JavaSparkContext jsc
.
Из jsc
, я создал SparkContext sc
.
Из sc
, я создал SparkSession ss
.
С ss
я создал Neo4jJavaSparkContext neo4jjjsc
.
Так визуально:
sConf -> jsc -> sc -> ss
-> neo4jjsc
Также обратите внимание, что
- Внутри loadNeo4jDataframe (), яиспользуйте
sc
для создания экземпляра Neo4j neo4j
, который затем используется для извлечения данных neo4j. - Данные выбираются с использованием экземпляра
Neo4j
. neo4jjjsc
никогда не используется, но я сохранил его как возможную подсказку к проблеме.
Учитывая все эти моменты и наблюдения, пожалуйста, скажите мне, почему я получаю исключение таблицы, не найденное?Я, должно быть, упускаю что-то глупое.: \
Обновление
Пробная настройка ss
(после выборки данных с использованием SparkContext из neo4j) следующим образом:
private static void setSparkSession(SparkContext sc)
{
ss = new SparkSession(sc);
}
private static Dataset<Row> loadNeo4jDataframe(String pAutosysBoxCaption)
{
Neo4j neo4j = new Neo4j(sc);
Dataset<Row> rdd = neo4j.cypher("deleted cypher for irrelevance", a).loadDataFrame();
//initalizing ss after data is fetched using SparkContext of neo4j
setSparkSession(neo4j.sc());
return rdd;
}
Обновление 2
Из комментариев только что понял, что neo4j
создает свой собственный сеанс искры, используя предоставленный ему экземпляр контекста искры sc
.У меня нет доступа к этой искровой сессии.Итак, как я должен добавить / зарегистрировать произвольный фрейм данных (здесь neo4jdf
), который создается в каком-то другом сеансе искры (здесь сеанс spark, созданный neo4j.cypher
), в мой сеанс искры ss
?