Я пытаюсь использовать пакет Cerner Bunsen для обработки FHIR в PySpark на AWS EMR, в частности, класс Bundles и его методы. Я создаю сеанс spark, используя Apache Livy API,
def create_spark_session(master_dns, kind, jars):
# 8998 is the port on which the Livy server runs
host = 'http://' + master_dns + ':8998'
data = {'kind': kind, 'jars': jars}
headers = {'Content-Type': 'application/json'}
response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
logging.info(response.json())
return response.headers
Где kind = pyspark3 и jars - это S3-местоположение, в котором находится jar (bunsen-shaded-1.4.7.jar)
Преобразование данных пытается импортировать банку и вызвать методы с помощью:
# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()
# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen.Bundles")
func = sc._gateway.jvm.Bundles()
Я получаю сообщение об ошибке:
"py4j.protocol.Py4JError: Произошла ошибка при вызове None.com.cerner.bunsen.Bundles. Трассировка: \ npy4j.Py4JException: конструктор com.cerner.bunsen.Bundles ([]) не существует "
Это В первый раз я попытался использовать java_import, чтобы была полезна любая помощь.
РЕДАКТИРОВАТЬ: Я немного изменил сценарий преобразования и теперь вижу другую ошибку. Я вижу, как jar добавляется в журналы, поэтому я уверен, что он там есть и что функциональность jars: jars работает так, как задумано. Новое преобразование:
# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()
# Manage logging
#sc.setLogLevel("INFO")
# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen")
func_main = sc._gateway.jvm.Bundles
func_deep = sc._gateway.jvm.Bundles.BundleContainer
fhir_data_frame = func_deep.loadFromDirectory(spark,"s3://<bucket>/source_database/Patient",1)
fhir_data_frame_fromJson = func_deep.fromJson(fhir_data_frame)
fhir_data_frame_clean = func_main.extract_entry(spark,fhir_data_frame_fromJson,'patient')
fhir_data_frame_clean.show(20, False)
и новая ошибка:
Объект 'JavaPackage' не может быть вызван
Поиск этой ошибки имеет было немного бесполезно, но опять же, если у кого-то есть идеи, я с удовольствием их приму.