Как использовать заимствованные данные Rust только для чтения несколькими потоками Java? - PullRequest
1 голос
/ 25 сентября 2019

У меня есть структура Foo и FooRef, в которой есть ссылки на данные из Foo:

struct Foo { /* ... */ }

struct FooRef<'foo> { /* ... */ }

impl Foo {
    pub fn create_ref<'a>(&'a self) -> FooRef<'a> { /* ... */ }
}

Теперь Foo напрямую не может использоваться в логике;Мне нужно FooRef.Создание FooRef требует большого количества вычислений, поэтому я делаю это один раз сразу после создания экземпляра Foo.FooRef является неизменным;он используется только для чтения данных.

Для доступа к этому экземпляру FooRef требуется несколько потоков.Как я могу это реализовать?Вызывающие потоки являются потоками Java, и это будет использоваться с JNI.Это предотвращает использование пула потоков с заданной областью действия, например.

Еще одна сложность заключается в том, что когда мне нужно обновить экземпляр Foo, чтобы загрузить в него новые данные.Затем мне также нужно пересоздать экземпляр FooRef.

Как этого добиться поточно-ориентированно и безопасно с памятью?Я попытался возиться с указателями и RwLock, но это привело к утечке памяти (использование памяти продолжало добавляться при каждой перезагрузке).Я разработчик Java, новичок в указателях.

Данные в Foo в основном текстовые и около 250 МБ.FooRef в основном str с и структуры str с заимствованы из Foo.

Мое объяснение использования Java

Я использую две long переменные в классе Javaхранить указатели на Foo и FooRef.Я использую статический ReentrantReadWriteLock для защиты этих указателей.

Если данные необходимо обновить в Foo, я получаю блокировку записи, сбрасываю FooRef, обновляю Foo, создаю новый FooRef и обновляю указатель ref в Java.

Если мне нужно прочитать данные (то есть, когда я не обновляю Foo), я получаю блокировку чтения и использую FooRef.

Утечка памяти видна только тогда, когда несколько Javaпотоки вызывают этот код.

Rust:

use jni::objects::{JClass, JString};
use jni::sys::{jlong, jstring};
use jni::JNIEnv;

use std::collections::HashMap;

macro_rules! foo_mut_ptr {
    ($env: expr, $class: expr) => {
        $env.get_field(*$class, "ptr", "J")
            .ok()
            .and_then(|j| j.j().ok())
            .and_then(|ptr| {
                if ptr == 0 {
                    None
                } else {
                    Some(ptr as *mut Foo)
                }
            })
    };
}

macro_rules! foo_ref_mut_ptr {
    ($env: expr, $class: expr) => {
        $env.get_field(*$class, "ptrRef", "J")
            .ok()
            .and_then(|j| j.j().ok())
            .and_then(|ptr| {
                if ptr == 0 {
                    None
                } else {
                    Some(ptr as *mut FooRef)
                }
            })
    };
}

macro_rules! foo_mut {
    ($env: expr, $class: expr) => {
        foo_mut_ptr!($env, $class).map(|ptr| &mut *ptr)
    };
}

