Извлечь данные в фрейм данных из базы данных Oracle в Pyspark 1.5 - PullRequest
0 голосов
/ 06 октября 2018

У меня проблема с pyspark.Я хочу получить данные из базы данных оракула.Моя основная проблема заключается в создании URL-адреса jdbc.

Я пробовал два способа, и оба ошибки по ошибке.Ниже мой код источника. Не могли бы вы помочь мне построить правильный запрос : Я уточнил, что использую Spark 1.5 (функции Spark 2.0 не будут работать).Большое спасибо,

##### 
from pyspark import SparkContext,SparkConf
appName='Import-Data' 
try:
    sc.stop()
    except :
    print 'spark context does not exists'
else:
    print 'existing spark context stopped'
 conf = SparkConf().setAppName(appName)
 conf.set("spark.executor.instances", "9")
 conf.set("spark.executor.cores", "4")
 conf.set("spark.executor.memory", "8g")
 sc = SparkContext(conf=conf)

import numpy as np
import datetime as dt
import pandas as pd
import glob
import os
import re

sqlsc = SQLContext(sc)
from pyspark import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
sqlsc = SQLContext(sc)
from pyspark import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

#Connection a la base de donnees
#First way (YYYY is the user and XXXXXX is the password)
#MyDataFrame = sqlsc.read.load(source="jdbc",url="jdbc:oracle:thin://Server/DATABASE?   user=YYYY&password=XXXXXX",dbtable="schema.table")

#Second way
MyDataFrame = sqlsc.read.load(source="jdbc",url="jdbc:oracle:thin:YYYY/XXXXXX@Server:1521/DATABASE",dbtable="Schema.table")


#Here is the error I am facing:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-21-82abab7efad2> in <module>()
----> 1 MyDataFrame.show(5)
/usr/iop/current/spark-client/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    254         +---+-----+
    255         """
--> 256         print(self._jdf.showString(n, truncate))
    257
    258     def __repr__(self):
/usr/iop/current/spark-client/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:
/usr/iop/current/spark-client/python/pyspark/sql/utils.py in deco(*a, **kw)
     34     def deco(*a, **kw):
     35         try:
---> 36             return f(*a, **kw)
     37         except py4j.protocol.Py4JJavaError as e:
 38             s = e.java_exception.toString()
/usr/iop/current/spark-client/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(
Py4JJavaError: An error occurred while calling o152.showString.
: java.lang.IllegalStateException: SparkContext has been shutdown
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1814)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
       at java.lang.Thread.run(Thread.java:745)

 Here are the environments variable set:
export PATH=/gpfs/user/$USER/env_python2/bin:/gpfs/user/$USER/env_python3/bin:$PATH
#ajout de R
export PATH=/gpfs/user/common/R-devel/R-3.4.1/bin:$PATH
#Lib pour Jupyter
export LD_LIBRARY_PATH=/gpfs/user/common/jupyter/sqlite/sqlite/lib:$LD_LIBRARY_PATH
export LD_LIBRARY_PATH=/gpfs/user/common/jupyter/sqlite/sqlite/lib:$LD_LIBRARY_PATH
export SPARK_CLASSPATH=/soft/ora1120/db/jdbc/lib/ojdbc6.jar:/gpfs/user/e547041/jupyter/toolbox/spark-csv_2.10-0.1.jar

Примечание: я использую Юпитер под spak 1.5

1 Ответ

0 голосов
/ 06 октября 2018

Ваш код четко указывает на то, что JVM SparkContext не работает

Py4JJavaError: An error occurred while calling o152.showString.
: java.lang.IllegalStateException: SparkContext has been shutdown

Это может произойти, если Python SparkContext был неправильно остановлен, или когда есть какая-то проблема конфигурации, препятствующая Java SparkContext с начала.

В этом состоянии любое действие, необязательно jdbc, не будет выполнено.

Чтобы решить эту проблему, вы должны определить, почему контекст не запускается должным образом.Глядя на код, который вы разместили (трудно полностью проанализировать его без контекста и правильного отступа), вполне вероятно, что

try:
   sc.stop()

оставляет ваш драйвер в каком-то неопределенном состоянии.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...