从底层简析Python程序的执行过程

   这篇文章主要介绍了从底层简析Python程序的执行过程,包括注入操作码和封装程序等解释器执行层面的知识,需要的朋友可以参考下

  最近我在学习 Python 的运行模型。我对 Python 的一些内部机制很是好奇,比如 Python 是怎么实现类似 YIELDVALUE、YIELDFROM 这样的操作码的;对于 递推式构造列表(List Comprehensions)、生成器表达式(generator expressions)以及其他一些有趣的 Python 特性是怎么编译的;从字节码的层面来看,当异常抛出的时候都发生了什么事情。翻阅 CPython 的代码对于解答这些问题当然是很有帮助的,但我仍然觉得以这样的方式来做的话对于理解字节码的执行和堆栈的变化还是缺少点什么。GDB 是个好选择,但是我懒,而且只想使用一些比较高阶的接口写点 Python 代码来完成这件事。

  所以呢,我的目标就是创建一个字节码级别的追踪 API,类似 sys.setrace 所提供的那样,但相对而言会有更好的粒度。这充分锻炼了我编写 Python 实现的 C 代码的编码能力。我们所需要的有如下几项,在这篇文章中所用的 Python 版本为 3.5。

  一个新的 Cpython 解释器操作码

  一种将操作码注入到 Python 字节码的方法

  一些用于处理操作码的 Python 代码

  一个新的 Cpython 操作码

  新操作码:DEBUG_OP

  这个新的操作码 DEBUG_OP 是我第一次尝试写 CPython 实现的 C 代码,我将尽可能的让它保持简单。 我们想要达成的目的是,当我们的操作码被执行的时候我能有一种方式来调用一些 Python 代码。同时,我们也想能够追踪一些与执行上下文有关的数据。我们的操作码会把这些信息当作参数传递给我们的回调函数。通过操作码能辨识出的有用信息如下:

  堆栈的内容

  执行 DEBUG_OP 的帧对象信息

  所以呢,我们的操作码需要做的事情是:

  找到回调函数

  创建一个包含堆栈内容的列表

  调用回调函数,并将包含堆栈内容的列表和当前帧作为参数传递给它

  听起来挺简单的,现在开始动手吧!声明:下面所有的解释说明和代码是经过了大量段错误调试之后总结得到的结论。首先要做的是给操作码定义一个名字和相应的值,因此我们需要在 Include/opcode.h中添加代码。

  ?

1
2
3
4
5
6
7
8
9
10
11
12

/** My own comments begin by '**' **/
/** From: Includes/opcode.h **/
 
/* Instruction opcodes for compiled code */
 
/** We just have to define our opcode with a free value
0 was the first one I found **/
#define DEBUG_OP 0
 
#define POP_TOP 1
#define ROT_TWO 2
#define ROT_THREE 3

  这部分工作就完成了,现在我们去编写操作码真正干活的代码。

  实现 DEBUG_OP

  在考虑如何实现DEBUG_OP之前我们需要了解的是 DEBUG_OP 提供的接口将长什么样。 拥有一个可以调用其他代码的新操作码是相当酷眩的,但是究竟它将调用哪些代码捏?这个操作码如何找到回调函数的捏?我选择了一种最简单的方法:在帧的全局区域写死函数名。那么问题就变成了,我该怎么从字典中找到一个固定的 C 字符串?为了回答这个问题我们来看看在 Python 的 main loop 中使用到的和上下文管理相关的标识符 enter 和 exit。

  我们可以看到这两标识符被使用在操作码 SETUP_WITH 中:

  ?

1
2
3
4
5
6
7

