Я вставляю в таблицу Cassandra типичные записи строк записи Kafka, получая из-за искровой потоковой передачи "Invalid STRING constant (53689) для" calendar_period_id "типа bigint"
Поскольку сообщения Kafka имеют формат String.Как изменить строковый тип строки данных SQL на целочисленный при вставке в таблицу Cassandra.Я пробовал много способов, но получил тот же результат.
Строки потребителя Kafka поставляются со строковыми форматами.Я пытаюсь изменить его в формат Integer.Но в случае - (отрицательный символ), NULL бросает константу Invalid STRING (-1) для "brand_id" типа bigint
.writeStream
.foreach(new cassinsupddel())
.outputMode("Append")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
import org.apache.spark.SparkConf
import org.apache.spark.sql.{ForeachWriter, SparkSession}
class cassinsupddel() extends ForeachWriter[org.apache.spark.sql.Row] {
val cassandraDriver = new CassandraDriver();
def open(partitionId: Long, version: Long): Boolean = {
// open connection
println(s"in my Open connection")
true
}
def process(record: org.apache.spark.sql.Row) = {
if(op_type == "I") {
println(s"Process insert new $record")
cassandraDriver.connector.withSessionDo(session =>
session.execute(s"""
insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink}(external_usn, plan_status_code, brand_id, brand_name)
values (record.getAs[Int](6)}', '${record.getAs[Int](76)}', '${record.getAs[Int](40)}', '${record(41)}')"""
)
)
}
}
}
образца таблицы Кассандры
CREATE TABLE archive_dub (
external_usn bigint,
plan_status_code bigint,
brand_id bigint,
brand_name text,
break_number text}
потребительские строки идут со строковыми форматами.Я пытаюсь изменить его в формат Integer.Но в случае - (отрицательный символ), ноль это бросает Invalid STRING constant (53689) for "brand_id " of type bigint