Похоже, что при параллельном запуске Spark неверная карта объекта используется - PullRequest
0 голосов
/ 19 апреля 2020

Я довольно новичок в Spark (даже не уверен, что моя проблема связана со Spark). У меня есть код, как показано ниже.

SparkRunner. java

Broadcast<Map<Integer, ObjectDomain>> partialObjectDomain = sc.broadcast(allObjectDomains);         

JavaRDD<Result> resultRDD = inputRdd.map(rdd -> {
    ......
    ObjectDomain domain = partialObjectDomain.getValue().get(id);

    domain.prepareData();

});

ObjectDomain. java

    public ObjectDomain implements Serializable {

        private transient EntityMaps entityMaps;

        ......

        public void prepareData() {

            // logged persons here and the succeed and failed run seem to be the same.
            new EntityMapLoader<Person>().load(persons, entityMaps.personMap); 

            ......

            for (Employee employee : employees) {
                if(employee.getPersonId() != null) {
                    // this is the line throwing error
                    Person person = entityMaps.personMap.get(employee.getPersonId());  
                    employee.setPerson(person);
                }
            }

        }
        ......
    }

    class EntityMaps {

        EntityMap<Person> personMap= new EntityMap<Person>();
        ......
    }


    class EntityMapLoader<E extends IEntity> {

        public void load(List<E> data, EntityMap<E> entityMap) {
            for (E d : data) {
                entityMap.add(d);
            }
        }
    }

Иногда запуск искры не удается из-за линии Я пометил (не всегда). Это бросило RunTimeException, говоря, что "Entity not found". Но если я бегу второй раз, все будет хорошо. Я сравнил неудачные и успешные запуски, похоже, они загружают в EntityMap одинаковых людей. Обратите внимание, что мы запускаем выше искровой работы параллельно с другим набором групп. Иногда мы терпим неудачу, когда мы идем параллельно. Во второй раз (всегда успешно) после сбоя просто запустите группу с ошибками. Не хочу ошибаться, но это заставляет меня думать, что, возможно, мой EntityMap не является каким-то «поточно-ориентированным», поэтому он испортил карту других групп. Возможно ли это?

Чтобы помочь понять код, я также публикую ниже класс EntityMap

EntityMap. java

public EntityMap extends <T extends IEntity> {

    protected Map<Integer, T> entityMap;

    ......

    public void add(T entity) {
        this.entityMap.put(entity.getId(), entity);
    }

    public T get(int id) {
        T entity = this.entityMap.get(id);
        if (entity==null) {
            throw new RuntimeException("Entity not found");
        }
        return entity;
    }
}
...