CPython源码阅读——Timsort

对Timsort的简介请见Wiki或者Tim本人的小文章,本文主要分析CPython(版本3.7)的实现部分,不对原理做具体介绍。
Timsort的代码位于Objects/listobject.c中,大概从1000行到2000行,占据整个list实现的三分之一。本文把所有的Timsort源码基本上都粘贴过来了,所以也不会短。
这段代码看起来其实很有意思,因为注释得比较详尽,画风稳中带皮,不失为消遣娱乐之佳品。有人说CPython这部分代码很难懂,我实在不能苟同。

准备工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* Lots of code for an adaptive, stable, natural mergesort.  There are many
* pieces to this algorithm; read listsort.txt for overviews and details.
*/

/* A sortslice contains a pointer to an array of keys and a pointer to
* an array of corresponding values. In other words, keys[i]
* corresponds with values[i]. If values == NULL, then the keys are
* also the values.
*
* Several convenience routines are provided here, so that keys and
* values are always moved in sync.
*/

typedef struct {
PyObject **keys;
PyObject **values;
} sortslice;

起手式先搞了一个叫做sortslice的结构,后面用到的非常多。这个结构粗看起来让人摸不着头脑,好好的list哪来的成对的valuekey呢?看到后面会明白这是由Python排序函数的原型决定的。Python的排序有个叫做key的关键字,允许你传入一个函数然后依据这个函数来比较list中元素的大小。在CPython内部实现上如果定义了key那么CPython就会首先依据key这个函数把list中所有元素(称作value)的key都算出来,然后再依据key排序,这就自然出现了成对的keyvalue,两者在排序时要一起移动。所以sortslice是CPython特有的一个排序的基本单元,构成了一个run的基础。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
Py_LOCAL_INLINE(void)
sortslice_copy(sortslice *s1, Py_ssize_t i, sortslice *s2, Py_ssize_t j)
{
s1->keys[i] = s2->keys[j];
if (s1->values != NULL)
s1->values[i] = s2->values[j];
}

Py_LOCAL_INLINE(void)
sortslice_copy_incr(sortslice *dst, sortslice *src)
{
*dst->keys++ = *src->keys++;
if (dst->values != NULL)
*dst->values++ = *src->values++;
}

Py_LOCAL_INLINE(void)
sortslice_copy_decr(sortslice *dst, sortslice *src)
{
*dst->keys-- = *src->keys--;
if (dst->values != NULL)
*dst->values-- = *src->values--;
}


Py_LOCAL_INLINE(void)
sortslice_memcpy(sortslice *s1, Py_ssize_t i, sortslice *s2, Py_ssize_t j,
Py_ssize_t n)
{
memcpy(&s1->keys[i], &s2->keys[j], sizeof(PyObject *) * n);
if (s1->values != NULL)
memcpy(&s1->values[i], &s2->values[j], sizeof(PyObject *) * n);
}

Py_LOCAL_INLINE(void)
sortslice_memmove(sortslice *s1, Py_ssize_t i, sortslice *s2, Py_ssize_t j,
Py_ssize_t n)
{
memmove(&s1->keys[i], &s2->keys[j], sizeof(PyObject *) * n);
if (s1->values != NULL)
memmove(&s1->values[i], &s2->values[j], sizeof(PyObject *) * n);
}

Py_LOCAL_INLINE(void)
sortslice_advance(sortslice *slice, Py_ssize_t n)
{
slice->keys += n;
if (slice->values != NULL)
slice->values += n;
}

随后定义了一大串用来操作sortslice的内联函数,基本上类似于一个“类”了。这些函数简单来说就是优先处理key,然后如果有value再对value做一样的操作。不论调用排序时有没有key这个参数,待排序的元素都在key中,只不过如果有key这个参数原数组的元素被移动到了value里。Py_LOCAL_INLINE的定义依平台而定,目标是实现最快速的局域函数调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* Comparison function: ms->key_compare, which is set at run-time in
* listsort_impl to optimize for various special cases.
* Returns -1 on error, 1 if x < y, 0 if x >= y.
*/

#define ISLT(X, Y) (*(ms->key_compare))(X, Y, ms)

/* Compare X to Y via "<". Goto "fail" if the comparison raises an
error. Else "k" is set to true iff X<Y, and an "if (k)" block is
started. It makes more sense in context <wink>. X and Y are PyObject*s.
*/
#define IFLT(X, Y) if ((k = ISLT(X, Y)) < 0) goto fail; \
if (k)

/* The maximum number of entries in a MergeState's pending-runs stack.
* This is enough to sort arrays of size up to about
* 32 * phi ** MAX_MERGE_PENDING
* where phi ~= 1.618. 85 is ridiculouslylarge enough, good for an array
* with 2**64 elements.
*/
#define MAX_MERGE_PENDING 85

/* When we get into galloping mode, we stay there until both runs win less
* often than MIN_GALLOP consecutive times. See listsort.txt for more info.
*/
#define MIN_GALLOP 7

/* Avoid malloc for small temp arrays. */
#define MERGESTATE_TEMP_SIZE 256