/** From: Python/ceval.c **/
TARGET(SETUP_WITH) {
_Py_IDENTIFIER(__exit__);
_Py_IDENTIFIER(__enter__);
PyObject *mgr = TOP();
PyObject *exit = special_lookup(mgr, &PyId___exit__), *enter;
PyObject *res;

  现在,看一眼宏 _Py_IDENTIFIER 定义

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

/** From: Include/object.h **/
 
/********************* String Literals ****************************************/
/* This structure helps managing static strings. The basic usage goes like this:
Instead of doing
 
r = PyObject_CallMethod(o, "foo", "args", ...);
 
do
 
_Py_IDENTIFIER(foo);
...
r = _PyObject_CallMethodId(o, &PyId_foo, "args", ...);
 
PyId_foo is a static variable, either on block level or file level. On first
usage, the string "foo" is interned, and the structures are linked. On interpreter
shutdown, all strings are released (through _PyUnicode_ClearStaticStrings).
 
Alternatively, _Py_static_string allows to choose the variable name.
_PyUnicode_FromId returns a borrowed reference to the interned string.
_PyObject_{Get,Set,Has}AttrId are __getattr__ versions using _Py_Identifier*.
*/
typedef struct _Py_Identifier {
struct _Py_Identifier *next;
const char* string;
PyObject *object;
} _Py_Identifier;
 
#define _Py_static_string_init(value) { 0, value, 0 }
#define _Py_static_string(varname, value) static _Py_Identifier varname = _Py_static_string_init(value)
#define _Py_IDENTIFIER(varname) _Py_static_string(PyId_##varname, #varname)

  嗯,注释部分已经说明得很清楚了。通过一番查找,我们发现了可以用来从字典找固定字符串的函数 _PyDict_GetItemId,所以我们操作码的查找部分的代码就是长这样滴。

  ?

1
2
3
4
5
6
7
8
9
10

/** Our callback function will be named op_target **/
PyObject *target = NULL;
_Py_IDENTIFIER(op_target);
target = _PyDict_GetItemId(f->f_globals, &PyId_op_target);
if (target == NULL && _PyErr_OCCURRED()) {
if (!PyErr_ExceptionMatches(PyExc_KeyError))
goto error;
PyErr_Clear();
DISPATCH();
}

  为了方便理解,对这一段代码做一些说明:

  f 是当前的帧,f->f_globals 是它的全局区域

  如果我们没有找到 op_target,我们将会检查这个异常是不是 KeyError

  goto error; 是一种在 main loop 中抛出异常的方法

  PyErr_Clear() 抑制了当前异常的抛出,而 DISPATCH() 触发了下一个操作码的执行

  下一步就是收集我们想要的堆栈信息。

  ?

1
2
3
4
5
6
7
8
9

/** This code create a list with all the values on the current stack **/
PyObject *value = PyList_New(0);
for (i = 1 ; i <= STACK_LEVEL(); i++) {
tmp = PEEK(i);
if (tmp == NULL) {
tmp = Py_None;
}
PyList_Append(value, tmp);
}

  最后一步就是调用我们的回调函数!我们用 call_function 来搞定这件事,我们通过研究操作码 CALL_FUNCTION 的实现来学习怎么使用 call_function 。

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

/** From: Python/ceval.c **/
TARGET(CALL_FUNCTION) {
PyObject **sp, *res;
/** stack_pointer is a local of the main loop.
It's the pointer to the stacktop of our frame **/
sp = stack_pointer;
res = call_function(&sp, oparg);
/** call_function handles the args it consummed on the stack for us **/
stack_pointer = sp;
PUSH(res);
/** Standard exception handling **/
if (res == NULL)
goto error;
DISPATCH();
}

  有了上面这些信息,我们终于可以捣鼓出一个操作码DEBUG_OP的草稿了:

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

TARGET(DEBUG_OP) {
PyObject *value = NULL;
PyObject *target = NULL;
PyObject *res = NULL;
PyObject **sp = NULL;
PyObject *tmp;
int i;
_Py_IDENTIFIER(op_target);
 
target = _PyDict_GetItemId(f->f_globals, &PyId_op_target);
if (target == NULL && _PyErr_OCCURRED()) {
if (!PyErr_ExceptionMatches(PyExc_KeyError))
goto error;
PyErr_Clear();
DISPATCH();
}
value = PyList_New(0);
Py_INCREF(target);
for (i = 1 ; i <= STACK_LEVEL(); i++) {
tmp = PEEK(i);
if (tmp == NULL)
tmp = Py_None;
PyList_Append(value, tmp);
}
 
PUSH(target);
PUSH(value);
Py_INCREF(f);
PUSH(f);
sp = stack_pointer;
res = call_function(&sp, 2);
stack_pointer = sp;
if (res == NULL)
goto error;
Py_DECREF(res);
DISPATCH();
}

  在编写 CPython 实现的 C 代码方面我确实没有什么经验,有可能我漏掉了些细节。如果您有什么建议还请您纠正,我期待您的反馈。

  编译它,成了!

  一切看起来很顺利,但是当我们尝试去使用我们定义的操作码 DEBUG_OP 的时候却失败了。自从 2008 年之后,Python 使用预先写好的 goto(你也可以从 这里获取更多的讯息)。故,我们需要更新下 goto jump table,我们在 Python/opcode_targets.h 中做如下修改。

  ?

1
2
3
4
5
6
7

/** From: Python/opcode_targets.h **/
/** Easy change since DEBUG_OP is the opcode number 1 **/
static void *opcode_targets[256] = {
//&&_unknown_opcode,
&&TARGET_DEBUG_OP,
&&TARGET_POP_TOP,
/** ... **/

  这就完事了,我们现在就有了一个可以工作的新操作码。唯一的问题就是这货虽然存在,但是没有被人调用过。接下来,我们将DEBUG_OP注入到函数的字节码中。

  在 Python 字节码中注入操作码 DEBUG_OP

  有很多方式可以在 Python 字节码中注入新的操作码:

  使用 peephole optimizer, Quarkslab就是这么干的

  在生成字节码的代码中动些手脚

  在运行时直接修改函数的字节码(这就是我们将要干的事儿)

  为了创造出一个新操作码,有了上面的那一堆 C 代码就够了。现在让我们回到原点,开始理解奇怪甚至神奇的 Python!

  我们将要做的事儿有:

  得到我们想要追踪函数的 code object

  重写字节码来注入 DEBUG_OP

  将新生成的 code object 替换回去

  和 code object 有关的小贴士

  如果你从没听说过 code object,这里有一个简单的介绍网路上也有一些相关的文档可供查阅,可以直接 Ctrl+F 查找 code object

  还有一件事情需要注意的是在这篇文章所指的环境中 code object 是不可变的:

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

Python 3.4.2 (default, Oct 8 2014, 10:45:20)
[GCC 4.9.1] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> x = lambda y : 2
>>> x.__code__
<code object <lambda> at 0x7f481fd88390, file "<stdin>", line 1>
>>> x.__code__.co_name
'<lambda>'
>>> x.__code__.co_name = 'truc'
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: readonly attribute
>>> x.__code__.co_consts = ('truc',)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: readonly attribute

  但是不用担心,我们将会找到方法绕过这个问题的。

  使用的工具

  为了修改字节码我们需要一些工具:

  dis模块用来反编译和分析字节码

  dis.BytecodePython 3.4新增的一个特性,对于反编译和分析字节码特别有用

  一个能够简单修改 code object 的方法

  用 dis.Bytecode 反编译 code object 能告诉我们一些有关操作码、参数和上下文的信息。

  ?

1
2
3
4
5
6
7
8
9

# Python3.4
>>> import dis
>>> f = lambda x: x + 3
>>> for i in dis.Bytecode(f.__code__): print (i)
...
Instruction(opname='LOAD_FAST', opcode=124, arg=0, argval='x', argrepr='x', offset=0, starts_line=1, is_jump_target=False)
Instruction(opname='LOAD_CONST', opcode=100, arg=1, argval=3, argrepr='3', offset=3, starts_line=None, is_jump_target=False)
Instruction(opname='BINARY_ADD', opcode=23, arg=None, argval=None, argrepr='', offset=6, starts_line=None, is_jump_target=False)
Instruction(opname='RETURN_VALUE', opcode=83, arg=None, argval=None, argrepr='', offset=7, starts_line=None, is_jump_target=False)

  为了能够修改 code object,我定义了一个很小的类用来复制 code object,同时能够按我们的需求修改相应的值,然后重新生成一个新的 code object。

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

class MutableCodeObject(object):
args_name = ("co_argcount", "co_kwonlyargcount", "co_nlocals", "co_stacksize", "co_flags", "co_code",
"co_consts", "co_names", "co_varnames", "co_filename", "co_name", "co_firstlineno",
"co_lnotab", "co_freevars", "co_cellvars")
 
def __init__(self, initial_code):
self.initial_code = initial_code
for attr_name in self.args_name:
attr = getattr(self.initial_code, attr_name)
if isinstance(attr, tuple):
attr = list(attr)
setattr(self, attr_name, attr)
 
def get_code(self):
args = []
for attr_name in self.args_name:
attr = getattr(self, attr_name)
if isinstance(attr, list):
attr = tuple(attr)
args.append(attr)
return self.initial_code.__class__(*args)

  这个类用起来很方便,解决了上面提到的 code object 不可变的问题。

  ?

1
2
3
4
5
6
7
8
9
10

>>> x = lambda y : 2
>>> m = MutableCodeObject(x.__code__)
>>> m
<new_code.MutableCodeObject object at 0x7f3f0ea546a0>
>>> m.co_consts
[None, 2]
>>> m.co_consts[1] = '3'
>>> m.co_name = 'truc'
>>> m.get_code()
<code object truc at 0x7f3f0ea2bc90, file "<stdin>", line 1>

  测试我们的新操作码

  我们现在拥有了注入 DEBUG_OP 的所有工具,让我们来验证下我们的实现是否可用。我们将我们的操作码注入到一个最简单的函数中:

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

from new_code import MutableCodeObject
 
def op_target(*args):
print("WOOT")
print("op_target called with args <{0}>".format(args))
 
def nop():
pass
 
new_nop_code = MutableCodeObject(nop.__code__)
new_nop_code.co_code = b"x00" + new_nop_code.co_code[0:3] + b"x00" + new_nop_code.co_code[-1:]
new_nop_code.co_stacksize += 3
 
nop.__code__ = new_nop_code.get_code()
 
import dis
dis.dis(nop)
nop()
 
 
# Don't forget that ./python is our custom Python implementing DEBUG_OP
hakril@computer ~/python/CPython3.5 % ./python proof.py
8 0 <0>
1 LOAD_CONST 0 (None)
4 <0>
5 RETURN_VALUE
WOOT
op_target called with args <([], <frame object at 0x7fde9eaebdb0>)>
WOOT
op_target called with args <([None], <frame object at 0x7fde9eaebdb0>)>

  看起来它成功了!有一行代码需要说明一下 new_nop_code.co_stacksize += 3

  co_stacksize 表示 code object 所需要的堆栈的大小

  操作码DEBUG_OP往堆栈中增加了三项,所以我们需要为这些增加的项预留些空间

  现在我们可以将我们的操作码注入到每一个 Python 函数中了!

  重写字节码

  正如我们在上面的例子中所看到的那样,重写 Pyhton 的字节码似乎 so easy。为了在每一个操作码之间注入我们的操作码,我们需要获取每一个操作码的偏移量,然后将我们的操作码注入到这些位置上(把我们操作码注入到参数上是有坏处大大滴)。这些偏移量也很容易获取,使用 dis.Bytecode,就像这样。

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

def add_debug_op_everywhere(code_obj):
# We get every instruction offset in the code object
offsets = [instr.offset for instr in dis.Bytecode(code_obj)]
# And insert a DEBUG_OP at every offset
return insert_op_debug_list(code_obj, offsets)
 
def insert_op_debug_list(code, offsets):
# We insert the DEBUG_OP one by one
for nb, off in enumerate(sorted(offsets)):
# Need to ajust the offsets by the number of opcodes already inserted before
# That's why we sort our offsets!
code = insert_op_debug(code, off + nb)
return code
 
# Last problem: what does insert_op_debug looks like?

  基于上面的例子,有人可能会想我们的 insert_op_debug 会在指定的偏移量增加一个"x00",这尼玛是个坑啊!我们第一个 DEBUG_OP 注入的例子中被注入的函数是没有任何的分支的,为了能够实现完美一个函数注入函数 insert_op_debug 我们需要考虑到存在分支操作码的情况。

  Python 的分支一共有两种:

  (1) 绝对分支:看起来是类似这样子的 Instruction_Pointer = argument(instruction)

  (2)相对分支:看起来是类似这样子的 Instruction_Pointer += argument(instruction)

  相对分支总是向前的

  我们希望这些分支在我们插入操作码之后仍然能够正常工作,为此我们需要修改一些指令参数。以下是其逻辑流程:

  (1) 对于每一个在插入偏移量之前的相对分支而言

  如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加 1

  如果相等,则不需要增加 1 就能够在跳转操作和目标地址之间执行我们的操作码DEBUG_OP

  如果小于,插入我们的操作码的话并不会影响到跳转操作和目标地址之间的距离

  (2) 对于 code object 中的每一个绝对分支而言

  如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加 1

  如果相等,那么不需要任何修改,理由和相对分支部分是一样的

  如果小于,插入我们的操作码的话并不会影响到跳转操作和目标地址之间的距离

  下面是实现:

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# Helper
def bytecode_to_string(bytecode):
if bytecode.arg is not None:
return struct.pack("<Bh", bytecode.opcode, bytecode.arg)
return struct.pack("<B", bytecode.opcode)
 
# Dummy class for bytecode_to_string
class DummyInstr:
def __init__(self, opcode, arg):
self.opcode = opcode
self.arg = arg
 
def insert_op_debug(code, offset):
opcode_jump_rel = ['FOR_ITER', 'JUMP_FORWARD', 'SETUP_LOOP', 'SETUP_WITH', 'SETUP_EXCEPT', 'SETUP_FINALLY']
opcode_jump_abs = ['POP_JUMP_IF_TRUE', 'POP_JUMP_IF_FALSE', 'JUMP_ABSOLUTE']
res_codestring = b""
inserted = False
for instr in dis.Bytecode(code):
if instr.offset == offset:
res_codestring += b"x00"
inserted = True
if instr.opname in opcode_jump_rel and not inserted: #relative jump are always forward
if offset < instr.offset + 3 + instr.arg: # inserted beetwen jump and dest: add 1 to dest (3 for size)
#If equal: jump on DEBUG_OP to get info before exec instr
res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1))
continue
if instr.opname in opcode_jump_abs:
if instr.arg > offset:
res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1))
continue
res_codestring += bytecode_to_string(instr)
# replace_bytecode just replaces the original code co_code
return replace_bytecode(code, res_codestring)

  让我们看一下效果如何:

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

