Гонка данных при чтении содержимого архива с архивом в потоке. Что я сделал не так? - PullRequest
0 голосов
/ 20 апреля 2020

Я пытаюсь распараллелить чтение и обработку архивов, которые уже находятся в памяти, но я получаю гонку данных при вызове функции libarchive в потоке. Google sanitizer говорит, что проблема заключается в вызове функции archive_read_open_memory внутри get_archive_contens. Но я прочитал, что все функции в libarchive должны быть поточно-ориентированными. Может кто-нибудь сказать мне, что я сделал не так? Вот мой код темы.

void indexing_thread(std::mutex &m,
                     int &current_indexing_threads, concurrent_queue<std::pair<std::string, std::string>> &raw_files_q,
                     concurrent_queue<std::map<std::string, size_t>> &words_q) {
    while (true) {
        auto raw_file = raw_files_q.front();
        std::string file_buffer = raw_file.first;
        std::string ext = raw_file.second;
        if (file_buffer.empty() && ext.empty()) {
            break;
        }
        raw_files_q.pop();
        std::string file_content;
        if (ext == ".zip") {
            auto archive_contents = get_archive_content(file_buffer);
            for (int i = 0; i < archive_contents.size(); ++i) {
                auto cur_ext = boost::filesystem::extension(archive_contents[i]);
                if (cur_ext == ".txt") {
                    file_content = get_archive_file_contents(archive_contents[i], archive_contents, file_buffer);
                    file_content = convert_to_normalized_utf_string(file_content);
                }
            }
            for (int i = 0; i < archive_contents.size(); ++i) {
                auto cur_ext = boost::filesystem::extension(archive_contents[i]);
                if (cur_ext == ".txt") {
                    file_content = get_archive_file_contents(archive_contents[i], archive_contents, file_buffer);
                    file_content = convert_to_normalized_utf_string(file_content);
                }
            }
        }
        auto words = word_count_map_nonparallel(file_content);
        words_q.push_back(words);
    }
    m.lock();
    current_indexing_threads--;
    if (current_indexing_threads == 0) {
        words_q.push_back(std::map<std::string, size_t>{});
    }
    m.unlock();
}

get_archive_content code:

std::string
get_archive_file_contents(const std::string &filename, std::vector<std::string> contents, std::string file_buffer) {
    if (std::find(contents.begin(), contents.end(), filename) == contents.end()) {
        throw FileDoesNotExistsException(filename);
    }
    struct archive_entry *entry;
    struct archive *archive = archive_read_new();
    archive_read_support_filter_all(archive);
    archive_read_support_format_all(archive);
    archive_read_support_format_raw(archive);
    archive_read_support_format_empty(archive);
    int reading_result = archive_read_open_memory(archive, file_buffer.data(), file_buffer.size());
    if (reading_result != 0) {
        throw std::runtime_error("Error reading archive");
    }
    void *buf;
    int64_t length;
    while (archive_read_next_header(archive, &entry) == ARCHIVE_OK) {
        if (archive_entry_filetype(entry) == AE_IFREG) {
            length = archive_entry_size(entry);
            buf = malloc(length);
            if (!buf) {
                archive_read_data_skip(archive);
                continue;
            }
            archive_read_data(archive, buf, length);
            break;
        }
    }
    std::string result = static_cast<char *>(buf);
    return result;
}

UPD: отчет о дезинфицирующем средствах в нитях Google Report 1 Report 2

1 Ответ

0 голосов
/ 20 апреля 2020

Я разобрался. Проблема была в двоичном файле, который устанавливается из репозиториев Ubuntu. Я решил эту проблему, установив libarchive из исходников.

...