再然后定义了一些宏。比较大小的宏让人心痛:CPython里要比较元素大小的overhead还是相当大的。MAX_MERGE_PENDINGMIN_GALLOP的注释都做的非常清楚,把数值的选择交代明白了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/* One MergeState exists on the stack per invocation of mergesort.  It's just
* a convenient way to pass state around among the helper functions.
*/
struct s_slice {
sortslice base;
Py_ssize_t len;
};

typedef struct s_MergeState MergeState;
struct s_MergeState {
/* This controls when we get *into* galloping mode. It's initialized
* to MIN_GALLOP. merge_lo and merge_hi tend to nudge it higher for
* random data, and lower for highly structured data.
*/
Py_ssize_t min_gallop;

/* 'a' is temp storage to help with merges. It contains room for
* alloced entries.
*/
sortslice a; /* may point to temparray below */
Py_ssize_t alloced;

/* A stack of n pending runs yet to be merged. Run #i starts at
* address base[i] and extends for len[i] elements. It's always
* true (so long as the indices are in bounds) that
*
* pending[i].base + pending[i].len == pending[i+1].base
*
* so we could cut the storage for this, but it's a minor amount,
* and keeping all the info explicit simplifies the code.
*/
int n;
struct s_slice pending[MAX_MERGE_PENDING];

/* 'a' points to this when possible, rather than muck with malloc. */
PyObject *temparray[MERGESTATE_TEMP_SIZE];

/* This is the function we will use to compare two keys,
* even when none of our special cases apply and we have to use
* safe_object_compare. */
int (*key_compare)(PyObject *, PyObject *, MergeState *);

/* This function is used by unsafe_object_compare to optimize comparisons
* when we know our list is type-homogeneous but we can't assume anything else.
* In the pre-sort check it is set equal to key->ob_type->tp_richcompare */
PyObject *(*key_richcompare)(PyObject *, PyObject *, int);

/* This function is used by unsafe_tuple_compare to compare the first elements
* of tuples. It may be set to safe_object_compare, but the idea is that hopefully
* we can assume more, and use one of the special-case compares. */
int (*tuple_elem_compare)(PyObject *, PyObject *, MergeState *);
};

下面首先利用s_slicesortslice进行了进一步包装,添加了长度字段,形成了一个完整的run的结构。然后定义了一个MergeState,用于包装进行Timsort时的众多metadata,在传参时比较方便。里面有一些对Timsort具有明显关键作用的参数如min_gallop、保存run的栈pending、进行merge时的临时空间temparray以及与之相关的指针等,也有一些为Python的泛型特点准备的函数指针。

二分插入排序

很多基于比较的排序或多或少都依赖强大的插入排序对小的数据片段进行处理,Timsort也不例外,而且使用得相当巧妙。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/* binarysort is the best method for sorting small arrays: it does
few compares, but can do data movement quadratic in the number of
elements.
[lo, hi) is a contiguous slice of a list, and is sorted via
binary insertion. This sort is stable.
On entry, must have lo <= start <= hi, and that [lo, start) is already
sorted (pass start == lo if you don't know!).
If islt() complains return -1, else 0.
Even in case of error, the output slice will be some permutation of
the input (nothing is lost or duplicated).
*/
static int
binarysort(MergeState *ms, sortslice lo, PyObject **hi, PyObject **start)
{
Py_ssize_t k;
PyObject **l, **p, **r;
PyObject *pivot;

assert(lo.keys <= start && start <= hi);
/* assert [lo, start) is sorted */
if (lo.keys == start)
++start;
for (; start < hi; ++start) {
/* set l to where *start belongs */
l = lo.keys;
r = start;
pivot = *r;
/* Invariants:
* pivot >= all in [lo, l).
* pivot < all in [r, start).
* The second is vacuously true at the start.
*/
assert(l < r);
do {
p = l + ((r - l) >> 1);
IFLT(pivot, *p)
r = p;
else
l = p+1;
} while (l < r);
assert(l == r);
/* The invariants still hold, so pivot >= all in [lo, l) and
pivot < all in [l, start), so pivot belongs at l. Note
that if there are elements equal to pivot, l points to the
first slot after them -- that's why this sort is stable.
Slide over to make room.
Caution: using memmove is much slower under MSVC 5;
we're not usually moving many slots. */
for (p = start; p > l; --p)
*p = *(p-1);
*l = pivot;
if (lo.values != NULL) {
Py_ssize_t offset = lo.values - lo.keys;
p = start + offset;
pivot = *p;
l += offset;
for (p = start + offset; p > l; --p)
*p = *(p-1);
*l = pivot;
}
}
return 0;

fail:
return -1;
}

