org.apache.spark.sql.AnalysisException: таблица и представление не найдены - PullRequest
0 голосов
/ 01 июня 2018

Я загружаю данные neo4j в фрейм данных spark, используя neo4j-spark разъем .Я могу получить его успешно, так как я могу показать данные.Затем я регистрирую фрейм данных с помощью метода createOrReplaceTempView () .Затем я пытаюсь запустить на нем spark sql, но это дает исключение, говорящее

org.apache.spark.sql.AnalysisException: Table or view not found: neo4jtable;

Вот как выглядит весь мой код:

import java.text.ParseException;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.neo4j.spark.Neo4JavaSparkContext;
import org.neo4j.spark.Neo4j;

import scala.collection.immutable.HashMap;

public class Neo4jDF {

    private static Neo4JavaSparkContext neo4jjsc;
    private static SparkConf sConf;
    private static JavaSparkContext jsc;
    private static SparkContext sc; 
    private static SparkSession ss;

    private static Dataset<Row> neo4jdf;
    static String neo4jip = "ll.mm.nn.oo";

    public static void main(String[] args) throws AnalysisException, ParseException 
    {
        setSparkConf();
        setJavaSparkContext();
        setNeo4jJavaSparkContext();
        setSparkContext();
        setSparkSession();

        neo4jdf = loadNeo4jDataframe();
        neo4jdf.createOrReplaceTempView("neo4jtable");

        neo4jdf.show(false); //this prints correctly

        Dataset<Row> neo4jdfsqled = ss.sql("SELECT * from neo4jtable");

        neo4jdfsqled.show(false); //this throws exception
    }

    private static Dataset<Row> loadNeo4jDataframe(String pAutosysBoxCaption)
    {
        Neo4j neo4j = new Neo4j(jsc.sc());
        HashMap<String, Object> a = new HashMap<String, Object>();
        Dataset<Row> rdd = neo4j.cypher("cypher query deleted for irrelevance", a).loadDataFrame();
        return rdd;
    }

    private static void setSparkConf()
    {
        sConf = new SparkConf().setAppName("GetNeo4jToRddDemo");
        sConf.set("spark.neo4j.bolt.url", "bolt://" + neo4jip + ":7687");
        sConf.set("spark.neo4j.bolt.user", "neo4j");
        sConf.set("spark.neo4j.bolt.password", "admin");
        sConf.setMaster("local");
        sConf.set("spark.testing.memory", "471859200");
        sConf.set("spark.sql.warehouse.dir", "file:///D:/Mahesh/workspaces/spark-warehouse");
    }

    private static void setJavaSparkContext()
    {
        jsc = new JavaSparkContext(sConf);
    }

    private static void setSparkContext()
    {
        sc = JavaSparkContext.toSparkContext(jsc);
    }

    private static void setSparkSession()
    {
        ss = new SparkSession(sc);
    }

    private static void setNeo4jJavaSparkContext()
    {
        neo4jjsc = Neo4JavaSparkContext.neo4jContext(jsc);
    }
}

Я чувствую, что проблема может заключаться в том, как всепеременные среды искры созданы.Сначала я создал SparkConf sConf.
Из sConf, я создал JavaSparkContext jsc.
Из jsc, я создал SparkContext sc.
Из sc, я создал SparkSession ss.
С ss я создал Neo4jJavaSparkContext neo4jjjsc.

Так визуально:

sConf -> jsc -> sc       -> ss 
             -> neo4jjsc 

Также обратите внимание, что

  • Внутри loadNeo4jDataframe (), яиспользуйте sc для создания экземпляра Neo4j neo4j, который затем используется для извлечения данных neo4j.
  • Данные выбираются с использованием экземпляра Neo4j.
  • neo4jjjsc никогда не используется, но я сохранил его как возможную подсказку к проблеме.

Учитывая все эти моменты и наблюдения, пожалуйста, скажите мне, почему я получаю исключение таблицы, не найденное?Я, должно быть, упускаю что-то глупое.: \

Обновление

Пробная настройка ss (после выборки данных с использованием SparkContext из neo4j) следующим образом:

private static void setSparkSession(SparkContext sc)
{
    ss = new SparkSession(sc);
}

private static Dataset<Row> loadNeo4jDataframe(String pAutosysBoxCaption)
{
    Neo4j neo4j = new Neo4j(sc);

    Dataset<Row> rdd = neo4j.cypher("deleted cypher for irrelevance", a).loadDataFrame();

    //initalizing ss after data is fetched using SparkContext of neo4j
    setSparkSession(neo4j.sc());  
    return rdd;
}

Обновление 2

Из комментариев только что понял, что neo4j создает свой собственный сеанс искры, используя предоставленный ему экземпляр контекста искры sc.У меня нет доступа к этой искровой сессии.Итак, как я должен добавить / зарегистрировать произвольный фрейм данных (здесь neo4jdf), который создается в каком-то другом сеансе искры (здесь сеанс spark, созданный neo4j.cypher), в мой сеанс искры ss?

1 Ответ

0 голосов
/ 01 июня 2018

Исходя из симптомов, мы можем заключить, что оба фрагмента кода используют разные SparkSession / SQLContext.Предполагая, что в коннекторе Neo4j нет ничего необычного, вы можете исправить это, изменив:

private static void setSparkSession()
{
    ss = SparkSession().builder.getOrCreate();
}

или инициализировав SparkSession перед вызовом setNeo4jJavaSparkContext.

Еслиони не будут работать, вы можете переключиться на использование createGlobalTempView.

Важно :

В общем, я бы рекомендовал инициализировать одиночный SparkSession с использованием шаблона компоновщика, иполучение других контекстов (SparkContexts) из него, когда это необходимо.

...