как возобновить задачу эликсира, откуда он сломал эликсир феникс - PullRequest
0 голосов
/ 26 сентября 2018

Мы используем SeaweedFS , которая является файловой системой для хранения (изображений) файлов. Она работает как rest API.Мы пытаемся переместить данные с одного сервера на другой.

Есть несколько уровней каталогов данных.Базовая схема, с помощью которой сохраняется изображение:

http://{server}:8888/ballymore-project-wave/snapshots/recordings/{year}/{month}/{day}/{hour}/00_00_000.jpg

Каждый уровень каталога имеет свой собственный возврат в виде JSON, например

{
    "Path": "/ballymore-project-wave/snapshots/recordings/",
    "Files": null,
    "Directories": [
        {
            "Name": "2016",
            "Id": 91874
        },
        {
            "Name": "2017",
            "Id": 1538395
        }
    ],
    "Limit": 100,
    "LastFileName": "",
    "ShouldDisplayLoadMore": false
}

. Ответ выше длякогда вы пытаетесь получить годы для записей, то же самое относится к месяцу, дням и часу.есть небольшое изменение, когда вы выбираете один час как

{
    "Path": "/ballymore-project-wave/snapshots/recordings/2016/11/02/01/",
    "Files": [
        {
            "name": "00_00_000.jpg",
            "fid": "29515,744a5a496b97ff98"
        },
        {
            "name": "00_01_000.jpg",
            "fid": "29514,744a5aa52ea3cf3d"
        }
    ],
    "Directories": null,
    "Limit": 100,
    "LastFileName": "02_15_000.jpg",
    "ShouldDisplayLoadMore": true
}

Теперь нам нужно переместить все эти данные с одного сервера на другой.Я написал для него скрипт:

  defp move_snapshots(exids) do
    exids
    |> Enum.each(fn (exid) ->
      request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
      |> Enum.sort |> Enum.each(fn (year) ->
        request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
        |> Enum.sort |> Enum.each(fn (month) ->
          request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
          |> Enum.sort |> Enum.each(fn (day) ->
            request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
            |> Enum.sort |> Enum.each(fn (hour) ->
              request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
              |> Enum.sort |> Enum.each(fn (file) ->
                exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
                |> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
              end)
            end)
          end)
        end)
      end)
    end)
  end

Это основная функция, exids, предназначенная для идентификации типа строки всех камер, например, это ballymore-project-wave.

В приведенном выше сценарии я проверяю каждый уровень, и если что-то присутствует, я иду глубже и до последнего, я проверяю, является ли его действительное изображение как

  defp exist_on_seaweed?(url) do
    hackney = [pool: :seaweedfs_download_pool, recv_timeout: 30_000_000]
    case HTTPoison.get("#{@seaweedfs}#{url}", ["Accept": "application/json"], hackney: hackney) do
      {:ok, %HTTPoison.Response{status_code: 200, body: data}} -> {:ok, data}
      _error ->
        :not_found
    end
  end

  defp copy_or_skip(:not_found, _path), do: :noop
  defp copy_or_skip({:ok, data}, path) do
    hackney = [pool: :seaweedfs_upload_pool]
    case HTTPoison.post("#{@seaweedfs_new}#{path}", {:multipart, [{path, data, []}]}, [], hackney: hackney) do
      {:ok, _response} -> Logger.info "[seaweedfs_save]"
      {:error, error} -> Logger.info "[seaweedfs_save] [#{inspect error}]"
    end
  end

Это все работает нормально, но у меня есть небольшоеВопрос возобновления этого, когда он по какой-то причине потерпел крах или сломался, мне нужно руководство / идея для этого.Как вы можете видеть, если камера exids имеет значение 200 и она разоряется на 100 или, может быть, она возобновится, но с самого начала мы не можем удалять вещи на старом сервере после перемещения до полного перемещения. Любая помощь будет принята с благодарностью.Также, если вы думаете, что в коде могут быть некоторые улучшения, которые будут полезны.

1 Ответ

0 голосов
/ 26 сентября 2018

Пока вы не опубликуете фактическую трассировку стека или подробности ошибки, с которой вы столкнулись, невозможно точно определить, в чем дело.Но для начала вот несколько советов, которые могут помочь:

  • Вам следует разбить ваш метод move_snapshots на что-то более понятное, возможно, используя что-то вроде Enum.reduce/3 с рекурсией и вызовом метода copy_or_skip в качестве базового случая.

  • Попробуйте обернуть реализацию copy_or_skip метода в try/rescue, спасая любые исключения, регистрируя их и переходя к следующему.

    defp copy_or_skip(args, path) do
      # Your Implementation
    rescue
      error -> Logger.error("Exception caught on #{inspect(path)}\n#{inspect(error)}")
    end
    
  • Вы также можете просто просмотреть список всех файлов и добавить действительные пути к некоторому «Рабочему» вБиблиотека обработки заданий, такая как Que или Toniq.Библиотека выполнит все операции перемещения и пометит их как успешные или неудачные.Затем вы можете вернуться, чтобы увидеть, какие операции завершились неудачно, и выяснить, что их вызвало, или автоматически перезапустить сбойные операции.


Еще несколько советов по повышению надежности и производительности кода:

  • Используйте Stream или, что еще лучше, Flow, чтобы разделить задачи и обрабатывать их параллельно.
  • Выполнитефактическая операция перемещения в отдельном Task процессе, в идеале управляемом Supervisor.(Опционально используйте Пул).
...