Timsort中的二分插入排序和一般的二分插入排序不一样的地方在于额外多了一些参数,特别是start。我们来看一下这个排序函数的参数都是什么含义。

  1. ms包含了非常多的信息,粗看起来好像在当前函数中没有用到,其实为大小比较提供了进行比较的函数。
  2. lo里面是两个指针,指向当前待排序的keyvalue的地址。
  3. hi是单个指针,指向待排序的key的结束地址。
  4. start指向开始排序的元素的地址。这是Timsort中二分插入排序和其它二分插入排序的最大区别。一般的排序是不需要这个指针的,直接从头开始排就行了。Timsort需要这个指针的原因是Timsort需要把长度不够一个run的单调序列强行拉成一个run长。这样形成的run前半部分是单调的有序的,后半部分是不单调的无序的,那么要把这个run变成有序的就可以把不单调的部分与前面单调的部分一起放到这个二分插入排序里来进行排序,这时我们知道start之前已经是单调的了,因此从start开始排序即可。通过这个二分插入排序函数我们不难想象在数据比较随机的时候Timsort可以很自然地退化为mergesort。

除此之外,在这个二分插入排序最后,我们看到了在排序完毕后将value的值也进行位移、插入的过程。为什么不在对key排序的时候顺便就把在一个结构里的value的一起做了呢?因为数据局部性啊。

山雨欲来风满楼

下面就要进入Timsort最精华最出彩的部分了。首先登场的是count_run函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
Return the length of the run beginning at lo, in the slice [lo, hi). lo < hi
is required on entry. "A run" is the longest ascending sequence, with

lo[0] <= lo[1] <= lo[2] <= ...

or the longest descending sequence, with

lo[0] > lo[1] > lo[2] > ...

Boolean *descending is set to 0 in the former case, or to 1 in the latter.
For its intended use in a stable mergesort, the strictness of the defn of
"descending" is needed so that the caller can safely reverse a descending
sequence without violating stability (strict > ensures there are no equal
elements to get out of order).

Returns -1 in case of error.
*/
static Py_ssize_t
count_run(MergeState *ms, PyObject **lo, PyObject **hi, int *descending)
{
Py_ssize_t k;
Py_ssize_t n;

assert(lo < hi);
*descending = 0;
++lo;
if (lo == hi)
return 1;

n = 2;
IFLT(*lo, *(lo-1)) {
*descending = 1;
for (lo = lo+1; lo < hi; ++lo, ++n) {
IFLT(*lo, *(lo-1))
;
else
break;
}
}
else {
for (lo = lo+1; lo < hi; ++lo, ++n) {
IFLT(*lo, *(lo-1))
break;
}
}

return n;
fail:
return -1;
}

这个函数从lo规定的地方开始,到hi规定的地方结束,寻找一个单调的序列作为run的前体(长度不够再来凑)。注意这里如果是单调增不要求严格单调,但是单调减要求是严格的。这是因为Timsort会反转单调减的序列把它变成单调增,我们不希望单调减序列中有相同元素使反转操作导致排序不稳定,严格要求单调减时翻转需要处理的元素数目较少也是一个bonus。因此这里的单调/严格单调要求不是随意选择的。

Gallop!

接下来是威名远扬的gallop函数,本质是对待查找元素常常位于有序数组某一端这种情况进行优化的二分查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*
Locate the proper position of key in a sorted vector; if the vector contains
an element equal to key, return the position immediately to the left of
the leftmost equal element. [gallop_right() does the same except returns
the position to the right of the rightmost equal element (if any).]

"a" is a sorted vector with n elements, starting at a[0]. n must be > 0.

"hint" is an index at which to begin the search, 0 <= hint < n. The closer
hint is to the final result, the faster this runs.

The return value is the int k in 0..n such that

a[k-1] < key <= a[k]

pretending that *(a-1) is minus infinity and a[n] is plus infinity. IOW,
key belongs at index k; or, IOW, the first k elements of a should precede
key, and the last n-k should follow key.

Returns -1 on error. See listsort.txt for info on the method.
*/
static Py_ssize_t
gallop_left(MergeState *ms, PyObject *key, PyObject **a, Py_ssize_t n, Py_ssize_t hint)
{
Py_ssize_t ofs;
Py_ssize_t lastofs;
Py_ssize_t k;

assert(key && a && n > 0 && hint >= 0 && hint < n);

a += hint;
lastofs = 0;
ofs = 1;
IFLT(*a, key) {
/* a[hint] < key -- gallop right, until
* a[hint + lastofs] < key <= a[hint + ofs]
*/
const Py_ssize_t maxofs = n - hint; /* &a[n-1] is highest */
while (ofs < maxofs) {
IFLT(a[ofs], key) {
lastofs = ofs;
ofs = (ofs << 1) + 1;
if (ofs <= 0) /* int overflow */
ofs = maxofs;
}
else /* key <= a[hint + ofs] */
break;
}
if (ofs > maxofs)
ofs = maxofs;
/* Translate back to offsets relative to &a[0]. */
lastofs += hint;
ofs += hint;
}
else {
/* key <= a[hint] -- gallop left, until
* a[hint - ofs] < key <= a[hint - lastofs]
*/
const Py_ssize_t maxofs = hint + 1; /* &a[0] is lowest */
while (ofs < maxofs) {
IFLT(*(a-ofs), key)
break;
/* key <= a[hint - ofs] */
lastofs = ofs;
ofs = (ofs << 1) + 1;
if (ofs <= 0) /* int overflow */
ofs = maxofs;
}
if (ofs > maxofs)
ofs = maxofs;
/* Translate back to positive offsets relative to &a[0]. */
k = lastofs;
lastofs = hint - ofs;
ofs = hint - k;
}
a -= hint;

assert(-1 <= lastofs && lastofs < ofs && ofs <= n);
/* Now a[lastofs] < key <= a[ofs], so key belongs somewhere to the
* right of lastofs but no farther right than ofs. Do a binary
* search, with invariant a[lastofs-1] < key <= a[ofs].
*/
++lastofs;
while (lastofs < ofs) {
Py_ssize_t m = lastofs + ((ofs - lastofs) >> 1);

IFLT(a[m], key)
lastofs = m+1; /* a[m] < key */
else
ofs = m; /* key <= a[m] */
}
assert(lastofs == ofs); /* so a[ofs-1] < key <= a[ofs] */
return ofs;

fail:
return -1;
}

