I am using PySpark Streaming to perform a stateful transformation. The state holds the final data, and I would like to persist that state to Cassandra. Here is my code:
# gets new stream of data
new_stream = get_kafka_stream()
# transformations happen here
transformed_stream = transform_stream(new_stream)
# I then update the state in memory
state_stream = transformed_stream.updateStateByKey(update_func)
# save the state to cassandra
load_state_stream(state_stream)
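For reference, the update function passed to updateStateByKey is ordinary Python that merges the values that arrived for a key in the current batch with that key's previous state. A minimal sketch (the running-sum logic here is a placeholder assumption, not my actual update_func):

```python
def update_func(new_values, last_state):
    # new_values: list of values that arrived for this key in the batch
    # last_state: previous state for this key, or None the first time the key is seen
    return sum(new_values) + (last_state or 0)
```

Note that updateStateByKey also requires a checkpoint directory to be set on the StreamingContext (via ssc.checkpoint(...)), which I have configured.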
Here is the function that tries to write the stream to Cassandra:
def load_state_stream(stream):
    stream.foreachRDD(lambda rdd: save(rdd))

def save(rdd):
    if not rdd.isEmpty():
        df = rdd.toDF()
        df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(table="flat_table", keyspace="etl") \
            .mode("append") \
            .save()
Here are the logs from Cassandra:
INFO [Native-Transport-Requests-1] 2018-05-04 08:42:01,106 AuthCache.java:172 - (Re)initializing PermissionsCache (validity period/update interval/max entries) (2000/2000/1000)
INFO [Native-Transport-Requests-1] 2018-05-04 08:42:30,669 AuthCache.java:172 - (Re)initializing RolesCache (validity period/update interval/max entries) (2000/2000/1000)
Here are the logs from Spark:
2018-05-07 03:02:40 ERROR TaskSetManager:70 - Task 21 in stage 15803.0 failed 1 times; aborting job
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 20.0 in stage 15803.0 (TID 89026, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 11.0 in stage 15803.0 (TID 89017, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 0.0 in stage 15803.0 (TID 89006, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 2.0 in stage 15803.0 (TID 89008, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 7.0 in stage 15803.0 (TID 89013, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 22.0 in stage 15803.0 (TID 89028, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 16.0 in stage 15803.0 (TID 89022, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 18.0 in stage 15803.0 (TID 89024, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 5.0 in stage 15803.0 (TID 89011, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 1.0 in stage 15803.0 (TID 89007, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 23.0 in stage 15803.0 (TID 89029, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 15.0 in stage 15803.0 (TID 89021, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 17.0 in stage 15803.0 (TID 89023, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 9.0 in stage 15803.0 (TID 89015, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 3.0 in stage 15803.0 (TID 89009, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 19.0 in stage 15803.0 (TID 89025, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 4.0 in stage 15803.0 (TID 89010, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 6.0 in stage 15803.0 (TID 89012, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 14.0 in stage 15803.0 (TID 89020, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 12.0 in stage 15803.0 (TID 89018, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 13.0 in stage 15803.0 (TID 89019, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 8.0 in stage 15803.0 (TID 89014, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN TaskSetManager:66 - Lost task 10.0 in stage 15803.0 (TID 89016, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 ERROR JobScheduler:91 - Error running job streaming job 1525662160000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/local/spark/python/pyspark/streaming/util.py", line 65, in call
r = self.func(t, *rdds)
File "/usr/local/spark/python/pyspark/streaming/dstream.py", line 159, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "<ipython-input-10-690701637381>", line 3, in <lambda>
uncouple_stream.foreachRDD(lambda rdd: stream_to_cassandra(rdd))
File "<ipython-input-10-690701637381>", line 10, in stream_to_cassandra
temp_flatobs.write.format("org.apache.spark.sql.cassandra") .options(table="flat_obs", keyspace="etl") .mode("append") .save()
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 701, in save
self._jwrite.save()
File "/usr/local/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o547939.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 15803.0 failed 1 times, most recent failure: Lost task 21.0 in stage 15803.0 (TID 89027, localhost, executor driver): java.net.SocketException: Socket is closed
at java.net.Socket.getInputStream(Socket.java:903)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:109)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
This approach works, but after a while I get the closed-socket exception from Spark shown above. What might I be doing wrong?