Компонент ColdFusion / Railo для многопоточных HTTP-запросов - PullRequest
2 голосов
/ 18 февраля 2012

У меня проблемы с областями действия переменных, когда я вызываю поток http-вызовов внутри компонента cfscript удаленного доступа.

Мне удалось сделать это успешно как отдельную страницу CFM, но у меня возникли проблемыпри запуске этого кода в CFC.

Я звоню CFC из приложения, поэтому он должен быть доступен через браузер, что-то вроде

site.co.uk/page.cfc?method=search&returnFormat=json&strList=item1,item2,item3

Этокод, который вызывает проблему

var cnt = listLen(strList);

for(var p = 1; p <= cnt; p++) {
    var tmpText = listGetAt(strList,p);
    thread name='t#p#' {
        ArrayAppend(arr,getPrice(tmpText));
    }
}

for(var p = 1; p <= cntPlates; p++) {
    threadJoin('t#p#');
    writeDump(t1);
}

Когда я пишу Dump t1, мой первый поток, он содержит всю информацию о потоке, но тело потока - просто ошибка: переменная [TMPTEXT] не существует

Нужно ли создавать tmpText внутри потока?Или положить его в область т1 или поток?Или что-то совсем другое.

1 Ответ

3 голосов
/ 19 февраля 2012

Для полного ответа на этот вопрос требуется два ответа, и они связаны с областью THREAD в ColdFusion.

ONE

На первый взгляд, ваш код страдает от простого случая доступа к области видимости. [TMPTEXT] не существует является результатом трех порожденных потоков, пытающихся получить доступ к переменной, которая была объявлена ​​ вне их области видимости и не была явно передана в поток при создании экземпляра.

Решение состоит в том, чтобы явно передать переменные в область видимости потока, чтобы работать с ними, а именно:

thread name='t#p#'
       myText=tmpText {
    ArrayAppend(arr,getPrice(myText));
}

Здесь копия переменной tmpText, которая была объявлена ​​вне потока, явно передается как 'myText' и упоминается в контексте границы потока этим новым именем переменной.

Что касается того, почему ваш код работал в старом старом CFM, но не в рамках CFC, это потому, что в контексте CFM потоки имеют доступ ко всем общим областям, VARIABLES, FORM, SESSION и т. д., но запись в эти области должна выполняться с осторожностью (например, через CFLOCK), в противном случае возникают проблемы с синхронизацией (взаимоблокировки). CFC является инкапсулированным объектом и, конечно, имеет доступ только к определенным неявным областям (например, VARIABLES отличается для CFC и CFM), и к большинству других областей необходимо обращаться с соответствующим именем области.

TWO

Проблема намного больше , связанная с вашим вопросом и фрагментом кода, связана с вашим восприятием того, как потоки работают с общими данными. Я заранее прошу прощения, если это станет проповедью, но я должен сыграть адвоката дьявола, основываясь на моих ограниченных знаниях о вашей системе, всех функциях и ваших намерениях с самого начала.

На основании предоставленного вами кода:

var cnt = listLen(strList);

for(var p = 1; p <= cnt; p++) {
    var tmpText = listGetAt(strList,p);
    thread name='t#p#' {
        ArrayAppend(arr,getPrice(tmpText));
    }
}

Я не вижу объявления массива 'arr', поэтому мое первоначальное предположение состоит в том, что вы планируете подойти к этому многопоточному методу, создав контейнер хранения (массив), а затем разветвить несколько потоков для выполнения работы с массив, получая общее повышение производительности при распараллеливании обновлений массива.

Проблема этого подхода в том, что потоки не знают о работе друг друга; существует принципиальная нехватка синхронизации между ними. Когда я пытаюсь запустить код как есть, он не знает о массиве 'arr', потому что он явно не был передан (точно такая же проблема, как и проблема TMPTEXT, рассмотренная выше). Однако простой передачи массива в поток недостаточно, поскольку он копируется значением в область потока, что означает, что любая работа, выполняемая с этим массивом в потоке, теряется при выходе из нить.

Какой правильный подход? Чтобы каждый поток в конечном итоге работал на его собственной локальной версии вычисленного значения из getPrice () , а затем присоединялся к потокам, заканчивая созданием окончательного массива из этих вычисленных значений из каждая нить:

component {

    private numeric function getPrice( string lookup ) {
        return Val(Right(lookup,1));        
    }

    remote array function search( string strList ) output=true {

        var cnt = ListLen(strList);
        var arr = ArrayNew(1);

        for(var p = 1; p <= cnt; p++) {

            var tmpText = ListGetAt( strList, p );

            thread name='t#p#' 
                    outerTmpText = tmpText {
                var thread.destVal = getPrice( outerTmpText );
            }
        }

        for(var p = 1; p <= cnt; p++) {
            ThreadJoin('t#p#');
            WriteDump(evaluate('t' & p));
        }

        for(var p = 1; p <= cnt; p++) {
            ArrayAppend(arr, cfthread['t' & p].destVal);
        }

        return arr; 
    }

}