几个参数都比较显然,hint看起来很fancy但在实际当中并没有得到有效应用,其值往往都是端点位置。如果hint的取值多一些,gallop函数就可以看做是针对待查找元素常常位于有序数组某一特定位置(hint)附近的情况进行优化的二分查找。与一般的查找函数不同,这一函数的返回值是带查找元素相对于hint的偏移量,这是为了方便算法中对偏移量的使用而设计的接口。
Gallop函数主要分为两个大块,第一块是一段if-else,通过gallop找到key大概在的区间,然后第二块通过二分查找确定key的位置。if-else段两个逻辑也很类似,只不过一个是往右gallop一个是往左gallop。两段最大的区别在于从相对于hint的offset换算成相对于数组起始地址的offset时逻辑不太一样,因为当向左gallop时需要调换ofslastofs的位置。这段代码首先要夸作者注释做的好,不多但是把最最关键的都点得清清楚楚,读起来好似按摩神经。然后还要夸对于int overflow的细节的处理,这让我一个基本上只写Python的肃然起敬。这里贴上来的代码是gallop_left,那么同理还有gallop_right,不同之处在于有多个与key相同的元素时的处理。gallop_left在有多个相同元素时会返回数组最左侧的元素,相反gallop_right会返回数组最右侧的元素。这是为了实现算法稳定性而做出的区别对待,与“向左查找”或者“向右查找”没有关系。
以上对gallop函数的分析看起来好像很简单,其实gallop函数中的学问还是非常多的,具体的可以看Tim本人写的小文章。这里请读者思考一个问题:为什么在将offset进行翻倍操作时要多加一个1呢?也就是说,为什么会有:

1
ofs = (ofs << 1) + 1;

而不是更简单的:

1
ofs <<= 1;

经本人测试,这个小区别几乎决定了gallop函数是一个有效的改进还是画蛇添足。如果在offset翻倍时采用简单的位移操作,不进行gallop的Timsort会比进行gallop的Timsort整体来说表现更好。这个小问题的答案在Tim的小文章中做了详细的介绍。
下面就要做merge了。

Merge的基础设施

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/* Conceptually a MergeState's constructor. */
static void
merge_init(MergeState *ms, Py_ssize_t list_size, int has_keyfunc)
{
assert(ms != NULL);
if (has_keyfunc) {
/* The temporary space for merging will need at most half the list
* size rounded up. Use the minimum possible space so we can use the
* rest of temparray for other things. In particular, if there is
* enough extra space, listsort() will use it to store the keys.
*/
ms->alloced = (list_size + 1) / 2;

/* ms->alloced describes how many keys will be stored at
ms->temparray, but we also need to store the values. Hence,
ms->alloced is capped at half of MERGESTATE_TEMP_SIZE. */
if (MERGESTATE_TEMP_SIZE / 2 < ms->alloced)
ms->alloced = MERGESTATE_TEMP_SIZE / 2;
ms->a.values = &ms->temparray[ms->alloced];
}
else {
ms->alloced = MERGESTATE_TEMP_SIZE;
ms->a.values = NULL;
}
ms->a.keys = ms->temparray;
ms->n = 0;
ms->min_gallop = MIN_GALLOP;
}

/* Free all the temp memory owned by the MergeState. This must be called
* when you're done with a MergeState, and may be called before then if
* you want to free the temp memory early.
*/
static void
merge_freemem(MergeState *ms)
{
assert(ms != NULL);
if (ms->a.keys != ms->temparray)
PyMem_Free(ms->a.keys);
}

/* Ensure enough temp memory for 'need' array slots is available.
* Returns 0 on success and -1 if the memory can't be gotten.
*/
static int
merge_getmem(MergeState *ms, Py_ssize_t need)
{
int multiplier;

assert(ms != NULL);
if (need <= ms->alloced)
return 0;

multiplier = ms->a.values != NULL ? 2 : 1;

/* Don't realloc! That can cost cycles to copy the old data, but
* we don't care what's in the block.
*/
merge_freemem(ms);
if ((size_t)need > PY_SSIZE_T_MAX / sizeof(PyObject *) / multiplier) {
PyErr_NoMemory();
return -1;
}
ms->a.keys = (PyObject **)PyMem_Malloc(multiplier * need
* sizeof(PyObject *));
if (ms->a.keys != NULL) {
ms->alloced = need;
if (ms->a.values != NULL)
ms->a.values = &ms->a.keys[need];
return 0;
}
PyErr_NoMemory();
return -1;
}
#define MERGE_GETMEM(MS, NEED) ((NEED) <= (MS)->alloced ? 0 : \
merge_getmem(MS, NEED))

