Как я могу использовать функцию saveAsTable, когда у меня два ноутбука Spark работают параллельно в одном ноутбуке? - PullRequest
0 голосов
/ 07 октября 2019

У меня два записанных потока Spark в записной книжке для параллельной работы.

  spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
  df1 = spark \
      .readStream.format("delta") \
      .table("test_db.table1") \
      .select('foo', 'bar')


  writer_df1 = df1.writeStream.option("checkpoint_location", checkpoint_location_1) \
      .foreachBatch(
      lambda batch_df, batch_epoch:
      process_batch(batch_df, batch_epoch)
  ) \
      .start()

  spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
  df2 = spark \
  .readStream.format("delta") \
  .table("test_db.table2") \
  .select('foo', 'bar')

  writer_df2 = merchant_df.writeStream.option("checkpoint_location", checkpoint_location_2) \
    .foreachBatch(
    lambda batch_df, batch_epoch:
    process_batch(batch_df, batch_epoch)
  ) \
    .start()

Эти кадры данных затем обрабатываются построчно, каждая строка отправляется в API. Если вызов API сообщает об ошибке, я затем преобразовываю строку в JSON и добавляю эту строку в общую таблицу сбоев в кирпичах данных.

columns = ['table_name', 'record', 'time_of_failure', 'error_or_status_code']
vals = [(table_name, json.dumps(row.asDict()), datetime.now(), str(error_or_http_code))]
error_df = spark.createDataFrame(vals, columns)
error_df.select('table_name','record','time_of_failure', 'error_or_status_code').write.format('delta').mode('Append').saveAsTable("failures_db.failures_db)"

При попытке добавить строку в эту таблицу, saveAsTable()вызов здесь вызывает следующее исключение.

py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o3578.saveAsTable. : java.lang.IllegalStateException: Не удается найти идентификатор REPL в локальных свойствах Spark. Spark-submit и R не поддерживают транзакции записи из разных кластеров. Если вы используете R, пожалуйста, переключитесь на Scala или Python. Если вы используете spark-submit, пожалуйста, преобразуйте его в JAR-задание Databricks. Или вы можете отключить многокластерную запись, установив для «spark.databricks.delta.multiClusterWrites.enabled» значение «false». Если это отключено, запись в одну таблицу должна происходить из одного кластера. Пожалуйста, проверьте https://docs.databricks.com/delta/delta-intro.html#frequently-asked-questions-faq для получения более подробной информации.

Если я закомментирую один из потоков и перезапущу записную книжку, любые ошибки из вызовов API будут вставлены в таблицу без проблем. ,Я чувствую, что мне нужно добавить какую-то конфигурацию, но я не уверен, куда идти.

1 Ответ

0 голосов
/ 08 октября 2019

Не уверен, что это лучшее решение, но я считаю, что проблема возникает из-за того, что каждый поток записывает в таблицу одновременно. Я разделил эту таблицу на отдельные таблицы для каждого потока, и после этого она сработала.

...