Создайте несколько соединений с помощью весенней загрузки с Mapr-DB - PullRequest
0 голосов
/ 12 января 2019

У меня есть требование создать несколько соединений с MapR-DB. У меня объект Конфигурация создан и передаю объект в класс репозитория. Я хочу понять, является ли использование пружинного прицела «прототип» элегантным способом сделать это.

Вот класс RawMaprWriterThread, который создает объект соединения и передает его в качестве аргумента метода сохранения. Это создается несколько раз несколькими потоками

@Component
@Scope("prototype")
public class RawMaprWriterThread implements Runnable {
    final Logger logger = LoggerFactory.getLogger(RawMaprWriterThread.class);

    private List<RawSyslogMessageVO> messageList;

    @Autowired
    private RawSyslogMaprRepository rawSyslogRepository;

    public List<RawSyslogMessageVO> getMessageList() {
        return messageList;
    }

    @Override
    public void run() {
        logger.info("<----------Creating config object for MapR DB Writer------------>");
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapr.hbase.default.db", "maprdb");
        logger.info("<----------Executing save with messageList of size ------------> " + messageList.size());
        rawSyslogRepository.save(messageList, conf);
    }

    public void setMessageList(List<RawSyslogMessageVO> messageList) {
        logger.info("<-----------------RawMaprWriterThread setMessageList executed ----------->");
        this.messageList = messageList;
    }

}

Ниже приведен класс хранилища, который использует объект конфигурации и записывает данные в MapR-DB

@Component
public class RawSyslogMaprRepository {

    final Logger logger = LoggerFactory.getLogger(RawSyslogMaprRepository.class);

    public static final byte[] COLUMFAMILY_KEY_BYTES = "key".getBytes();
    public static final byte[] COLUMFAMILY_RAWSYSLOG_BYTES = "rawinfo".getBytes();

    static byte[] qKey = Bytes.toBytes("key");
    static byte[] qApplianceID = Bytes.toBytes("applianceID");
    static byte[] qPartyID = Bytes.toBytes("partyID");
    static byte[] qPartyName = Bytes.toBytes("partyName");
    static byte[] qInventryName = Bytes.toBytes("inventryName");
    static byte[] qSentTime = Bytes.toBytes("sentTime");
    static byte[] qMessage = Bytes.toBytes("message");
    static byte[] qTrackID = Bytes.toBytes("trackID");

    public void save(List<RawSyslogMessageVO> syslogList, Configuration conf) {
        String mapRPath = "/app/SmartAnalytics/Apps/DaaS/Network/syslograw";
        TableName tableName = TableName.valueOf(mapRPath);
        try {
            Connection mapRConnection = ConnectionFactory.createConnection(conf);
            BufferedMutator mutator = mapRConnection.getBufferedMutator(tableName);
            List<Put> puts = new ArrayList<Put>();
            logger.info("Creating list of puts ");
            for (RawSyslogMessageVO syslog : syslogList) {
                Date startTime = Calendar.getInstance()
                    .getTime();
                Put p = new Put(Bytes.toBytes(syslog.getKey()));
                p.addColumn(COLUMFAMILY_RAWSYSLOG_BYTES, qKey, Bytes.toBytes(syslog.getKey()));
                p.addColumn(COLUMFAMILY_RAWSYSLOG_BYTES, qApplianceID, Bytes.toBytes(syslog.getApplianceID()));
                p.addColumn(COLUMFAMILY_RAWSYSLOG_BYTES, qPartyID, Bytes.toBytes(syslog.getPartyID()));
                p.addColumn(COLUMFAMILY_RAWSYSLOG_BYTES, qPartyName, Bytes.toBytes(syslog.getPartyName()));
                p.addColumn(COLUMFAMILY_RAWSYSLOG_BYTES, qInventryName, Bytes.toBytes(syslog.getInventryName()));
                p.addColumn(COLUMFAMILY_RAWSYSLOG_BYTES, qSentTime, Bytes.toBytes(syslog.getSentTime()));
                p.addColumn(COLUMFAMILY_RAWSYSLOG_BYTES, qTrackID, Bytes.toBytes(syslog.getTrackID()));
                p.addColumn(COLUMFAMILY_RAWSYSLOG_BYTES, qMessage, Bytes.toBytes(syslog.getMessage()));
                logger.debug("---p-" + p);
                Date endTime = Calendar.getInstance()
                    .getTime();
                long durationInMilliSec = endTime.getTime() - startTime.getTime();
                logger.debug("Time Taken for RawSyslog for each Message in Mapr :: " + durationInMilliSec);
                puts.add(p);

            }

            logger.info("size of puts to the list " + puts.size());
            logger.info("Mutator is mutating");
            mutator.mutate(puts);
            mutator.flush();
            mutator.close();
            mapRConnection.close();

        } catch (IOException e) {
            logger.error("erorr while getting MapR connection", e);
        }

    }
}

Правильно ли я поступаю?

...