首先是一个类似MergeState构造函数的函数,不难发现这个函数并不是严格的构造函数,有许多变量都没处理,调用的时候其实也只被调用了一次,所以个人感觉这个函数意义并不是很大。之后是对MergeState维护的临时空间内存进行管理的函数。在merge_getmem里有一个貌似很奇怪的multiplier,这是由于当同时存在keyvalue时,所需要的空间两倍于只有key时的空间。最后定义的MERGE_GETMEM看起来和merge_getmem中实现的功能完全重叠,不知为何,等CPython开启issue以后一定要问一下。

merge-阴阳太极

为什么叫“阴阳太极”呢?因为有关merge的代码包含了一些的对偶的部分,而这些对偶的部分又相互依赖相互补充。比如下面这个针对na <= nb时的merge_lo函数,需要在gallop mode和正常mode里面跳来跳去。这样的逻辑使代码相对复杂,不过从这里我们看到了C中goto的巧妙应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/* Merge the na elements starting at ssa with the nb elements starting at
* ssb.keys = ssa.keys + na in a stable way, in-place. na and nb must be > 0.
* Must also have that ssa.keys[na-1] belongs at the end of the merge, and
* should have na <= nb. See listsort.txt for more info. Return 0 if
* successful, -1 if error.
*/
static Py_ssize_t
merge_lo(MergeState *ms, sortslice ssa, Py_ssize_t na,
sortslice ssb, Py_ssize_t nb)
{
Py_ssize_t k;
sortslice dest;
int result = -1; /* guilty until proved innocent */
Py_ssize_t min_gallop;

assert(ms && ssa.keys && ssb.keys && na > 0 && nb > 0);
assert(ssa.keys + na == ssb.keys);
if (MERGE_GETMEM(ms, na) < 0)
return -1;
sortslice_memcpy(&ms->a, 0, &ssa, 0, na);
dest = ssa;
ssa = ms->a;

sortslice_copy_incr(&dest, &ssb);
--nb;
if (nb == 0)
goto Succeed;
if (na == 1)
goto CopyB;

min_gallop = ms->min_gallop;
for (;;) {
Py_ssize_t acount = 0; /* # of times A won in a row */
Py_ssize_t bcount = 0; /* # of times B won in a row */

/* Do the straightforward thing until (if ever) one run
* appears to win consistently.
*/
for (;;) {
assert(na > 1 && nb > 0);
k = ISLT(ssb.keys[0], ssa.keys[0]);
if (k) {
if (k < 0)
goto Fail;
sortslice_copy_incr(&dest, &ssb);
++bcount;
acount = 0;
--nb;
if (nb == 0)
goto Succeed;
if (bcount >= min_gallop)
break;
}
else {
sortslice_copy_incr(&dest, &ssa);
++acount;
bcount = 0;
--na;
if (na == 1)
goto CopyB;
if (acount >= min_gallop)
break;
}
}

/* One run is winning so consistently that galloping may
* be a huge win. So try that, and continue galloping until
* (if ever) neither run appears to be winning consistently
* anymore.
*/
++min_gallop;
do {
assert(na > 1 && nb > 0);
min_gallop -= min_gallop > 1;
ms->min_gallop = min_gallop;
k = gallop_right(ms, ssb.keys[0], ssa.keys, na, 0);
acount = k;
if (k) {
if (k < 0)
goto Fail;
sortslice_memcpy(&dest, 0, &ssa, 0, k);
sortslice_advance(&dest, k);
sortslice_advance(&ssa, k);
na -= k;
if (na == 1)
goto CopyB;
/* na==0 is impossible now if the comparison
* function is consistent, but we can't assume
* that it is.
*/
if (na == 0)
goto Succeed;
}
sortslice_copy_incr(&dest, &ssb);
--nb;
if (nb == 0)
goto Succeed;

k = gallop_left(ms, ssa.keys[0], ssb.keys, nb, 0);
bcount = k;
if (k) {
if (k < 0)
goto Fail;
sortslice_memmove(&dest, 0, &ssb, 0, k);
sortslice_advance(&dest, k);
sortslice_advance(&ssb, k);
nb -= k;
if (nb == 0)
goto Succeed;
}
sortslice_copy_incr(&dest, &ssa);
--na;
if (na == 1)
goto CopyB;
} while (acount >= MIN_GALLOP || bcount >= MIN_GALLOP);
++min_gallop; /* penalize it for leaving galloping mode */
ms->min_gallop = min_gallop;
}
Succeed:
result = 0;
Fail:
if (na)
sortslice_memcpy(&dest, 0, &ssa, 0, na);
return result;
CopyB:
assert(na == 1 && nb > 0);
/* The last element of ssa belongs at the end of the merge. */
sortslice_memmove(&dest, 0, &ssb, 0, nb);
sortslice_copy(&dest, nb, &ssa, 0);
return 0;
}

