Хотите прочитать данные cassandra, используя сценарий python поверх spark, но pysaprk не может прочитать пустую строку, которая есть в таблице Cassandra - PullRequest
0 голосов
/ 03 июня 2019

Попытка загрузить чтение таблицы Cassandra и загрузка ее в DWH, но сценарий pyspark / python не может прочитать некоторые конкретные столбцы, поскольку этот столбец заморожен, а одно поле пусто.Ниже приведено определение таблицы:

CREATE TABLE end_details (
 partner_id text,
 site_id text,
 end_id text,
 drives set<frozen<asset_drive>>,
 installed_softwares set<frozen<asset_software>>,
 networks set<frozen<endpoint_network>>
 PRIMARY KEY ((partner_id, site_id), endpoint_id))

Где имя столбца: накопители, установленные_программы и сети - это замороженные UDT.Вышеупомянутый столбец имеет метку времени как один из его полей, и они являются пустыми.

Мы обнаружили, что если любой из таких столбцов имеет пустую метку времени, то pyspark не может преобразовать его в строку или что-либо еще, кроме броскаошибка как NULLPOINTER EXCEPTION

Мы попытались передать ее в виде пользовательской схемы, создав функцию.

from pyspark.sql.types import DateType
from pyspark.sql.types import TimestampType


def get_instance_usage_schema():

 columns =          ['name','version','publisher','user_name','kind','intel64bit','info_string','appstore','location']

columns_struct_fields = [StructField(field_name, StringType(), True) for field_name in columns]

# Add columns for non-string fields
columns_struct_fields.append(StructField("install_date", TimestampType(), True))
columns_struct_fields.append(StructField("last_access_datetime", TimestampType(), True))

schema = StructType(columns_struct_fields)
return schema

и выше, функцию, которую мы передали в следующем коде

 customschema = StructType([StructField("partner_id", StringType(),      True), StructField("site_id", StringType(), True),

StructField ("endpoint_id", StringType (), True), StructField ("instal_softwares", ArrayType (get_instance_usage_schema (), True), True)])

table_df = sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table='end_details', keyspace='asset_db')\
.schema(customschema).load()

           data_df=table_df.select("partner_id","site_id","endpoint_id","installed_softwares")

from pyspark.sql.functions import col,explode,struct
from pyspark.sql import types
import json
from pyspark.sql.types import *
from pyspark.sql import functions

table_df = sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table='end_details', keyspace='asset_db')\
.load()

Ниже приведено сообщение об ошибке

Py4JJavaError: Произошла ошибка при вызове o452.collectToPython.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 6.0 не выполнена 1 раз, последний сбой: потерянная задача 0.0 на этапе 6.0 (TID 30, localhost, драйвер исполнителя): java.lang.NullPointerExceptionв com.datastax.driver.core.AbstractData.hashCode (AbstractData.java:589) в com.datastax.driver.core.UDTValue.hashCode (UDTValue.java:76) в java.util.HashMap.hash (HashMap.java: 338) в java.util.HashMap.put (HashMap.java:611) в java.util.HashSet.add (HashSet.java:219)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...