Spark Streaming: Избегайте многократных обращений к БД - PullRequest
0 голосов
/ 23 ноября 2018

У меня есть структурированная потоковая передача Spark, которая читает события пользовательского интерфейса из нескольких тем Kafka.Текущий поток выглядит следующим образом:

  1. Поток искры считывает смещения Кафки
  2. Для каждого смещения он поступает в базу данных и отображает одно из значений, поступающих из темы, в другое значение.
  3. Агрегирует данные
  4. Записывает данные в одну и ту же базу данных.

Столкнулся с этой проблемой, когда после 10-12 часов работы выдает ошибку too many db connection open.Это делается только для шага 2.

Использование базы данных Aerospike для этой работы Spark.Есть ли способ оптимизировать этот поток?Разве можно уменьшить количество обращений к базе данных?

Считать данные:

sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", newTopic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
          .option("failOnDataLoss", false)
          .load();

Сопоставить значение из базы данных и агрегировать данные:

dataset
        .map(
            new MapFunction<Row, Row>() {
              @Override
              public Row call(Row row) throws Exception {
                objects[1] = aerospikeDao.getSomeValueFromCode(row.getAs("code"));

                return new GenericRowWithSchema(objects, eventSpecificStructType);
              }
            },
            RowEncoder.apply(eventSpecificStructType)
        )
        .withWatermark("timestamp", "30 seconds")
        .select(
            col("timestamp"),
            col("platform"),
            col("some_value")
        )
        .groupBy(
            functions.window(col("timestamp"), "30 seconds"),
            col("platform"),
            col("some_value")
        )
        .agg(
            count(lit(1)).as("count")
        );

Запись в базу данных:

aggregatedDataset
        .writeStream()
        .option("startingOffsets", "earliest")
        .outputMode(OutputMode.Append())
        .foreach(sink)
        .trigger(Trigger.ProcessingTime("30 seconds"))
        .start();

DAO:

    public AerospikeClient connect() {
        if (aerospikeClient == null || !aerospikeClient.isConnected()) {
          setAerospikeClient();
        }
        return this.aerospikeClient;
      }

 public void close() {
        if (aerospikeClient != null && aerospikeClient.isConnected()) {
            aerospikeClient.close();
        }
    }
public String getSomeValueFromCode(String code) {
    connet();
    Record record = aerospikeClient.get(Policy, key, "SomeValue");
    close();
    return channel;
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...