Нужно ли открывать новое соединение с БД для каждой записи в Кассандре? - PullRequest
0 голосов
/ 28 мая 2018

Я пытаюсь написать простую программу на JAVA, которая генерирует некоторые данные (просто POJO), которые публикуются в теме Кафки.Из этой темы подписчик выбирает данные и должен записать их в базу данных Cassandra.

Создание и выборка работают нормально, но когда дело доходит до записи данных в базу данных Cassandra, есть кое-что, что заставляет меняинтересно.

Когда я пытаюсь записать данные, мне всегда нужно открывать новое соединение с БД.Выглядит очень неприятно.

 @Override
  public void run() {

    setRunning(true);


    try {
      konsument.subscribe(Collections.singletonList(ServerKonfiguration.TOPIC));

      while (running) {

        ConsumerRecords<Long, SensorDaten> sensorDaten = konsument.poll(Long.MAX_VALUE);
        sensorDaten.forEach(
                datum -> {
                  CassandraConnector cassandraConnector = new CassandraConnector();

                  cassandraConnector.schreibeSensorDaten(datum.key(), datum.value());
                  System.out.printf(
                          "Consumer Record:(%d, %s, %d, %d)\n",
                          datum.key(), datum.value(), datum.partition(), datum.offset());
                });
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      konsument.close();
    }
  }

Приведенный выше фрагмент кода работает, но, как я уже говорил, для каждой записи мне нужно создавать новое соединение.

Когда я инициализирую cassandraConnector вне цикла, я делаю одну успешную запись, а затем получаю исключения «Нет доступных хостов».

Класс CassandraConnector:

 public class CassandraConnector {

  private final String KEYSPACE = "ba2";
  private final String SERVER_IP = "127.0.0.1";
  private Cluster cluster;
  private Session session;

  public CassandraConnector() {
    cluster = Cluster.builder().addContactPoint(SERVER_IP).build();
    session = cluster.connect(KEYSPACE);
  }

  public void schreibeSensorDaten(Long key, SensorDaten datum) {

    try {

      session.execute(
          "INSERT INTO.....

Ответы [ 2 ]

0 голосов
/ 28 мая 2018
    public class SensorDatenKonsument implements Runnable {

  /** Kafka Konsument */
  private final KafkaConsumer<Long, SensorDaten> konsument;

  /** Einrichtung der Verbindung zu Cassandra */
  private final Cluster cluster =
      Cluster.builder().addContactPoint(TestKonfiguration.CASS_SERVER_IP).build();

  private final Session session = cluster.connect(TestKonfiguration.KEYSPACE);

  public SensorDatenKonsument(String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TestKonfiguration.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SensorDatenDeserializer.class.getName());
    this.konsument = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {


    try {
      konsument.subscribe(Collections.singletonList(TestKonfiguration.TOPIC));
      PreparedStatement prepStmt =
          session.prepare(
              "INSERT INTO wetterdaten (id, date_time, air_temp, std_air_temp, humidity, std_humidity,"
                  + "IR_temp, std_IR_temp, air_pressure, std_pressure, wind_speed, std_wind_speed, light_A,"
                  + "std_light_A, light_B, std_light_B, distance, std_distance, counter, roll, pitch,"
                  + "X_accel, std_X_accel, Y_accel, std_Y_accel, Z_accel, std_Z_accel, battery, error,"
                  + "WDT_trace, crc3) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");

      while (true) {
        ConsumerRecords<Long, SensorDaten> kafkaRecord = konsument.poll(Long.MAX_VALUE);
        System.out.println("*** Poll ***");

        kafkaRecord.forEach(
            datum -> {
              session.execute(
                  prepStmt.bind(...
0 голосов
/ 28 мая 2018

Нет, вам нужно повторно использовать экземпляры кластера / сеанса - они довольно тяжелы для инициализации ...

Также лучше использовать подготовленные операторы для вставки данных - послевы создаете сеанс, делаете что-то вроде:

PreparedStatement pStmt = session.prepare("INSERT INTO ... VALUES (?, ?)");

и затем в цикле

session.execute(pStmt.bind(datum.key(), datum.value()));

Что касается ошибки, пожалуйста, проверьте журналы на стороне Кассандры.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...