这个函数一开始的预处理主要是把较小的run拷贝到临时空间里去,最后的后处理也是想尽量利用memmove或者memcpy快速处理merge的尾巴。主要部分包含两个嵌套的循环,外层循环没有跳出条件,只能靠goto强行改变指令地址。如果使用其它语言想把这个功能写好恐怕不太容易,少不了一些奇奇怪怪的flag。内层循环有两个,第一个进行正常的merge,同时悄悄在小本本上记一些东西,最后要么成功要么切换模式;第二个进行gallop。gallop中好像进行了两次gallop,实际相当于是一个if-else,分别对应于不同数组上的gallop。在两次gallop中,除了对由gallop得到的大数组进行memmove或者memcpy以外,还会从没有gallop的数组中拷贝一个元素,因为这个元素正是在gallop中作为key比较的元素,在归并时根据定义恰好应该位于gallop得到的大数组之后。从中可以看出CPython这种一切以减少元素比较为中心的优化方式到了多么丧心病狂的地步。
依靠外层循环,算法得以在两种模式间不断切换。两个模式中间有个++min_gallop让人困惑——从正常模式切换到gallop难道不应该减小min_gallop吗?其实这里要加的原因是后面一进入gallop循环就要大力减。
除了merge_lo自然也有merge_hi,这一对merge是一般的归并排序没有的,因为一般的归并排序两段子数组长度类似,通常的也是符合直觉的做法是拷贝走第一个数组,这样可以从左往右扫归并。而在Timsort中可能存在第一个数组的长度比第二个数组要长的情况。为了节省空间、时间,要拷贝走第二个数组。这时再在第一个数组上进行归并可能就会覆盖第一个数组上的数据,只能在第二个数组上倒着来归并。这就导致merge_lomerge_hi的实现有本质不同。但它们两个的基本精神还是一致的,因此这里不再赘述merge_hi了,直接来看merge_lomerge_hi的调用者merge_at函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/* Merge the two runs at stack indices i and i+1.
* Returns 0 on success, -1 on error.
*/
static Py_ssize_t
merge_at(MergeState *ms, Py_ssize_t i)
{
sortslice ssa, ssb;
Py_ssize_t na, nb;
Py_ssize_t k;

assert(ms != NULL);
assert(ms->n >= 2);
assert(i >= 0);
assert(i == ms->n - 2 || i == ms->n - 3);

ssa = ms->pending[i].base;
na = ms->pending[i].len;
ssb = ms->pending[i+1].base;
nb = ms->pending[i+1].len;
assert(na > 0 && nb > 0);
assert(ssa.keys + na == ssb.keys);

/* Record the length of the combined runs; if i is the 3rd-last
* run now, also slide over the last run (which isn't involved
* in this merge). The current run i+1 goes away in any case.
*/
ms->pending[i].len = na + nb;
if (i == ms->n - 3)
ms->pending[i+1] = ms->pending[i+2];
--ms->n;

/* Where does b start in a? Elements in a before that can be
* ignored (already in place).
*/
k = gallop_right(ms, *ssb.keys, ssa.keys, na, 0);
if (k < 0)
return -1;
sortslice_advance(&ssa, k);
na -= k;
if (na == 0)
return 0;

/* Where does a end in b? Elements in b after that can be
* ignored (already in place).
*/
nb = gallop_left(ms, ssa.keys[na-1], ssb.keys, nb, nb-1);
if (nb <= 0)
return nb;

/* Merge what remains of the runs, using a temp array with
* min(na, nb) elements.
*/
if (na <= nb)
return merge_lo(ms, ssa, na, ssb, nb);
else
return merge_hi(ms, ssa, na, ssb, nb);
}

merge_at函数实现了一个很重要的逻辑,就是把两个相邻的run之间相对排序结果已经排好序的部分剔除掉,把没有排好序的部分放到merge_lo或者merge_hi里进行merge,这又是一个针对大体有序的数组的优化。时间复杂度上,这一逻辑减少了比较和拷贝的次数;空间复杂度上,这一逻辑也减少了在进行merge时需要的辅助空间。把底层设施都写好以后,这个函数看起来十分畅快。先通过gallop确定已经排好序的数组的边界,然后依据数组大小merge_lo或者merge_hi即可。

Collapse-多米诺骨牌

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/* Examine the stack of runs waiting to be merged, merging adjacent runs
* until the stack invariants are re-established:
*
* 1. len[-3] > len[-2] + len[-1]
* 2. len[-2] > len[-1]
*
* See listsort.txt for more info.
*
* Returns 0 on success, -1 on error.
*/
static int
merge_collapse(MergeState *ms)
{
struct s_slice *p = ms->pending;

assert(ms);
while (ms->n > 1) {
Py_ssize_t n = ms->n - 2;
if ((n > 0 && p[n-1].len <= p[n].len + p[n+1].len) ||
(n > 1 && p[n-2].len <= p[n-1].len + p[n].len)) {
if (p[n-1].len < p[n+1].len)
--n;
if (merge_at(ms, n) < 0)
return -1;
}
else if (p[n].len <= p[n+1].len) {
if (merge_at(ms, n) < 0)
return -1;
}
else
break;
}
return 0;
}