macro_rules! foo_ref {
    ($env: expr, $class: expr) => {
        foo_ref_mut_ptr!($env, $class).map(|ptr| &*ptr)
    };
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_create(_env: JNIEnv, _class: JClass) -> jlong {
    Box::into_raw(Box::new(Foo::default())) as jlong
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_createRef(env: JNIEnv, class: JClass) -> jlong {
    let foo = foo_mut!(env, class).expect("createRef was called on uninitialized Data");
    let foo_ref = foo.create_ref();
    Box::into_raw(Box::new(foo_ref)) as jlong
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_reload(env: JNIEnv, class: JClass) {
    let foo = foo_mut!(env, class).expect("foo must be initialized");
    *foo = Foo {
        data: vec!["hello".to_owned(); 1024 * 1024],
    };
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroy(env: JNIEnv, class: JClass) {
    drop_ptr(foo_ref_mut_ptr!(env, class));
    drop_ptr(foo_mut_ptr!(env, class));
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroyRef(env: JNIEnv, class: JClass) {
    drop_ptr(foo_ref_mut_ptr!(env, class));
}

unsafe fn drop_ptr<T>(ptr: Option<*mut T>) {
    if let Some(ptr) = ptr {
        let _foo = Box::from_raw(ptr);
        // foo drops here
    }
}

#[derive(Default)]
struct Foo {
    data: Vec<String>,
}

#[derive(Default)]
struct FooRef<'a> {
    data: HashMap<&'a str, Vec<&'a str>>,
}

impl Foo {
    fn create_ref(&self) -> FooRef {
        let mut data = HashMap::new();
        for s in &self.data {
            let s = &s[..];
            data.insert(s, vec![s]);
        }
        FooRef { data }
    }
}

Java:

package test;

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

public class App implements AutoCloseable {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReadLock readLock = lock.readLock();
    private final WriteLock writeLock = lock.writeLock();

    private volatile long ptr;
    private volatile long ptrRef;
    private volatile boolean reload;

    static {
        System.loadLibrary("foo");
    }

    public static void main(String[] args) throws InterruptedException {
        try (App app = new App()) {
            for (int i = 0; i < 20; i++) {
                new Thread(() -> {
                    while (true) {
                        app.tryReload();
                    }
                }).start();
            }

            while (true) {
                app.setReload();
            }
        }
    }

    public App() {
        this.ptr = this.create();
    }

    public void setReload() {
        writeLock.lock();
        try {
            reload = true;
        } finally {
            writeLock.unlock();
        }
    }

    public void tryReload() {
        readLock.lock();
        debug("Got read lock");

        if (reload) {
            debug("Cache is expired");

            readLock.unlock();
            debug("Released read lock coz expired");

            writeLock.lock();
            debug("Got write lock");

            try {
                if (reload) {
                    fullReload();
                }

                readLock.lock();
                debug("Got read lock inside write");
            } finally {
                writeLock.unlock();
                debug("Released write lock");
            }
        }

        readLock.unlock();
        debug("Released read lock");
    }

    private void fullReload() {
        destroyRef();
        debug("Dropped ref");

        debug("Reloading");
        reload();
        debug("Reloading completed");

        updateRef();
        debug("Created ref");
        reload = false;
    }

    private void updateRef() {
        this.ptrRef = this.createRef();
    }

    private native void reload();

    private native long create();

    private native long createRef();

    private native void destroy();

    private native void destroyRef();

    @Override
    public void close() {
        writeLock.lock();
        try {
            this.destroy();
            this.ptrRef = 0;
            this.ptr = 0;
        } finally {
            writeLock.unlock();
        }
    }

    private static void debug(String s) {
        System.out.printf("%10s : %s%n", Thread.currentThread().getName(), s);
    }

}

1 Ответ

0 голосов
/ 28 сентября 2019

Проблема, о которой я думал как утечка памяти, на самом деле не была утечкой памяти.Проблема заключалась в том, что распределитель использовал локальные арены потоков.Итак, какой бы поток ни перезагружал 250 МБ данных, он оставлял выделенное пространство как есть и не возвращал его в систему.Эта проблема не была специфичной для JNI, но также возникала в чистом безопасном коде ржавчины.См. Почему несколько потоков используют слишком много памяти при удерживании Mutex

Количество созданных арен по умолчанию по умолчанию равно 8 * cpu count = 64 в моем случае.Этот параметр можно изменить, установив переменную MALLOC_ARENA_MAX env.

Поэтому я решил эту проблему, установив переменную MALLOC_ARENA_MAX env в 1.Итак, подход, который я выбрал, хорош.Это была проблема, специфичная для платформы.

Эта проблема возникала только в Ubuntu в WSL.Я также попробовал тот же код без каких-либо настроек в Windows 10, и он работает без проблем.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...