Writing a Spark state stream to Cassandra
0 votes
/ 04 May 2018

I'm using pyspark streaming to perform a stateful transformation. The state holds the final data, and I would like to persist that state to Cassandra. Here is my code:

# gets new stream of data
new_stream = get_kafka_stream()

# transformations happens here
transformed_stream = transform_stream(new_stream)

# I then update the state in memory
state_stream = transformed_stream.updateStateByKey(update_func)

# save the state to cassandra
load_state_stream(state_stream)
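
For context, here is a minimal sketch of the setup that get_kafka_stream and update_func stand in for; the broker, topic, checkpoint path, and the update logic below are simplified placeholders rather than my exact code. The one non-negotiable piece is the checkpoint directory, which updateStateByKey requires:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="state-to-cassandra")
ssc = StreamingContext(sc, 10)           # 10-second batches (placeholder)
ssc.checkpoint("/tmp/spark-checkpoint")  # required by updateStateByKey

def get_kafka_stream():
    # Direct Kafka stream; broker and topic names are placeholders.
    return KafkaUtils.createDirectStream(
        ssc, ["events"], {"metadata.broker.list": "kafka:9092"})

def update_func(new_values, last_state):
    # Keep the most recent value seen for each key.
    return new_values[-1] if new_values else last_state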

Here is the function that tries to write the stream to Cassandra:

def load_state_stream(stream):
    def save(rdd):
        if not rdd.isEmpty():
            df = rdd.toDF()
            df.write \
              .format("org.apache.spark.sql.cassandra") \
              .options(table="flat_table", keyspace="etl") \
              .mode("append") \
              .save()

    stream.foreachRDD(lambda rdd: save(rdd))
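
One detail worth making explicit: the state DStream carries (key, state) pairs, so rdd.toDF() only produces columns that line up with etl.flat_table if each pair is first mapped to a Row (or the column names are passed explicitly). A minimal sketch of that mapping, with placeholder column names:

from pyspark.sql import Row

def to_row(kv):
    key, state = kv
    # "id" and "payload" are placeholders; they have to match the
    # column names of the etl.flat_table table in Cassandra.
    return Row(id=key, payload=state)

# inside save():  df = rdd.map(to_row).toDF()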

Here are the logs from Cassandra:

INFO  [Native-Transport-Requests-1] 2018-05-04 08:42:01,106 AuthCache.java:172 - (Re)initializing PermissionsCache (validity period/update interval/max entries) (2000/2000/1000)

INFO  [Native-Transport-Requests-1] 2018-05-04 08:42:30,669 AuthCache.java:172 - (Re)initializing RolesCache (validity period/update interval/max entries) (2000/2000/1000)

Here are the logs from Spark:

 2018-05-07 03:02:40 ERROR TaskSetManager:70 - Task 21 in stage 15803.0 failed 1 times; aborting job
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 20.0 in stage 15803.0 (TID 89026, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 11.0 in stage 15803.0 (TID 89017, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 0.0 in stage 15803.0 (TID 89006, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 2.0 in stage 15803.0 (TID 89008, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 7.0 in stage 15803.0 (TID 89013, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 22.0 in stage 15803.0 (TID 89028, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 16.0 in stage 15803.0 (TID 89022, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 18.0 in stage 15803.0 (TID 89024, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 5.0 in stage 15803.0 (TID 89011, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 1.0 in stage 15803.0 (TID 89007, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 23.0 in stage 15803.0 (TID 89029, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 15.0 in stage 15803.0 (TID 89021, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 17.0 in stage 15803.0 (TID 89023, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 9.0 in stage 15803.0 (TID 89015, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 3.0 in stage 15803.0 (TID 89009, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 19.0 in stage 15803.0 (TID 89025, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 4.0 in stage 15803.0 (TID 89010, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 6.0 in stage 15803.0 (TID 89012, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 14.0 in stage 15803.0 (TID 89020, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 12.0 in stage 15803.0 (TID 89018, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 13.0 in stage 15803.0 (TID 89019, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 8.0 in stage 15803.0 (TID 89014, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 WARN  TaskSetManager:66 - Lost task 10.0 in stage 15803.0 (TID 89016, localhost, executor driver): TaskKilled (Stage cancelled)
2018-05-07 03:02:40 ERROR JobScheduler:91 - Error running job streaming job 1525662160000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/usr/local/spark/python/pyspark/streaming/dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "<ipython-input-10-690701637381>", line 3, in <lambda>
    uncouple_stream.foreachRDD(lambda rdd: stream_to_cassandra(rdd))
  File "<ipython-input-10-690701637381>", line 10, in stream_to_cassandra
    temp_flatobs.write.format("org.apache.spark.sql.cassandra")            .options(table="flat_obs", keyspace="etl")            .mode("append")            .save()
  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 701, in save
    self._jwrite.save()
  File "/usr/local/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o547939.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 15803.0 failed 1 times, most recent failure: Lost task 21.0 in stage 15803.0 (TID 89027, localhost, executor driver): java.net.SocketException: Socket is closed
    at java.net.Socket.getInputStream(Socket.java:903)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:109)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

This approach works, but after a while I get the closed-socket exception from Spark shown above. What might I be doing wrong?
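
For reference, here is the write path spelled out end to end, following the lazily created SparkSession pattern from the Spark Streaming programming guide; the column names are placeholders, and this is only a sketch of what the code amounts to, not a fix for the socket error:

from pyspark.sql import SparkSession

def save(rdd):
    if rdd.isEmpty():
        return
    # Reuse one SparkSession instead of creating anything per batch.
    spark = SparkSession.builder.getOrCreate()
    # "id" and "payload" are placeholder column names.
    df = spark.createDataFrame(rdd, ["id", "payload"])
    df.write \
      .format("org.apache.spark.sql.cassandra") \
      .options(table="flat_table", keyspace="etl") \
      .mode("append") \
      .save()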
