синхронизировать и объединять сообщения / поток данных - PullRequest
3 голосов
/ 07 июля 2011

Это очень распространенная проблема обработки данных с датчиков.

Чтобы синхронизировать и объединять данные датчиков из разных источников, я хотел бы реализовать их на Java без слишком сложных 3-х библиотек или фреймворков.

Скажем, я определяю объект (O), который состоит, например, из 4 атрибутов (A1, .. A4). 4 атрибута поступают из разных каналов данных, например канальный сокет.

4 атрибута поступают, как правило, с частотой 1,0 ~ 2,0 Гц, и их приходы не зависят друг от друга. Как только 4 атрибута (A1, ..A4) будут приходить одновременно (в пределах небольшого временного окна, например, 100 мс), я создаю новый объект (O) из этих 4 атрибутов.

Описательный сценарий выглядит следующим образом. время прибытия A1 ~ A4 помечено *.

Объекты O1 ~ U3 строятся в моменты времени t1, t2 и t3 соответственно. Некоторые атрибуты поступают между t2 и t3, но не завершены для создания объекта, поэтому они будет отброшен и проигнорирован.

  A1     *          *         *         *
  A2      *           *         *        *
  A3     *            *                  * 
  A4      *            *       *         * 
  --------|------------|-----------------|----------> time
          t1           t2                t3
          O1           O2                O3  

некоторые требования:

  1. идентифицирует момент времени a.s.a.p. для построения объекта из последних 4 входящих атрибутов.
  2. FIFO, O1 должен быть построен до O2 и т. Д.
  3. меньше блокировок в Java
  4. в конечном итоге отбросить данные, если они не завершены для создания объекта.

Несколько быстрых идей по реализации:

  • хранить любые входящие атрибуты в очереди FIFO дискретных по времени сегментов (каждый блок содержит 4 различных атрибута).
  • запустить бесконечный поток одновременно, чтобы проверить очередь FIFO (из начала очереди), если какой-либо сегмент уже заполнен 4 различными атрибутами. Если да, то создайте объект и удалите сегмент из очереди. Если контейнер не заполнен в течение определенного временного окна, он будет отброшен.

любые предложения и исправления приветствуются!

Ответы [ 4 ]

0 голосов
/ 07 июля 2011

Вы можете сделать что-то вроде этого, операция get блокируется, пока не поступят данные, операция add не блокируется. Операция get может быть немного оптимизирована, так что вы сохраняете кандидатов в структуре paralell, чтобы вам не нужно было перебирать всех кандидатов при фильтрации старых элементов. Итерации по 4 пунктам должны быть достаточно быстрыми.

import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

public class Filter<V> {

    private static final long MAX_AGE_IN_MS = 100;

    private final int numberOfSources;

    private final LinkedBlockingQueue<Item> values = new LinkedBlockingQueue<Item>();

    public Filter(int numberOfSources) {
        this.numberOfSources = numberOfSources;
    }

    public void add(String source, V data) {
        values.add(new Item(source, data));
    }

    public void get() throws InterruptedException {
        HashMap<String, Item> result = new HashMap<String, Item>();
        while (true) {
            while (result.size() < numberOfSources) {
                Item i = values.take();
                result.put(i.source, i);
                if (result.size() == numberOfSources) {
                    break;
                }
            }
            //We got candidates from each source now, check if some are too old.
            long now = System.currentTimeMillis();
            Iterator<Item> it = result.values().iterator();
            while (it.hasNext()) {
                Item item = it.next();
                if (now - item.creationTime > MAX_AGE_IN_MS) {
                    it.remove();
                }
            }
            if (result.size() == numberOfSources) {
                System.out.println("Got result, create a result object and return the items " + result.values());
                break;
            }
        }
    }

    private class Item {
        final String source;
        final V value;
        final long creationTime;

        public Item(String source, V value) {
            this.source = source;
            this.value = value;
            this.creationTime = System.currentTimeMillis();
        }

        public String toString() {
            return String.valueOf(value);
        }
    }


    public static void main(String[] args) throws Exception {
        final Filter<String> filter = new Filter<String>(4);
        new Thread(new Runnable() {
            public void run() {
                try {
                    filter.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        filter.add("a0", "va0.1");
        filter.add("a0", "va0.2");
        Thread.sleep(2000);
        filter.add("a0", "va0.3");
        Thread.sleep(100);
        filter.add("a1", "va1.1");
        filter.add("a2", "va2.1");
        filter.add("a0", "va0.4");
        Thread.sleep(100);
        filter.add("a3", "va3.1");
        Thread.sleep(10);
        filter.add("a1", "va1.2");
        filter.add("a2", "va2.2");
        filter.add("a0", "va0.5");

    }


}
0 голосов
/ 07 июля 2011

Вот еще одна безумная идея:

используйте один единственный LinkedBlockingQueue для записи значений со всех датчиков A1-A4

назначить эту очередь AtomicReference переменная

создать задачу таймера, которая будет переключать эту очередь на новую через заданные интервалы (100 мс)

получить все данные из старой очереди и посмотреть, есть ли у вас все данные A1-A4

если да, то создать объект, иначе отбросить все

0 голосов
/ 07 июля 2011

Это еще один способ сделать это - это просто псевдокод, хотя, вам нужно написать это самостоятельно:)

class SlidingWindow {
    AtomicReference<Object> a1;
    AtomicReference<Object> a2;
    AtomicReference<Object> a3;
    AtomicReference<Object> a4;

    Queue<Long> arrivalTimes = new Queue(4);

    public Bucket setA1(Object data) {
        a1.set(data);
        now = System.currentTimeInMillis()
        long oldestArrivalTime = arrivalTimes.pop();
        arrivalTimes.push(now);
        if (now - oldestArrivalTime < 100) {
            return buildBucket();
        }
        return null;
    }

    public Bucket setA2(Object data) { ...

    ...

    private Bucket buildBucket() {
        Bucket b = new Bucket(a1, a2, a3, a4);
        a1.clear();
        a2.clear();
        a3.clear();
        a4.clear();
        return b;
    }

}
0 голосов
/ 07 июля 2011

Это вряд ли решит вашу проблему, но может указать вам правильное направление.

Я бы использовал Google Guava MapMaker для первой попытки:

ConcurrentMap<Key, Bucket> graphs = new MapMaker()
                                   .expireAfterAccess(100, TimeUnit.MILLISECOND)
                                   .makeComputingMap(new Function<Key, Bucket>() {
                                                     public Bucket apply(Key key) {
                                                         return new Bucket(key);
                                                     }
                                    });

Это создаст карту, чьи записи исчезнут, если к ним не было доступа в течение 100 мс, и создаст новую корзину, когда ее попросят.

Что я не могу понять, так это то, чем был бы Ключ: S Что вам действительно нужно, так это функциональность в виде очереди.

...