/* Regardless of invariants, merge all runs on the stack until only one
* remains. This is used at the end of the mergesort.
*
* Returns 0 on success, -1 on error.
*/
static int
merge_force_collapse(MergeState *ms)
{
struct s_slice *p = ms->pending;

assert(ms);
while (ms->n > 1) {
Py_ssize_t n = ms->n - 2;
if (n > 0 && p[n-1].len < p[n+1].len)
--n;
if (merge_at(ms, n) < 0)
return -1;
}
return 0;
}

这代码也看得爽啊,原来费劲心机写了那么多函数,现在通通拿来把栈中的run给collapse掉完成排序。注意merge_collapse原来是有bug的,不能只比较栈顶前三个元素是否满足collapse条件,还需要检查第四个。见这个commit

落幕

至此Timsort的精华部分基本就结束了,以下更多的都是一些简单的问题或者是工程问题。首先是一些简单的工具函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* Compute a good value for the minimum run length; natural runs shorter
* than this are boosted artificially via binary insertion.
*
* If n < 64, return n (it's too small to bother with fancy stuff).
* Else if n is an exact power of 2, return 32.
* Else return an int k, 32 <= k <= 64, such that n/k is close to, but
* strictly less than, an exact power of 2.
*
* See listsort.txt for more info.
*/
static Py_ssize_t
merge_compute_minrun(Py_ssize_t n)
{
Py_ssize_t r = 0; /* becomes 1 if any 1 bits are shifted off */

assert(n >= 0);
while (n >= 64) {
r |= n & 1;
n >>= 1;
}
return n + r;
}

static void
reverse_sortslice(sortslice *s, Py_ssize_t n)
{
reverse_slice(s->keys, &s->keys[n]);
if (s->values != NULL)
reverse_slice(s->values, &s->values[n]);
}

关于minrun的注释非常清晰。接下来的代码包含了许多比较函数,不是本文重点,我们直接进入sort函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/* An adaptive, stable, natural mergesort.  See listsort.txt.
* Returns Py_None on success, NULL on error. Even in case of error, the
* list will be some permutation of its input state (nothing is lost or
* duplicated).
*/
/*[clinic input]
list.sort

*
key as keyfunc: object = None
reverse: bool(accept={int}) = False

Stable sort *IN PLACE*.
[clinic start generated code]*/

static PyObject *
list_sort_impl(PyListObject *self, PyObject *keyfunc, int reverse)
/*[clinic end generated code: output=57b9f9c5e23fbe42 input=b0fcf743982c5b90]*/
{
MergeState ms;
Py_ssize_t nremaining;
Py_ssize_t minrun;
sortslice lo;
Py_ssize_t saved_ob_size, saved_allocated;
PyObject **saved_ob_item;
PyObject **final_ob_item;
PyObject *result = NULL; /* guilty until proved innocent */
Py_ssize_t i;
PyObject **keys;

assert(self != NULL);
assert(PyList_Check(self));
if (keyfunc == Py_None)
keyfunc = NULL;

/* The list is temporarily made empty, so that mutations performed
* by comparison functions can't affect the slice of memory we're
* sorting (allowing mutations during sorting is a core-dump
* factory, since ob_item may change).
*/
saved_ob_size = Py_SIZE(self);
saved_ob_item = self->ob_item;
saved_allocated = self->allocated;
Py_SIZE(self) = 0;
self->ob_item = NULL;
self->allocated = -1; /* any operation will reset it to >= 0 */

if (keyfunc == NULL) {
keys = NULL;
lo.keys = saved_ob_item;
lo.values = NULL;
}
else {
if (saved_ob_size < MERGESTATE_TEMP_SIZE/2)
/* Leverage stack space we allocated but won't otherwise use */
keys = &ms.temparray[saved_ob_size+1];
else {
keys = PyMem_MALLOC(sizeof(PyObject *) * saved_ob_size);
if (keys == NULL) {
PyErr_NoMemory();
goto keyfunc_fail;
}
}

for (i = 0; i < saved_ob_size ; i++) {
keys[i] = PyObject_CallFunctionObjArgs(keyfunc, saved_ob_item[i],
NULL);
if (keys[i] == NULL) {
for (i=i-1 ; i>=0 ; i--)
Py_DECREF(keys[i]);
if (saved_ob_size >= MERGESTATE_TEMP_SIZE/2)
PyMem_FREE(keys);
goto keyfunc_fail;
}
}

lo.keys = keys;
lo.values = saved_ob_item;
}


/* The pre-sort check: here's where we decide which compare function to use.
* How much optimization is safe? We test for homogeneity with respect to
* several properties that are expensive to check at compare-time, and
* set ms appropriately. */
if (saved_ob_size > 1) {
@@@@@@ WEITANGLI: OMMITED @@@@@@
}
/* End of pre-sort check: ms is now set properly! */

merge_init(&ms, saved_ob_size, keys != NULL);

nremaining = saved_ob_size;
if (nremaining < 2)
goto succeed;

/* Reverse sort stability achieved by initially reversing the list,
applying a stable forward sort, then reversing the final result. */
if (reverse) {
if (keys != NULL)
reverse_slice(&keys[0], &keys[saved_ob_size]);
reverse_slice(&saved_ob_item[0], &saved_ob_item[saved_ob_size]);
}

/* March over the array once, left to right, finding natural runs,
* and extending short natural runs to minrun elements.
*/
minrun = merge_compute_minrun(nremaining);
do {
int descending;
Py_ssize_t n;

/* Identify next run. */
n = count_run(&ms, lo.keys, lo.keys + nremaining, &descending);
if (n < 0)
goto fail;
if (descending)
reverse_sortslice(&lo, n);
/* If short, extend to min(minrun, nremaining). */
if (n < minrun) {
const Py_ssize_t force = nremaining <= minrun ?
nremaining : minrun;
if (binarysort(&ms, lo, lo.keys + force, lo.keys + n) < 0)
goto fail;
n = force;
}
/* Push run onto pending-runs stack, and maybe merge. */
assert(ms.n < MAX_MERGE_PENDING);
ms.pending[ms.n].base = lo;
ms.pending[ms.n].len = n;
++ms.n;
if (merge_collapse(&ms) < 0)
goto fail;
/* Advance to find next run. */
sortslice_advance(&lo, n);
nremaining -= n;
} while (nremaining);

if (merge_force_collapse(&ms) < 0)
goto fail;
assert(ms.n == 1);
assert(keys == NULL
? ms.pending[0].base.keys == saved_ob_item
: ms.pending[0].base.keys == &keys[0]);
assert(ms.pending[0].len == saved_ob_size);
lo = ms.pending[0].base;

succeed:
result = Py_None;
fail:
if (keys != NULL) {
for (i = 0; i < saved_ob_size; i++)
Py_DECREF(keys[i]);
if (saved_ob_size >= MERGESTATE_TEMP_SIZE/2)
PyMem_FREE(keys);
}

if (self->allocated != -1 && result != NULL) {
/* The user mucked with the list during the sort,
* and we don't already have another error to report.
*/
PyErr_SetString(PyExc_ValueError, "list modified during sort");
result = NULL;
}

if (reverse && saved_ob_size > 1)
reverse_slice(saved_ob_item, saved_ob_item + saved_ob_size);

merge_freemem(&ms);

keyfunc_fail:
final_ob_item = self->ob_item;
i = Py_SIZE(self);
Py_SIZE(self) = saved_ob_size;
self->ob_item = saved_ob_item;
self->allocated = saved_allocated;
if (final_ob_item != NULL) {
/* we cannot use _list_clear() for this because it does not
guarantee that the list is really empty when it returns */
while (--i >= 0) {
Py_XDECREF(final_ob_item[i]);
}
PyMem_FREE(final_ob_item);
}
Py_XINCREF(result);
return result;
}

这个函数看了也是让人痛心疾首,怎么sort的overhead会这么大。这个函数首先把原先的list置为空来防止在排序中有人改变item的值,然后检查是否调用者指定了key_funckey,如果有则要按之前介绍的那样先把key计算出来,把list里原来存的东西当做value,这里因为相当于凭空多了一个数组所以要进行内存的分配。之后进行的比较函数的选择我们略去。再然后是对reverse这个参数的处理,即实现从大到小排序。粗看起来好像把结果逆序就行了,其实不然,因为我们要的是一个稳定的算法。所以这里要先把未排序数组逆序,然后在排序完成后再进行逆序才可以。
之后才正式进入排序,熟悉Timsort的话这一段就很好懂,无非就是一个run一个run的进栈,然后不断collapse就好了。

结语

综合来看,Timsort其实也算不上多么艰深复杂的算法——基本上是在mergesort的基础上针对有序数组做了一些优化。然而Tim毕竟是第一个把鸡蛋立起来的人,而且其中很多细节处理其实并不容易。
Timsort优化主要基础是依据数组的有序片段划分mergesort中各个子merge,把有序的放在一个run里。这样做可能带来一些问题,比如怎样快速选取下一对run进行merge来保证每次merge的两组run长度大体相似(如果相差悬殊则效率很低退化至插入排序),但Tim通过精巧地设定初始run的最小长度和一个栈解决了。
关于gallop的优化最根本的出发点是减少对数组中元素比较的次数——正如我们看到的,对于Python来说对元素进行比较是很昂贵的操作。因此,如果比较的成本并不高的话Timsort中关于gallop的部分其实意义不是非常大。然而在实际工作中,对数组元素进行比较成本高昂的情况其实是广泛存在的,最简单直接的例子就是字符串的排序,因此gallop模式的应用场景仍然广阔。
有关gallop的优化对于一般的mergesort理论上也适用,但是mergesort可能通过粗暴的划分方式(中点)把数据原本的有序性破坏掉了,因此效果估计会比Timsort当中差。
此外,将gallop作为二分查找的变种很值得借鉴。