Запуск пользовательского Java класса в PySpark на EMR - PullRequest
3 голосов
/ 22 января 2020

Я пытаюсь использовать пакет Cerner Bunsen для обработки FHIR в PySpark на AWS EMR, в частности, класс Bundles и его методы. Я создаю сеанс spark, используя Apache Livy API,

def create_spark_session(master_dns, kind, jars):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind, 'jars': jars}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers

Где kind = pyspark3 и jars - это S3-местоположение, в котором находится jar (bunsen-shaded-1.4.7.jar)

Преобразование данных пытается импортировать банку и вызвать методы с помощью:

# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()

# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen.Bundles")
func = sc._gateway.jvm.Bundles()

Я получаю сообщение об ошибке:

"py4j.protocol.Py4JError: Произошла ошибка при вызове None.com.cerner.bunsen.Bundles. Трассировка: \ npy4j.Py4JException: конструктор com.cerner.bunsen.Bundles ([]) не существует "

Это В первый раз я попытался использовать java_import, чтобы была полезна любая помощь.

РЕДАКТИРОВАТЬ: Я немного изменил сценарий преобразования и теперь вижу другую ошибку. Я вижу, как jar добавляется в журналы, поэтому я уверен, что он там есть и что функциональность jars: jars работает так, как задумано. Новое преобразование:

# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()

# Manage logging
#sc.setLogLevel("INFO")

# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen")
func_main = sc._gateway.jvm.Bundles
func_deep = sc._gateway.jvm.Bundles.BundleContainer

fhir_data_frame = func_deep.loadFromDirectory(spark,"s3://<bucket>/source_database/Patient",1)
fhir_data_frame_fromJson = func_deep.fromJson(fhir_data_frame)
fhir_data_frame_clean = func_main.extract_entry(spark,fhir_data_frame_fromJson,'patient')
fhir_data_frame_clean.show(20, False)

и новая ошибка:

Объект 'JavaPackage' не может быть вызван

Поиск этой ошибки имеет было немного бесполезно, но опять же, если у кого-то есть идеи, я с удовольствием их приму.

1 Ответ

1 голос
/ 22 января 2020

Если вы хотите использовать функцию Scala / Java в Pyspark, вы также должны добавить пакет jar в classpath. Это можно сделать двумя различными способами:

Option1: В Spark отправьте с флагом --jars

 spark-submit example.py --jars /path/to/bunsen-shaded-1.4.7.jar

Option2: Добавить это в spark-defaults.conf файл в свойстве:

Добавьте следующий код в: path/to/spark/conf/spark-defaults.conf

# Comma-separated list of jars include on the driver and executor classpaths. 
spark.jars /path/to/bunsen-shaded-1.4.7.jar
...