Здесь вы увидите, что в теле функции search каждый поток создает свою собственную локальную переменную 'destVal', которая является потокобезопасным хранилищем для вычисленного значения, возвращаемого getPrice (). Эти три потока могут свободно вызывать методы и сохранять результаты независимо друг от друга, не влияя на большую общую переменную (что может привести к проблемам синхронизации).

Затем, как только все ваши потоки объединяются, выполняется последний цикл для построения этого массива 'arr', передавая вычисленное значение из каждого из определенных потоков (используя область CFTHREAD для доступа к каждому отдельному var 'destVal'). .

ПРИМЕЧАНИЕ: Я сделал этот пример кода удаленным, чтобы вы могли продолжать вызывать его, как вы просили в описании вопроса (можно использовать удаленно), но имейте в виду, что При этом вызовы WriteDump будут игнорироваться.

Для получения дополнительной информации о том, как проектировать и разрабатывать многопоточные приложения, а также для более глубокого понимания того, как потоки работают с локальными / общими данными (и возникающими проблемами), я рекомендую начать с Java Concurrency на практике .

Источник: Использование данных потоков (Adobe LiveDocs)

--- ОТВЕТ НА БОНУС ---

Если вы хотите, чтобы правильный код для завершенного CFC работал с этим общим массивом, он будет выглядеть следующим образом (READ CAVEAT BELOW) :

component {

variables.arr = ArrayNew(1);

private numeric function getPrice( string lookup ) {
    return Val(Right(lookup,1));        
}

remote array function search( string strList ) output=true {

    var cnt = ListLen(strList);

    for(var p = 1; p <= cnt; p++) {

        var tmpText = ListGetAt( strList, p );

        thread name='t#p#' 
                outerTmpText = tmpText {
            lock name='arrWrite'
                    type='exclusive'
                    timeout='15' {
                ArrayAppend(variables.arr, getPrice( outerTmpText ) );
            }
        }
    }

    for(var p = 1; p <= cnt; p++) {
        ThreadJoin('t#p#');
        WriteDump(evaluate('t' & p));
    }

    return variables.arr;   
}

}

CAVEAT: Эта реализация правильно выполняет запись в переменную arr, которая находится внутри CFC (и, таким образом, является общей), но безопасно блокирует запись в массив, так что разветвленные потоки не вызывают взаимоблокировку. Обратная сторона? Такая конструкция не обеспечит дополнительного улучшения производительности вообще ; блокируя каждую запись в массив, ваша производительность точно такая же, как если бы вы просто зацикливали массив в одном потоке и обрабатывали каждый элемент самостоятельно - начиная с каждой записи (что может быть быстрее от потока к потоку). все еще должен ждать снятия блокировки, чтобы завершить запись.

--- ОТВЕТ НА БОНУС №2 ---

Я спал на этом и придумал альтернативное решение. Технически, есть еще один способ сделать так, чтобы несколько потоков записывали в общий массив, не блокируя массив, чтобы обеспечить синхронизацию: Размер массива заблаговременно и каждый поток уникально ссылается на определенный индекс в массиве. . Недостатком этого является то, что вам придется

a) Знайте размер списка, который вы собираетесь обработать заранее, или б) Увеличьте его до максимального значения, и когда вы выполняете итерацию по массиву, вы тратите лишние циклы, чтобы проверить, существует ли значение по этому индексу.

Вот пример использования a):

component {

variables.arr = ArrayNew(1);

private numeric function getPrice( string lookup ) {
    return Val(Right(lookup,1));        
}

remote array function search( string strList ) output=true {

    var cnt = ListLen(strList);

    ArrayClear(variables.arr);
    ArrayResize(variables.arr, cnt);

    for(var p = 1; p <= cnt; p++) {

        var tmpText = ListGetAt( strList, p );

        thread name='t#p#' 
                outerTmpText = tmpText
                index = p
        {
            variables.arr[index] = getPrice( outerTmpText );
        }
    }

    for(var p = 1; p <= cnt; p++) {
        ThreadJoin('t#p#');
        WriteDump(evaluate('t' & p));
    }

    return variables.arr;   
}

В этой методологии массив уже имеет соответствующий размер до появления отдельных потоков, которые будут работать над ним. И вместо того, чтобы регулировать размер массива на лету (через ArrayAppend), который потенциально может выбить синхронность, каждый поток работает с определенным индексом массива. Обратите внимание, что мы передаем этот индекс в область видимости потока через итеративную переменную 'p'. Это потокобезопасный способ работы с общим массивом без блокировки доступа к нему. Просто помните об ограничениях, указанных выше.

...