Попытка загрузить чтение таблицы Cassandra и загрузка ее в DWH, но сценарий pyspark / python не может прочитать некоторые конкретные столбцы, поскольку этот столбец заморожен, а одно поле пусто.Ниже приведено определение таблицы:
CREATE TABLE end_details (
partner_id text,
site_id text,
end_id text,
drives set<frozen<asset_drive>>,
installed_softwares set<frozen<asset_software>>,
networks set<frozen<endpoint_network>>
PRIMARY KEY ((partner_id, site_id), endpoint_id))
Где имя столбца: накопители, установленные_программы и сети - это замороженные UDT.Вышеупомянутый столбец имеет метку времени как один из его полей, и они являются пустыми.
Мы обнаружили, что если любой из таких столбцов имеет пустую метку времени, то pyspark не может преобразовать его в строку или что-либо еще, кроме броскаошибка как NULLPOINTER EXCEPTION
Мы попытались передать ее в виде пользовательской схемы, создав функцию.
from pyspark.sql.types import DateType
from pyspark.sql.types import TimestampType
def get_instance_usage_schema():
columns = ['name','version','publisher','user_name','kind','intel64bit','info_string','appstore','location']
columns_struct_fields = [StructField(field_name, StringType(), True) for field_name in columns]
# Add columns for non-string fields
columns_struct_fields.append(StructField("install_date", TimestampType(), True))
columns_struct_fields.append(StructField("last_access_datetime", TimestampType(), True))
schema = StructType(columns_struct_fields)
return schema
и выше, функцию, которую мы передали в следующем коде
customschema = StructType([StructField("partner_id", StringType(), True), StructField("site_id", StringType(), True),
StructField ("endpoint_id", StringType (), True), StructField ("instal_softwares", ArrayType (get_instance_usage_schema (), True), True)])
table_df = sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table='end_details', keyspace='asset_db')\
.schema(customschema).load()
data_df=table_df.select("partner_id","site_id","endpoint_id","installed_softwares")
from pyspark.sql.functions import col,explode,struct
from pyspark.sql import types
import json
from pyspark.sql.types import *
from pyspark.sql import functions
table_df = sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table='end_details', keyspace='asset_db')\
.load()
Ниже приведено сообщение об ошибке
Py4JJavaError: Произошла ошибка при вызове o452.collectToPython.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 6.0 не выполнена 1 раз, последний сбой: потерянная задача 0.0 на этапе 6.0 (TID 30, localhost, драйвер исполнителя): java.lang.NullPointerExceptionв com.datastax.driver.core.AbstractData.hashCode (AbstractData.java:589) в com.datastax.driver.core.UDTValue.hashCode (UDTValue.java:76) в java.util.HashMap.hash (HashMap.java: 338) в java.util.HashMap.put (HashMap.java:611) в java.util.HashSet.add (HashSet.java:219)