Ожидание загрузки строк базы данных с использованием TableDependency и F # - PullRequest
0 голосов
/ 05 ноября 2018

У меня есть проект F #, который загружает некоторые файлы во внешнюю подсистему, а затем использует Зависимость таблицы для ожидания добавления ряда строк в таблицу в качестве побочного эффекта.

Таблица Зависимости используется в типе ниже, чтобы следить за изменениями в БД. Вызывает пользовательское событие, когда строка добавляется / изменяется / что угодно:

// just using this type for the RecordChangedEvent to marshal the id we want into something
type AccountLoaded() = 
    let mutable someId = ""

    // this property name matches the name of the table column (SomeId)
    member this.SomeId 
        with get () = someId
        and set (value) = someId <- value

// AccountLoadWatcher
type AccountLoadWatcher() = 
    let mutable _tableDependency = null
    let event = new Event<_>()

    interface IDisposable with
        member this.Dispose() = 
            _tableDependency.Stop()
            _tableDependency.Dispose()

    // custom event we can send when an account is loaded

    [<CLIEvent>]
    member this.AccountLoaded = event.Publish

    member private this.NotifyAccountLoaded(sender : RecordChangedEventArgs<AccountLoaded>) = 
        let accountLoaded = sender.Entity
        event.Trigger(accountLoaded.SomeId)

    member this.Watch() = 
        _tableDependency <- DbLib.getTableDependency "dbo" "AccountTable" 
                                null
        _tableDependency.OnChanged.Add(this.NotifyAccountLoaded)
        _tableDependency.Start()

Что я хочу сделать, это взять вышеуказанный объект и просто подождать загрузки всех строк с идентификаторами, которые мне нужны. То, что у меня пока есть:

let waitForRows(csvFileRows) =
  let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows

  let mutable collected = Set.empty
  let isInSet id = Set.contains id idsToWaitFor
  let notDone = not <| (Set.difference idsToWaitFor collected = Set.empty)
  let accountLoadedHandler id = 
      collected <- collected.Add id
      printfn "Id loaded %s, waiting for %A\n" id (Set.difference idsToWaitFor collected)
  loadToSubsystem csvFileRows |> ignore

  // wait for all the watcher events; filtering each event object for ids we care about
     watcher.AccountLoaded
           |> Observable.takeWhile (fun _ -> notDone) 
           |> Observable.filter (fun e -> isInSet e) 
           |> Observable.subscribe accountLoadedHandler
           |> ignore

  doMoreWork()

но это просто продолжает делатьMoreWork, не дожидаясь всех событий, которые мне нужны выше.

Нужно ли использовать задачу или асинхронную работу? Агенты F #?

1 Ответ

0 голосов
/ 06 ноября 2018

Учитывая, что вы используете Observable.takeWhile в вашем примере, я предполагаю, что вы используете оболочку FSharp.Control.Reactive , чтобы получить доступ ко всему спектру реактивных комбинаторов.

У вашего подхода есть несколько хороших идей, таких как использование takeWhile для ожидания сбора всех идентификаторов, но использование мутации весьма неудачно - это может быть даже небезопасно из-за возможных условий гонки.

Хорошей альтернативой является использование одной из различных функций scan для сбора состояния по мере возникновения событий. Вы можете использовать Observable.scanInit, чтобы начать с пустого набора и добавить все идентификаторы; затем Observable.takeWhile, чтобы продолжать принимать события, пока у вас не будут все идентификаторы, которые вы ожидаете. Чтобы фактически ждать (и блокировать), вы можете использовать Observable.wait. Примерно так:

let waitForRows(csvFileRows) =
  let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows

  let finalCollectedIDs = 
    watcher.AccountLoaded
    |> Observable.scanInit Set.empty (fun collected id -> Set.add id collected) 
    |> Observable.takeWhile (fun collected -> not (Set.isSubset idsToWaitFor co llected))
    |> Observable.wait

  printfn "Completed. Final collected IDs are: %A" finalCollectedIDs 
...