Твиттер поток API с агентами в F # - PullRequest
2 голосов
/ 25 августа 2010

Из блога Don Syme (http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus-twitter-sample.aspx)) Я пытался внедрить прослушиватель потока Twitter. Моя цель - следовать указаниям документации Twitter API, в которой говорится, что «твиты часто следует сохранять или помещать в очередь перед обработкой при сборке».высоконадежная система ".

Поэтому мой код должен иметь два компонента:

  • Очередь, которая накапливается и обрабатывает каждое состояние / твит json
  • Что-точтобы прочитать поток Twitter, который выводит в очередь твит в виде строк json

Я выбираю следующее:

  • Агент, которому я публикую каждый твит, который декодируетjson, и выдает его в базу данных
  • Простой веб-запрос http

Я также хотел бы вывести в текстовый файл любую ошибку при вставке в базу данных (возможно, я переключусь наагент супервизора за все ошибки).

Две проблемы:

  • моя стратегия здесь хороша? Если я правильно понимаю, агент ведет себя как умная очередьd обрабатывает свои сообщения асинхронно (если в его очереди 10 парней, он будет обрабатывать их сразу, вместо того, чтобы ждать, пока 1-е завершит, затем 2-е и т. д.), правильно?
  • Согласно сообщению Дона Сайма, все до того времени было Изолировано, поэтому StreamWriter и дамп базы данных Изолированы.Но так как мне это нужно, я никогда не закрываю соединение с базой данных ...?

Код выглядит примерно так:

let dumpToDatabase databaseName = 
   //opens databse connection 
   fun tweet -> inserts tweet in database

type Agent<'T> = MailboxProcessor<'T>



 let agentDump =
            Agent.Start(fun (inbox: MailboxProcessor<string>) ->
               async{
                   use w2 = new StreamWriter(@"\Errors.txt")
                   let dumpError  =fun (error:string) -> w2.WriteLine( error )
                   let dumpTweet =  dumpToDatabase "stream"
                   while true do 
                       let! msg = inbox.Receive()
                       try 
                           let tw = decode msg
                           dumpTweet tw
                       with 
                       | :? MySql.Data.MySqlClient.MySqlException as ex -> 
    dumpError (msg+ex.ToString() ) 
                        | _ as ex -> () 



                             }
                             )

    let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
    let parameters = "track=RT&"
    let stream_url = filter_url

    let stream = twitterStream MyCredentials stream_url parameters


    while true do 
        agentDump.Post(stream.ReadLine())

Большое спасибо!

Редактирование кода с агентом процессора:

let dumpToDatabase (tweets:tweet list)= 
    bulk insert of tweets in database    

let agentProcessor = 
        Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
           async{
               while true do 
                       let! msg = inbox.Receive()
                       try
                          msg
                          |> List.map(decode)
                          |> dumpToDatabase 
                        with
                        | _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
                 }
                 )



let agentDump =
        Agent.Start(fun (inbox: MailboxProcessor<string>) ->
                  let rec loop messageList count = async{
                      try
                          let! newMsg = inbox.Receive()
                          let newMsgList = newMsg::messageList
                          if count = 10 then 
                               agentProcessor.Post( newMsgList )
                               return! loop [] 0
                          else                    
                               return! loop newMsgList (count+1)
                      with
                      | _ as ex -> Console.WriteLine("Dump "+ex.ToString())

                  }
                  loop [] 0)

let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url

let stream = twitterStream MyCredentials stream_url parameters


while true do 
    agentDump.Post(stream.ReadLine())

1 Ответ

5 голосов
/ 25 августа 2010

Я думаю, что лучший способ описать агента - это запущенный процесс, который сохраняет некоторое состояние и может взаимодействовать с другими агентами (или веб-страницами или базой данных). При написании приложения на основе агентов вы часто можете использовать несколько агентов, которые отправляют сообщения друг другу.

Я думаю, что идея создать агента, который читает твиты из Интернета и сохраняет их в базе данных, является хорошим выбором (хотя вы также можете хранить твиты в памяти как состояние агента).

  • Я бы не оставлял соединение с базой данных открытым все время - MSSQL (и MySQL, вероятно, тоже) реализует пул соединений, поэтому он не будет закрывать соединение автоматически, когда вы его отпустите. Это означает, что более безопасно и аналогично эффективно повторно открывать соединение каждый раз, когда вам нужно записать данные в базу данных.

  • Если вы не ожидаете получения большого количества сообщений об ошибках, я, вероятно, сделал бы то же самое для файлового потока (при записи вы можете открыть его, чтобы новый контент был добавлен в конец).

Работа очереди агентов F # заключается в том, что они обрабатывают сообщения одно за другим (в вашем примере вы ожидаете сообщение, используя inbox.Receive(). Когда очередь содержит несколько сообщений, вы получите их одно за другим (в цикле).

  • Если вы хотите обрабатывать несколько сообщений одновременно, вы можете написать агента, который ожидает, скажем, 10 сообщений, а затем отправляет их в виде списка другому агенту (который затем будет выполнять массовую обработку).

  • Вы также можете указать параметр timeout для метода Receive, чтобы вы могли ждать не более 10 сообщений, пока они все приходят в течение одной секунды - таким образом, вы можете довольно элегантно реализовать массовую обработку это не хранит сообщения в течение длительного времени.

...