Как сначала получить из одного потока, а затем перейти к другому потоку - PullRequest
0 голосов
/ 26 февраля 2019

Мне нужно вызвать cassandra для получения даты, а затем передать полученную дату другому потоку, который собирается вставить данные в базу данных.

def fetchDate: Future[Done] =
  readJournal(persistenceKey)
    .drop(3)
    .take(1)
    .map(l => l.mydate)
    .runWith(Sink.ignore)

def insertRowsToDb: Future[Done] = 
  readJournal(somePersistenceKey)
    .drop(4)
    .take(1)
    .map(data => MyClass(data))
    .mapAsync(1) { myData => 
      for {
        insert <- myRepository.insert(data.id, fetchDate) //error here because fetchDate is unavailable 
      }
    }

  class MyRepository(tableName: String) {
    def insert(id: String, fetchedDate: Long): Future[Int] =
      config.db.run {
        sqlu"""INSERT INTO #${tableName}
           VALUES (
            ${id},
            ${fetchedDate}
           )
          """
      }

Вопрос

  • Как сначала выполнить fetchDate, а затем передать результат в строку myRepository.insert?

1 Ответ

0 голосов
/ 26 февраля 2019

У вас две проблемы.Во-первых, fetchDate даже не содержит дату!Исправление означает, что запуск с Sink, который делает больше, чем просто «игнорирование»:

def fetchDateFut: Future[Date] =
  readJournal(persistenceKey)
    .drop(3)
    .take(1)
    .map(l => l.mydate)
    .runWith(Sink.last)

Затем вам нужно flatMap ваш Future, чтобы ввести в область эту дату:

def insertRowsToDb: Future[Done] = fetchDateFut.flatMap { fetchDate: Date =>
  readJournal(somePersistenceKey)
    .drop(4)
    .take(1)
    .map(data => MyClass(data))
    .mapAsync(1) { myData => 
      for {
        insert <- myRepository.insert(data.id, fetchDate)
      }
    }
}
...