>>> def lol(x):
... for i in range(10):
... if x == i:
... break
 
>>> dis.dis(lol)
101 0 SETUP_LOOP 36 (to 39)
3 LOAD_GLOBAL 0 (range)
6 LOAD_CONST 1 (10)
9 CALL_FUNCTION 1 (1 positional, 0 keyword pair)
12 GET_ITER
>> 13 FOR_ITER 22 (to 38)
16 STORE_FAST 1 (i)
 
102 19 LOAD_FAST 0 (x)
22 LOAD_FAST 1 (i)
25 COMPARE_OP 2 (==)
28 POP_JUMP_IF_FALSE 13
 
103 31 BREAK_LOOP
32 JUMP_ABSOLUTE 13
35 JUMP_ABSOLUTE 13
>> 38 POP_BLOCK
>> 39 LOAD_CONST 0 (None)
42 RETURN_VALUE
>>> lol.__code__ = transform_code(lol.__code__, add_debug_op_everywhere, add_stacksize=3)
 
 
>>> dis.dis(lol)
101 0 <0>
1 SETUP_LOOP 50 (to 54)
4 <0>
5 LOAD_GLOBAL 0 (range)
8 <0>
9 LOAD_CONST 1 (10)
12 <0>
13 CALL_FUNCTION 1 (1 positional, 0 keyword pair)
16 <0>
17 GET_ITER
>> 18 <0>
 
