Реализация проекта Spring + Apache Flink с помощью Posgres - PullRequest
0 голосов
/ 26 мая 2020

У меня есть проект Gradle SpringBoot, использующий apache flink для обработки сигналов потока данных. Когда новый сигнал проходит через поток данных, я хотел бы запросить поиск (например, findById ()) его подробностей, используя идентификатор в таблице базы данных postgres, которая уже создана, чтобы получить дополнительную информацию о сигнале и обогатить данные. Я хотел бы избежать использования зависимостей Spring для выполнения поиска (т.е. репозитория Autowire) и хочу придерживаться реализации flink для поиска.

Где я могу указать, как добавить информацию о конфигурации подключения postgres, такую ​​как порт, база данных, URL-адрес, имя пользователя, пароль и т. Д. c ... (для простоты можно предположить, что postgres db является локально в моей машине). Это так же просто, как добавить конфигурацию в файл application.properties? если да, то как я могу написать метод запроса для поиска записи в таблице postgres при поиске по значению не первичного ключа?

Некоторые онлайн-источники предлагают использовать этот скелетный код, но я не уверен, как / id подходит для моего варианта использования. (У меня есть модель EventEntity, которая содержит все параметры / столбцы из таблицы, которую я ищу).

like so

    public class DatabaseMapper extends RichFlatMapFunction<String, EventEntity> {

        // Declare DB connection & query statements

        public void open(Configuration parameters) throws Exception {
            //Initialize DB connection
            //prepare query statements
        }

        @Override
        public void flatMap(String value, Collector<EventEntity> out) throws Exception {

        }
    }

1 Ответ

0 голосов
/ 28 мая 2020

Ваш пример кода верен. Вы можете установить весь свой собственный код инициализации и подготовки для PostgreSQL в методе open(). Затем вы можете использовать свои предварительно настроенные поля в своей функции flatMap().

Вот один пример для операций Redis

  • Я использовал RichAsyncFunction здесь и предлагаю вам сделать то же самое как это предлагается в качестве наилучшей практики. Подробнее читайте здесь: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html)
  • Вы можете передать параметры конфигурации в свой метод конструктора и использовать его в процессе инициализации

        public static class AsyncRedisOperations extends RichAsyncFunction<Object,Object> {
    
            private JedisPool jedisPool;
            private Configuration redisConf;
    
            public AsyncRedisOperations(Configuration redisConf) {
              this.redisConf = redisConf;
            } 
    
            @Override
            public void open(Configuration parameters) {
    
              JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
              jedisPoolConfig.setMaxTotal(this.redisConf.getInteger("pool", 8));
              jedisPoolConfig.setMaxIdle(this.redisConf.getInteger("pool", 8));
              jedisPoolConfig.setMaxWaitMillis(this.redisConf.getInteger("maxWait", 0));
    
              JedisPool jedisPool = new JedisPool(jedisPoolConfig,
                this.redisConf.getString("host", "192.168.10.10"),
                this.redisConf.getInteger("port", 6379), 5000);
    
              try {
                this.jedisPool = jedisPool;
                this.logger.info("Redis connected: " + jedisPool.getResource().isConnected());
              } catch (Exception e) {
                this.logger.error(BaseUtil.append("Exception while connecting Redis"));
              }
    
            }
    
            @Override
            public void asyncInvoke(Object in, ResultFuture<Object> out) {
    
              try (Jedis jedis = this.jedisPool.getResource()) {
                String key = jedis.get(key);
                this.logger.info("Redis Key: " + key);
              } 
    
            }
        }      
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...