Возобновить операцию Enum.each при чтении данных из файла в Elixir - PullRequest
0 голосов
/ 29 сентября 2018

У меня есть такой тип Enum.each, который проходит через многие из них.

exids
|> Enum.each(fn (exid) ->
  request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
  |> Enum.sort |> Enum.each(fn (year) ->
    request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
    |> Enum.sort |> Enum.each(fn (month) ->
      request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
      |> Enum.sort |> Enum.each(fn (day) ->
        request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
        |> Enum.sort |> Enum.each(fn (hour) ->
          request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
          |> Enum.sort |> Enum.each(fn (file) ->
            exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            |> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            save_current_directory(exid, year, month, day, hour, file)
          end)
        end)
      end)
    end)
  end)
  next_exid_index = Enum.find_index(exids, fn(x) -> x == exid end)
  File.write!("#{@root_dir}/moving_old_data", "#{Enum.at(exids, next_exid_index + 1)}")
end)

Это очень длительный цикл, но он не обрабатывает логику остановки и возобновления.

Я попытался сохранить текущие данные в файл и возобновить их оттуда при перезагрузке как

exids
|> clean_already_completed(0)
|> Enum.each(fn (exid) ->
  request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
  |> Enum.sort |> clean_already_completed(1) |> Enum.each(fn (year) ->
    request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
    |> Enum.sort |> clean_already_completed(2) |> Enum.each(fn (month) ->
      request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
      |> Enum.sort |> clean_already_completed(3) |> Enum.each(fn (day) ->
        request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
        |> Enum.sort |> clean_already_completed(4) |> Enum.each(fn (hour) ->
          request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
          |> Enum.sort |> clean_already_completed(5) |> Enum.each(fn (file) ->
            exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            |> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            save_current_directory(exid, year, month, day, hour, file)
          end)
        end)
      end)
    end)
  end)
  next_exid_index = Enum.find_index(exids, fn(x) -> x == exid end)
  File.write!("#{@root_dir}/moving_old_data", "#{Enum.at(exids, next_exid_index + 1)}")
end)

, тогда как clean_already_completed работает так.

  defp clean_already_completed(list, index), do: get_recent_value(index) |> dont_reduce?(list)

  defp dont_reduce?(nil, list), do: list
  defp dont_reduce?(last, list), do: Enum.drop_while(list, fn el -> el != last end)

  defp get_recent_value(index), do: read_recent_file() |> Enum.at(index)

  defp read_recent_file, do: File.read("#{@root_dir}/moving_old_data") |> file_is_present()

  defp file_is_present({:error, :enoent}), do: []
  defp file_is_present({:ok, ""}), do: []
  defp file_is_present({:ok, data}), do: data |> String.split(" ")

Но это не работает, как ожидалось, когдаЯ останавливаюсь и перезапускаю его, даже во время работы.Он пропускает следующий год, дни и месяцы.

В сохраненном файле у меня есть такие данные

1-granby-row 2017 03 18 05 43_49_000.jpg

Например, при сбое я просто хочу возобновить каждый цикл сэти значения.вышеприведенная строка объясняется как exid year month day hour file.

Есть ли какой-нибудь возможный способ сделать это лучше?что из информации файла, если весь цикл начинается снова, возобновить его оттуда?

1 Ответ

0 голосов
/ 29 сентября 2018

Лучшим способом было бы разделить процесс на более мелкие шаги:

  1. Запустите вложенные операторы each, чтобы создать манифест всех файлов, которые необходимо переместить.
  2. Сохраняйте полные пути к этим файлам в постоянном хранилище данных, таком как Redis, Postgres или Mnesia.
  3. Теперь итерируйте все эти пути, перемещая файлы и помечая их как «выполненные», когда онибыли успешно переданы.
  4. По любой причине, когда происходит сбой приложения или процесс останавливается в середине, вы можете перезапустить приложение и продолжить с того места, где остановились.Поскольку у вас уже есть разрешенный список всех файлов, их путей и статусов, вы можете просто выбрать следующий путь, помеченный как «не выполнено», и продолжить его перемещение.

Как яуже упоминал в моем другом ответе ранее, все это можно сделать, просто используя библиотеку обработки заданий;все, что вам нужно сделать, это разрешить все пути и добавить их в качестве заданий, которые необходимо обработать.В случае ошибок, сбоев и других сбоев, они могут перезапустить «задания», которые были неудачными, и возобновить задания, которые не были завершены.


Вот как будет выглядеть код, если вырешите сделать это без библиотеки обработки заданий:

# Steps 1 and 2
def build_manifest do
  all_files =
    exids
    |> Enum.map(fn exid -> "#{@seaweedfs}/#{exid}/snapshots/recordings/" end)
    |> Enum.flat_map(&get_dir_paths/1)         # Get Year directories
    |> Enum.flat_map(&get_dir_paths/1)         # Get Month directories
    |> Enum.flat_map(&get_dir_paths/1)         # Get Day directories
    |> Enum.flat_map(&get_dir_paths/1)         # Get Hour directories
    |> Enum.flat_map(&get_file_paths/1)        # Get files in the hour directories

  # Now store these files to a datastore
end


# Steps 3 & 4
def copy_all_files do
  case get_next_incomplete_file() do
    nil ->
      IO.puts("All files have been copied")

    path ->
      if exist_on_seaweed?(path)
        copy_file(path)
      end

      copy_all_files()
  end
end


defp get_dir_paths(path) do
  path
  |> request_from_seaweedfs("Directories", "Name")
  |> Enum.sort
  |> Enum.map(&Path.join(path, &1))
end

defp get_file_paths(path) do
  (path <> "?limit=3600")
  |> request_from_seaweedfs("Files", "Name")
  |> Enum.sort
  |> Enum.map(&Path.join(path, &1))
end

defp get_next_incomplete_file do
  # query your datastore to get the next file that is
  # marked "not done"
end

defp copy_file(path) do
  # copy the file
  # and then mark it "done" in the datastore
end
...