102 19 FOR_ITER 30 (to 52)
22 <0>
23 STORE_FAST 1 (i)
26 <0>
27 LOAD_FAST 0 (x)
30 <0>
 
103 31 LOAD_FAST 1 (i)
34 <0>
35 COMPARE_OP 2 (==)
38 <0>
39 POP_JUMP_IF_FALSE 18
42 <0>
43 BREAK_LOOP
44 <0>
45 JUMP_ABSOLUTE 18
48 <0>
49 JUMP_ABSOLUTE 18
>> 52 <0>
53 POP_BLOCK
>> 54 <0>
55 LOAD_CONST 0 (None)
58 <0>
59 RETURN_VALUE
 
# Setup the simplest handler EVER
>>> def op_target(stack, frame):
... print (stack)
 
# GO
>>> lol(2)
[]
[]
[<class 'range'>]
[10, <class 'range'>]
[range(0, 10)]
[<range_iterator object at 0x7f1349afab80>]
[0, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[0, 2, <range_iterator object at 0x7f1349afab80>]
[False, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[1, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[1, 2, <range_iterator object at 0x7f1349afab80>]
[False, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[2, 2, <range_iterator object at 0x7f1349afab80>]
[True, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[]
[None]

  甚好!现在我们知道了如何获取堆栈信息和 Python 中每一个操作对应的帧信息。上面结果所展示的结果目前而言并不是很实用。在最后一部分中让我们对注入做进一步的封装。

  增加 Python 封装

  正如您所见到的,所有的底层接口都是好用的。我们最后要做的一件事是让 op_target 更加方便使用(这部分相对而言比较空泛一些,毕竟在我看来这不是整个项目中最有趣的部分)。

  首先我们来看一下帧的参数所能提供的信息,如下所示:

  f_code当前帧将执行的 code object

  f_lasti当前的操作(code object 中的字节码字符串的索引)

  经过我们的处理我们可以得知 DEBUG_OP 之后要被执行的操作码,这对我们聚合数据并展示是相当有用的。

  新建一个用于追踪函数内部机制的类:

  改变函数自身的 co_code

  设置回调函数作为 op_debug 的目标函数

  一旦我们知道下一个操作,我们就可以分析它并修改它的参数。举例来说我们可以增加一个 auto-follow-called-functions 的特性。

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

def op_target(l, f, exc=None):
if op_target.callback is not None:
op_target.callback(l, f, exc)
 
class Trace:
def __init__(self, func):
self.func = func
 
def call(self, *args, **kwargs):
self.add_func_to_trace(self.func)
# Activate Trace callback for the func call
op_target.callback = self.callback
try:
res = self.func(*args, **kwargs)
except Exception as e:
res = e
op_target.callback = None
return res
 
def add_func_to_trace(self, f):
# Is it code? is it already transformed?
if not hasattr(f ,"op_debug") and hasattr(f, "__code__"):
f.__code__ = transform_code(f.__code__, transform=add_everywhere, add_stacksize=ADD_STACK)
f.__globals__['op_target'] = op_target
f.op_debug = True
 
def do_auto_follow(self, stack, frame):
# Nothing fancy: FrameAnalyser is just the wrapper that gives the next executed instruction
next_instr = FrameAnalyser(frame).next_instr()
if "CALL" in next_instr.opname:
arg = next_instr.arg
f_index = (arg & 0xff) + (2 * (arg >> 8))
called_func = stack[f_index]
 
# If call target is not traced yet: do it
if not hasattr(called_func, "op_debug"):
self.add_func_to_trace(called_func)

  现在我们实现一个 Trace 的子类,在这个子类中增加 callback 和 doreport 这两个方法。callback 方法将在每一个操作之后被调用。doreport 方法将我们收集到的信息打印出来。

  这是一个伪函数追踪器实现:

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

class DummyTrace(Trace):
def __init__(self, func):
self.func = func
self.data = collections.OrderedDict()
self.last_frame = None
self.known_frame = []
self.report = []
 
def callback(self, stack, frame, exc):
if frame not in self.known_frame:
self.known_frame.append(frame)
self.report.append(" === Entering New Frame {0} ({1}) ===".format(frame.f_code.co_name, id(frame)))
self.last_frame = frame
if frame != self.last_frame:
self.report.append(" === Returning to Frame {0} {1}===".format(frame.f_code.co_name, id(frame)))
self.last_frame = frame
 
self.report.append(str(stack))
instr = FrameAnalyser(frame).next_instr()
offset = str(instr.offset).rjust(8)
opname = str(instr.opname).ljust(20)
arg = str(instr.arg).ljust(10)
self.report.append("{0} {1} {2} {3}".format(offset, opname, arg, instr.argval))
self.do_auto_follow(stack, frame)
 
def do_report(self):
print("n".join(self.report))

  这里有一些实现的例子和使用方法。格式有些不方便观看,毕竟我并不擅长于搞这种对用户友好的报告的事儿。

  例1自动追踪堆栈信息和已经执行的指令

  例2上下文管理

  递推式构造列表(List Comprehensions)的追踪示例。

  例3伪追踪器的输出

  例4输出收集的堆栈信息

  总结

  这个小项目是一个了解 Python 底层的良好途径,包括解释器的 main loop,Python 实现的 C 代码编程、Python 字节码。通过这个小工具我们可以看到 Python 一些有趣构造函数的字节码行为,例如生成器、上下文管理和递推式构造列表。

  这里是这个小项目的完整代码。更进一步的,我们还可以做的是修改我们所追踪的函数的堆栈。我虽然不确定这个是否有用,但是可以肯定是这一过程是相当有趣的。

时间: 2024-10-14 12:31:26

从底层简析Python程序的执行过程的相关文章

从底层简析Python程序的执行过程_Mysql

最近我在学习 Python 的运行模型.我对 Python 的一些内部机制很是好奇,比如 Python 是怎么实现类似 YIELDVALUE.YIELDFROM 这样的操作码的:对于 递推式构造列表(List Comprehensions).生成器表达式(generator expressions)以及其他一些有趣的 Python 特性是怎么编译的:从字节码的层面来看,当异常抛出的时候都发生了什么事情.翻阅 CPython 的代码对于解答这些问题当然是很有帮助的,但我仍然觉得以这样的方式来做的话

Python程序的执行原理

1. 过程概述 Python先把代码(.py文件)编译成字节码,交给字节码虚拟机,然后虚拟机一条一条执行字节码指令,从而完成程序的执行. 2. 字节码 字节码在Python虚拟机程序里对应的是PyCodeObject对象. .pyc文件是字节码在磁盘上的表现形式. 3. pyc文件 PyCodeObject对象的创建时机是模块加载的时候,即import. Python test.py会对test.py进行编译成字节码并解释执行,但是不会生成test.pyc. 如果test.py加载了其他模块,如

初步探究Python程序的执行原理_python

1. 过程概述 Python先把代码(.py文件)编译成字节码,交给字节码虚拟机,然后虚拟机一条一条执行字节码指令,从而完成程序的执行.2. 字节码 字节码在Python虚拟机程序里对应的是PyCodeObject对象. .pyc文件是字节码在磁盘上的表现形式.3. pyc文件 PyCodeObject对象的创建时机是模块加载的时候,即import. Python test.py会对test.py进行编译成字节码并解释执行,但是不会生成test.pyc. 如果test.py加载了其他模块,如im

使用优化器来提升Python程序的执行效率的教程_python

如果不首先想想这句Knuth的名言,就开始进行优化工作是不明智的.可是,你很快写出来加入一些特性的代码,可能会很丑陋,你需要注意了.这篇文章就是为这时候准备的. 那么接下来就是一些很有用的工具和模式来快速优化Python.它的主要目的很简单:尽快发现瓶颈,修复它们并且确认你修复了它们.写一个测试 在你开始优化前,写一个高级测试来证明原来代码很慢.你可能需要采用一些最小值数据集来复现它足够慢.通常一两个显示运行时秒的程序就足够处理一些改进的地方了. 有一些基础测试来保证你的优化没有改变原有代码的行

简析 IOS 程序图标的设计

程序图标主要作用是为了使该程序更加具象及更容易理解,除了上述的作用外,有更好http://www.aliyun.com/zixun/aggregation/8936.html">视觉效果的图标可以提高产品的整体体验和品牌,可引起用户的关注和下载,激发起用户点击的欲望. 表现形态 在有限的空间里表达出相对应的信息,在IOS 程序图标设计中,直观是第一个解决的问题,不应该出现大多繁琐的修饰,当然还要有很好的视觉表现力,使用户可以更容易理解此应用的实际作用,更轻松地辨识此应用.下面来说说几种表现

ASP.NET MVC应用程序的执行过程

基于ASP.NET MVC Web应用程序的请求首先通过一个UrlRoutingModule的对象(HTTP模块).这个模块匹配请求,并且执行路由选择.这个UrlRoutingModule对象选择第一个匹配当前请求的路由对象.如果没有路径匹配,这个UrlRoutingModule什么也不做,让这个请求返回给常规的ASP.NET或者IIS来请求处理. 从这个被选中的Route对象,UrlRoutingModule对象获得IRouteHandler对象(IRouteHandler对象与Route对象

C++语言基础 例程 基于对象的程序的执行过程

贺老师的教学链接  本课讲解 #include <iostream> #include <cstring> using namespace std; class Student { public: void set_data(int n, char *p,char s); void display( ); private: int num; char name[20]; char sex; }; void Student::set_data(int n, char *p,char

Python 程序的运行原理及垃圾回收

1. 简单的例子 先从一个简单的例子说起,包含了两个文件 foo.py 和 demo.py [foo.py]def add(a, b):    return a + b [demo.py]import foo a = [1, 'python']a = 'a string' def func():    a = 1    b = 257    print(a + b) print(a) if __name__ == '__main__':    func()    foo.add(1, 2) 执行

段错误-在ubuntu环境下执行python程序,报错 segment error

问题描述 在ubuntu环境下执行python程序,报错 segment error 请教各位朋友们,应该如何查看出错的代码文件和行数呢,只报错segment error,无法知道程序哪里错了.而且程序本身很长,大概几千行代码,没有错误定位很难调试. 请问如何打印出具体的错误信息呢,或者如何调试呢? 在此,先谢谢各位朋友们了. 解决方案 生成dump文件,然后gdb调试.看堆栈. 不过你python程序还可以对print打印信息来分析. 解决方案二: segment error 往往是指针的问题