Вот мой взгляд на эту проблему.Требование, чтобы пользовательские сценарии выполнялись внутри vanilla CPython, означает, что вам нужно либо написать интерпретатор для вашего мини-языка, либо скомпилировать его в байт-код Python (или использовать Python в качестве исходного языка), а затем «санировать» байт-код перед его выполнением.
Я взял быстрый пример, основанный на предположении, что пользователи могут писать свои скрипты на Python, и что исходный код и байт-код могут быть в достаточной степени очищены с помощью некоторой комбинации фильтрации небезопасного синтаксиса из дерева разбора и / или удалениянебезопасные коды операций из байт-кода.
Вторая часть решения требует, чтобы байт-код пользовательского сценария периодически прерывался сторожевой задачей, которая гарантирует, что пользовательский сценарий не превышает некоторого предела кода операции, и для всего этогодля запуска на ванильном CPython.
Резюме моей попытки, которая в основном фокусируется на 2-й части проблемы.
- Пользовательские скрипты написаны на Python.
- Используйте byteplay для фильтрации и моотличается от байт-кода.
- Инструментирует байт-код пользователя для вставки счетчика кода операции и вызова функции, контекст которой переключается на задачу наблюдения.
- Используйте greenlet для выполнения пользователембайт-код, с выходом, переключающимся между сценарием пользователя и сопрограммой сторожевого таймера.
- сторожевой таймер вводит предварительно установленное ограничение на число кодов операций, которые могут быть выполнены до возникновения ошибки.
Надеемсяэто по крайней мере идет в правильном направлении.Мне интересно узнать больше о вашем решении, когда вы его получите.
Исходный код для lowperf.py
:
# std
import ast
import dis
import sys
from pprint import pprint
# vendor
import byteplay
import greenlet
# bytecode snippet to increment our global opcode counter
INCREMENT = [
(byteplay.LOAD_GLOBAL, '__op_counter'),
(byteplay.LOAD_CONST, 1),
(byteplay.INPLACE_ADD, None),
(byteplay.STORE_GLOBAL, '__op_counter')
]
# bytecode snippet to perform a yield to our watchdog tasklet.
YIELD = [
(byteplay.LOAD_GLOBAL, '__yield'),
(byteplay.LOAD_GLOBAL, '__op_counter'),
(byteplay.CALL_FUNCTION, 1),
(byteplay.POP_TOP, None)
]
def instrument(orig):
"""
Instrument bytecode. We place a call to our yield function before
jumps and returns. You could choose alternate places depending on
your use case.
"""
line_count = 0
res = []
for op, arg in orig.code:
line_count += 1
# NOTE: you could put an advanced bytecode filter here.
# whenever a code block is loaded we must instrument it
if op == byteplay.LOAD_CONST and isinstance(arg, byteplay.Code):
code = instrument(arg)
res.append((op, code))
continue
# 'setlineno' opcode is a safe place to increment our global
# opcode counter.
if op == byteplay.SetLineno:
res += INCREMENT
line_count += 1
# append the opcode and its argument
res.append((op, arg))
# if we're at a jump or return, or we've processed 10 lines of
# source code, insert a call to our yield function. you could
# choose other places to yield more appropriate for your app.
if op in (byteplay.JUMP_ABSOLUTE, byteplay.RETURN_VALUE) \
or line_count > 10:
res += YIELD
line_count = 0
# finally, build and return new code object
return byteplay.Code(res, orig.freevars, orig.args, orig.varargs,
orig.varkwargs, orig.newlocals, orig.name, orig.filename,
orig.firstlineno, orig.docstring)
def transform(path):
"""
Transform the Python source into a form safe to execute and return
the bytecode.
"""
# NOTE: you could call ast.parse(data, path) here to get an
# abstract syntax tree, then filter that tree down before compiling
# it into bytecode. i've skipped that step as it is pretty verbose.
data = open(path, 'rb').read()
suite = compile(data, path, 'exec')
orig = byteplay.Code.from_code(suite)
return instrument(orig)
def execute(path, limit = 40):
"""
This transforms the user's source code into bytecode, instrumenting
it, then kicks off the watchdog and user script tasklets.
"""
code = transform(path)
target = greenlet.greenlet(run_task)
def watcher_task(op_count):
"""
Task which is yielded to by the user script, making sure it doesn't
use too many resources.
"""
while 1:
if op_count > limit:
raise RuntimeError("script used too many resources")
op_count = target.switch()
watcher = greenlet.greenlet(watcher_task)
target.switch(code, watcher.switch)
def run_task(code, yield_func):
"This is the greenlet task which runs our user's script."
globals_ = {'__yield': yield_func, '__op_counter': 0}
eval(code.to_code(), globals_, globals_)
execute(sys.argv[1])
Вот пример пользовательского сценария user.py
:
def otherfunc(b):
return b * 7
def myfunc(a):
for i in range(0, 20):
print i, otherfunc(i + a + 3)
myfunc(2)
Вот примерный прогон:
% python lowperf.py user.py
0 35
1 42
2 49
3 56
4 63
5 70
6 77
7 84
8 91
9 98
10 105
11 112
Traceback (most recent call last):
File "lowperf.py", line 114, in <module>
execute(sys.argv[1])
File "lowperf.py", line 105, in execute
target.switch(code, watcher.switch)
File "lowperf.py", line 101, in watcher_task
raise RuntimeError("script used too many resources")
RuntimeError: script used too many resources