数据结构

数组

概述

数组是由相同类型元素的集合组成的数据结构,计算机会为数组分配一块连续的内存来保存其中的元素,我们可以利用数组中元素的索引快速访问特定元素,常见的数组大多都是一维的线性数组,而多维数组在数值和图形计算领域却有比较常见的应用

定义与初始化

Go 语言的数组有两种不同的创建方式,一种是显式的指定数组大小,另一种是使用 [...]T 声明数组,Go 语言会在编译期间通过源代码推导数组的大小

arr1 := [3]int{1, 2, 3}
arr2 := [...]int{1, 2, 3}

上述两种声明方式在运行期间得到的结果是完全相同的,后一种声明方式在编译期间就会被转换成前一种,这也就是编译器对数组大小的推导

语句转换

对于一个由字面量组成的数组,根据数组元素数量的不同,编译器会在负责初始化字面量的 cmd/compile/internal/gc.anylit函数中做两种不同的优化:

  1. 当元素数量小于或者等于 4 个时,会直接将数组中的元素放置在栈上;
  2. 当元素数量大于 4 个时,会将数组中的元素放置到静态区并在运行时取出;
func anylit(n *Node, var_ *Node, init *Nodes) {
t := n.Type
switch n.Op {
case OSTRUCTLIT, OARRAYLIT:
if n.List.Len() > 4 {
...
}

fixedlit(inInitFunction, initKindLocalCode, n, var_, init)
...
}
}

当数组的元素小于或者等于四个时,cmd/compile/internal/gc.fixedlit会负责在函数编译之前将 [3]{1, 2, 3} 转换成更加原始的语句:

func fixedlit(ctxt initContext, kind initKind, n *Node, var_ *Node, init *Nodes) {
var splitnode func(*Node) (a *Node, value *Node)
...

for _, r := range n.List.Slice() {
a, value := splitnode(r)
a = nod(OAS, a, value)
a = typecheck(a, ctxStmt)
switch kind {
case initKindStatic:
genAsStatic(a)
case initKindLocalCode:
a = orderStmtInPlace(a, map[string][]*Node{})
a = walkstmt(a)
init.Append(a)
}
}
}

当数组中元素的个数小于或者等于四个并且 cmd/compile/internal/gc.fixedlit函数接收的 kindinitKindLocalCode 时,上述代码会将原有的初始化语句 [3]int{1, 2, 3} 拆分成一个声明变量的表达式和几个赋值表达式,这些表达式会完成对数组的初始化:

var arr [3]int
arr[0] = 1
arr[1] = 2
arr[2] = 3

但是如果当前数组的元素大于四个,cmd/compile/internal/gc.anylit函数在静态存储区初始化数组中的元素并将临时变量赋值给数组:

func anylit(n *Node, var_ *Node, init *Nodes) {
t := n.Type
switch n.Op {
case OSTRUCTLIT, OARRAYLIT:
if n.List.Len() > 4 {
vstat := staticname(t)
vstat.Name.SetReadonly(true)

fixedlit(inNonInitFunction, initKindStatic, n, vstat, init)

a := nod(OAS, var_, vstat)
a = typecheck(a, ctxStmt)
a = walkexpr(a, init)
init.Append(a)
break
}

...
}
}

总结起来,在不考虑逃逸分析的情况下,如果数组中元素的个数小于或者等于 4 个,那么所有的变量会直接在栈上初始化,如果数组元素大于 4 个,变量就会在静态存储区初始化然后拷贝到栈上,这些转换后的代码才会继续进入中间代码生成机器码生成两个阶段,最后生成可以执行的二进制文件。

访问和赋值

无论是在栈上还是静态存储区,数组在内存中都是一连串的内存空间,我们通过指向数组开头的指针、元素的数量以及元素类型占的空间大小表示数组。如果我们不知道数组中元素的数量,访问时可能发生越界;而如果不知道数组中元素类型的大小,就没有办法知道应该一次取出多少字节的数据,无论丢失了哪个信息,我们都无法知道这片连续的内存空间到底存储了什么数据

数组访问越界是非常严重的错误,Go 语言中可以在编译期间的静态类型检查判断数组越界,cmd/compile/internal/gc.typecheck1

func typecheck1(n *Node, top int) (res *Node) {
switch n.Op {
case OINDEX:
ok |= ctxExpr
l := n.Left // array
r := n.Right // index
switch n.Left.Type.Etype {
case TSTRING, TARRAY, TSLICE:
...
if n.Right.Type != nil && !n.Right.Type.IsInteger() {
yyerror("non-integer array index %v", n.Right)
break
}
if !n.Bounded() && Isconst(n.Right, CTINT) {
x := n.Right.Int64()
if x < 0 {
yyerror("invalid array index %v (index must be non-negative)", n.Right)
} else if n.Left.Type.IsArray() && x >= n.Left.Type.NumElem() {
yyerror("invalid array index %v (out of bounds for %d-element array)", n.Right, n.Left.Type.NumElem())
}
}
}
...
}
}
  1. 访问数组的索引是非整数时,报错 “non-integer array index %v”;
  2. 访问数组的索引是负数时,报错 “invalid array index %v (index must be non-negative)”;
  3. 访问数组的索引越界时,报错 “invalid array index %v (out of bounds for %d-element array)”;

数组和字符串的一些简单越界错误都会在编译期间发现,例如:直接使用整数或者常量访问数组;但是如果使用变量去访问数组或者字符串时,编译器就无法提前发现错误,我们需要 Go 语言运行时阻止不合法的访问:

arr[4]: invalid array index 4 (out of bounds for 3-element array)
arr[i]: panic: runtime error: index out of range [4] with length 3

Go 语言运行时在发现数组、切片和字符串的越界操作会由运行时的 runtime.panicIndex 触发程序的运行时错误并导致崩溃退出:

TEXT runtime·panicIndex(SB),NOSPLIT,$0-8
MOVL AX, x+0(FP)
MOVL CX, y+4(FP)
JMP runtime·goPanicIndex(SB)

func goPanicIndex(x int, y int) {
panicCheck1(getcallerpc(), "index out of range")
panic(boundsError{x: int64(x), signed: true, y: y, code: boundsIndex})
}

当数组的访问操作 OINDEX 成功通过编译器的检查后,会被转换成几个 SSA 指令,假设我们有如下所示的 Go 语言代码,通过如下的方式进行编译会得到 ssa.html 文件:

package check

func outOfRange() int {
arr := [3]int{1, 2, 3}
i := 4
elem := arr[i]
return elem
}

$ GOSSAFUNC=outOfRange go build array.go
dumped SSA to ./ssa.html

start 阶段生成的 SSA 代码就是优化之前的第一版中间代码,下面展示的部分是 elem := arr[i] 对应的中间代码,在这段中间代码中我们发现 Go 语言为数组的访问操作生成了判断数组上限的指令 IsInBounds 以及当条件不满足时触发程序崩溃的 PanicBounds 指令:

b1:
...
v22 (6) = LocalAddr <*[3]int> {arr} v2 v20
v23 (6) = IsInBounds <bool> v21 v11
If v23 → b2 b3 (likely) (6)

b2: ← b1-
v26 (6) = PtrIndex <*int> v22 v21
v27 (6) = Copy <mem> v20
v28 (6) = Load <int> v26 v27 (elem[int])
...
Ret v30 (+7)

b3: ← b1-
v24 (6) = Copy <mem> v20
v25 (6) = PanicBounds <mem> [0] v21 v11 v24
Exit v25 (6)

编译器会将 PanicBounds 指令转换成上面提到的 runtime.panicIndex 函数,当数组下标没有越界时,编译器会先获取数组的内存地址和访问的下标、利用 PtrIndex 计算出目标元素的地址,最后使用 Load 操作将指针中的元素加载到内存中。

当然只有当编译器无法对数组下标是否越界无法做出判断时才会加入 PanicBounds 指令交给运行时进行判断,在使用字面量整数访问数组下标时会生成非常简单的中间代码,当我们将上述代码中的 arr[i] 改成 arr[2] 时,就会得到如下所示的代码:

b1:
...
v21 (5) = LocalAddr <*[3]int> {arr} v2 v20
v22 (5) = PtrIndex <*int> v21 v14
v23 (5) = Load <int> v22 v20 (elem[int])
...

Go 语言对于数组的访问还是有着比较多的检查的,它不仅会在编译期间提前发现一些简单的越界错误并插入用于检测数组上限的函数调用,还会在运行期间通过插入的函数保证不会发生越界。

切片

概述

数组在 Go 语言中没那么常用,更常用的数据结构是切片,即动态数组,其长度并不固定,我们可以向切片中追加元素,它会在容量不足时自动扩容。

在 Go 语言中,切片类型的声明方式与数组有一些相似,不过由于切片的长度是动态的,所以声明时只需要指定切片中的元素类型:

[]int
[]interface{}

从切片的定义我们能推测出,切片在编译期间的生成的类型只会包含切片中的元素类型,即 int 或者 interface{} 等。cmd/compile/internal/types.NewSlice就是编译期间用于创建切片类型的函数:

func NewSlice(elem *Type) *Type {
if t := elem.Cache.slice; t != nil {
if t.Elem() != elem {
Fatalf("elem mismatch")
}
return t
}

t := New(TSLICE)
t.Extra = Slice{Elem: elem}
elem.Cache.slice = t
return t
}

上述方法返回结构体中的 Extra 字段是一个只包含切片内元素类型的结构,也就是说切片内元素的类型都是在编译期间确定的,编译器确定了类型之后,会将类型存储在 Extra 字段中帮助程序在运行时动态获取。

编译期间的切片是 cmd/compile/internal/types.Slice类型的,但是在运行时切片可以由如下的 reflect.SliceHeader

  • Data 是指向数组的指针;
  • Len 是当前切片的长度;
  • Cap 是当前切片的容量,即 Data 数组的大小:
type SliceHeader struct {
Data uintptr
Len int
Cap int
}

Data 是一片连续的内存空间,这片内存空间可以用于存储切片中的全部元素,数组中的元素只是逻辑上的概念,底层存储其实都是连续的,所以我们可以将切片理解成一片连续的内存空间加上长度与容量的标识。

从上图中,我们会发现切片与数组的关系非常密切,切片引入了一个抽象层,提供了对数组中部分连续片段的引用,而作为数组的引用,我们可以在运行区间可以修改它的长度和范围。当切片底层的数组长度不足时就会触发扩容,切片指向的数组可能会发生变化,不过在上层看来切片是没有变化的,上层只需要与切片打交道不需要关心数组的变化。

定义与初始化

Go 语言中包含三种初始化切片的方式:

  1. 通过下标的方式获得数组或者切片的一部分;
  2. 使用字面量初始化新的切片;
  3. 使用关键字 make 创建切片:
arr[0:3] or slice[0:3]
slice := []int{1, 2, 3}
slice := make([]int, 10)

使用下标

使用下标创建切片是最原始也最接近汇编语言的方式,它是所有方法中最为底层的一种,编译器会将 arr[0:3] 或者 slice[0:3] 等语句转换成 OpSliceMake 操作,我们可以通过下面的代码来验证一下:

package opslicemake

func newSlice() []int {
arr := [3]int{1, 2, 3}
slice := arr[0:1]
return slice
}

通过 GOSSAFUNC 变量编译上述代码可以得到一系列 SSA 中间代码,其中 slice := arr[0:1] 语句在 “decompose builtin” 阶段对应的代码如下所示:

v27 (+5) = SliceMake <[]int> v11 v14 v17

name &arr[*[3]int]: v11
name slice.ptr[*int]: v11
name slice.len[int]: v14
name slice.cap[int]: v17

SliceMake 操作会接受四个参数创建新的切片,元素类型、数组指针、切片大小和容量,需要注意的是使用下标初始化切片不会拷贝原数组或者原切片中的数据,它只会创建一个指向原数组的切片结构体,所以修改新切片的数据也会修改原切片。

字面量

当我们使用字面量 []int{1, 2, 3} 创建新的切片时,cmd/compile/internal/gc.slicelit函数会在编译期间将它展开成如下所示的代码片段:

var vstat [3]int
vstat[0] = 1
vstat[1] = 2
vstat[2] = 3
var vauto *[3]int = new([3]int)
*vauto = vstat
slice := vauto[:]
  1. 根据切片中的元素数量对底层数组的大小进行推断并创建一个数组;
  2. 将这些字面量元素存储到初始化的数组中;
  3. 创建一个同样指向 [3]int 类型的数组指针;
  4. 将静态存储区的数组 vstat 赋值给 vauto 指针所在的地址;
  5. 通过 [:] 操作获取一个底层使用 vauto 的切片;

第 5 步中的 [:] 就是使用下标创建切片的方法,从这一点我们也能看出 [:] 操作是创建切片最底层的一种方法。

关键字

如果使用字面量的方式创建切片,大部分的工作都会在编译期间完成。但是当我们使用 make 关键字创建切片时,很多工作都需要运行时的参与;调用方必须向 make 函数传入切片的大小以及可选的容量,类型检查期间的 cmd/compile/internal/gc.typecheck1

上述函数不仅会检查 len 是否传入,还会保证传入的容量 cap 一定大于或者等于 len。除了校验参数之外,当前函数会将 OMAKE 节点转换成 OMAKESLICE,中间代码生成的 cmd/compile/internal/gc.walkexpr函数会依据下面两个条件转换 OMAKESLICE 类型的节点:

  1. 切片的大小和容量是否足够小;
  2. 切片是否发生了逃逸,最终在堆上初始化

当切片发生逃逸或者非常大时,运行时需要 runtime.makeslice在堆上初始化切片,如果当前的切片不会发生逃逸并且切片非常小的时候,make([]int, 3, 4) 会被直接转换成如下所示的代码:

当切片发生逃逸或者非常大时,运行时需要 runtime.makeslice 在堆上初始化切片,如果当前的切片不会发生逃逸并且切片非常小的时候,make([]int, 3, 4) 会被直接转换成如下所示的代码:

var arr [4]int
n := arr[:3]

上述代码会初始化数组并通过下标 [:3] 得到数组对应的切片,这两部分操作都会在编译阶段完成,编译器会在栈上或者静态存储区创建数组并将 [:3] 转换成 OpSliceMake 操作。

于创建切片的运行时函数 runtime.makeslice,这个函数的实现很简单:

func makeslice(et *_type, len, cap int) unsafe.Pointer {
mem, overflow := math.MulUintptr(et.size, uintptr(cap))
if overflow || mem > maxAlloc || len < 0 || len > cap {
mem, overflow := math.MulUintptr(et.size, uintptr(len))
if overflow || mem > maxAlloc || len < 0 {
panicmakeslicelen()
}
panicmakeslicecap()
}

return mallocgc(mem, et, true)
}

Go

上述函数的主要工作是计算切片占用的内存空间并在堆上申请一片连续的内存,它使用如下的方式计算占用的内存:

内存空间=切片中元素大小×切片容量内存空间=切片中元素大小×切片容量

虽然编译期间可以检查出很多错误,但是在创建切片的过程中如果发生了以下错误会直接触发运行时错误并崩溃:

  1. 内存空间的大小发生了溢出;
  2. 申请的内存大于最大可分配的内存;
  3. 传入的长度小于 0 或者长度大于容量;

runtime.makeslice函数的实现还是比较复杂,如果遇到了比较小的对象会直接初始化在 Go 语言调度器里面的 P 结构中,而大于 32KB 的对象会在堆上初始化

访问元素

func (s *state) expr(n *Node) *ssa.Value {
switch n.Op {
case OLEN, OCAP:
switch {
case n.Left.Type.IsSlice():
op := ssa.OpSliceLen
if n.Op == OCAP {
op = ssa.OpSliceCap
}
return s.newValue1(op, types.Types[TINT], s.expr(n.Left))
...
}
...
}
}

访问切片中的字段可能会触发 “decompose builtin” 阶段的优化,len(slice) 或者 cap(slice) 在一些情况下会直接替换成切片的长度或者容量,不需要在运行时获取:

(SlicePtr (SliceMake ptr _ _ )) -> ptr
(SliceLen (SliceMake _ len _)) -> len
(SliceCap (SliceMake _ _ cap)) -> cap

除了获取切片的长度和容量之外,访问切片中元素使用的 OINDEX 操作也会在中间代码生成期间转换成对地址的直接访问:

func (s *state) expr(n *Node) *ssa.Value {
switch n.Op {
case OINDEX:
switch {
case n.Left.Type.IsSlice():
p := s.addr(n, false)
return s.load(n.Left.Type.Elem(), p)
...
}
...
}
}

切片的操作基本都是在编译期间完成的,除了访问切片的长度、容量或者其中的元素之外,编译期间也会将包含 range 关键字的遍历转换成形式更简单的循环,我们会在后面的章节中介绍使用 range 遍历切片的过程。

追加与扩容

使用 append 关键字向切片中追加元素也是常见的切片操作,中间代码生成阶段的 cmd/compile/internal/gc.state.append 方法会根据返回值是否会覆盖原变量,选择进入两种流程,如果 append 返回的新切片不需要赋值回原有的变量,就会进入如下的处理流程:

// append(slice, 1, 2, 3)
ptr, len, cap := slice
newlen := len + 3
if newlen > cap {
ptr, len, cap = growslice(slice, newlen)
newlen = len + 3
}
*(ptr+len) = 1
*(ptr+len+1) = 2
*(ptr+len+2) = 3
return makeslice(ptr, newlen, cap)

我们会先解构切片结构体获取它的数组指针、大小和容量,如果在追加元素后切片的大小大于容量,那么就会调用 runtime.growslice对切片进行扩容并将新的元素依次加入切片。

如果使用 slice = append(slice, 1, 2, 3) 语句,那么 append 后的切片会覆盖原切片,这时 cmd/compile/internal/gc.state.append 方法会使用另一种方式展开关键字:

// slice = append(slice, 1, 2, 3)
a := &slice
ptr, len, cap := slice
newlen := len + 3
if uint(newlen) > uint(cap) {
newptr, len, newcap = growslice(slice, newlen)
vardef(a)
*a.cap = newcap
*a.ptr = newptr
}
newlen = len + 3
*a.len = newlen
*(ptr+len) = 1
*(ptr+len+1) = 2
*(ptr+len+2) = 3

是否覆盖原变量的逻辑其实差不多,最大的区别在于得到的新切片是否会赋值回原变量。如果我们选择覆盖原有的变量,就不需要担心切片发生拷贝影响性能,因为 Go 语言编译器已经对这种常见的情况做出了优化。

当切片的容量不足时,我们会调用 runtime.growslice 函数为切片扩容,扩容是为切片分配新的内存空间并拷贝原切片中元素的过程,我们先来看新切片的容量是如何确定的:

func growslice(et *_type, old slice, cap int) slice {
newcap := old.cap
doublecap := newcap + newcap
if cap > doublecap {
newcap = cap
} else {
if old.len < 1024 {
newcap = doublecap
} else {
for 0 < newcap && newcap < cap {
newcap += newcap / 4
}
if newcap <= 0 {
newcap = cap
}
}
}

在分配内存空间之前需要先确定新的切片容量,运行时根据切片的当前容量选择不同的策略进行扩容:

  1. 如果期望容量大于当前容量的两倍就会使用期望容量;
  2. 如果当前切片的长度小于 1024 就会将容量翻倍;
  3. 如果当前切片的长度大于 1024 就会每次增加 25% 的容量,直到新容量大于期望容量;

上述代码片段仅会确定切片的大致容量,下面还需要根据切片中的元素大小对齐内存,当数组中元素所占的字节大小为 1、8 或者 2 的倍数时,运行时会使用如下所示的代码对齐内存:

var overflow bool
var lenmem, newlenmem, capmem uintptr
switch {
case et.size == 1:
lenmem = uintptr(old.len)
newlenmem = uintptr(cap)
capmem = roundupsize(uintptr(newcap))
overflow = uintptr(newcap) > maxAlloc
newcap = int(capmem)
case et.size == sys.PtrSize:
lenmem = uintptr(old.len) * sys.PtrSize
newlenmem = uintptr(cap) * sys.PtrSize
capmem = roundupsize(uintptr(newcap) * sys.PtrSize)
overflow = uintptr(newcap) > maxAlloc/sys.PtrSize
newcap = int(capmem / sys.PtrSize)
case isPowerOfTwo(et.size):
...
default:
...
}

在默认情况下,我们会将目标容量和元素大小相乘得到占用的内存。如果计算新容量时发生了内存溢出或者请求内存超过上限,就会直接崩溃退出程序

var overflow bool
var newlenmem, capmem uintptr
switch {
...
default:
lenmem = uintptr(old.len) * et.size
newlenmem = uintptr(cap) * et.size
capmem, _ = math.MulUintptr(et.size, uintptr(newcap))
capmem = roundupsize(capmem)
newcap = int(capmem / et.size)
}
...
var p unsafe.Pointer
if et.kind&kindNoPointers != 0 {
p = mallocgc(capmem, nil, false)
memclrNoHeapPointers(add(p, newlenmem), capmem-newlenmem)
} else {
p = mallocgc(capmem, et, true)
if writeBarrier.enabled {
bulkBarrierPreWriteSrcOnly(uintptr(p), uintptr(old.array), lenmem)
}
}
memmove(p, old.array, lenmem)
return slice{p, old.len, newcap}
}

如果切片中元素不是指针类型,那么会调用 runtime.memclrNoHeapPointers 将原数组内存中的内容拷贝到新申请的内存中。

runtime.growslice,其中包含了新的数组指针、大小和容量,这个返回的三元组最终会覆盖原切片

var arr []int64
arr = append(arr, 1, 2, 3, 4, 5)

简单总结一下扩容的过程,当我们执行上述代码时,会触发 runtime.growslice期望分配的内存大小为 40 字节;不过因为切片中的元素大小等于 sys.PtrSize,所以运行时会调用 runtime.roundupsize 8 = 6。

哈希表

数据结构

Go 语言运行时同时使用了多个数据结构组合表示哈希表,其中 runtime.hmap是最核心的结构体

type hmap struct {
count int
flags uint8
B uint8
noverflow uint16
hash0 uint32

buckets unsafe.Pointer
oldbuckets unsafe.Pointer
nevacuate uintptr

extra *mapextra
}

type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
nextOverflow *bmap
}
  1. count 表示当前哈希表中的元素数量;
  2. B 表示当前哈希表持有的 buckets 数量,但是因为哈希表中桶的数量都 2 的倍数,所以该字段会存储对数,也就是 len(buckets) == 2^B
  3. hash0 是哈希的种子,它能为哈希函数的结果引入随机性,这个值在创建哈希表时确定,并在调用哈希函数时作为参数传入;
  4. oldbuckets 是哈希在扩容时用于保存之前 buckets 的字段,它的大小是当前 buckets 的一半;

如上图所示哈希表 runtime.hmap 的桶是 runtime.bmap都能存储 8 个键值对,当哈希表中存储的数据过多,单个桶已经装满时就会使用 extra.nextOverflow 中桶存储溢出的数据。

上述两种不同的桶在内存中是连续存储的,我们在这里将它们分别称为正常桶和溢出桶,上图中黄色的 runtime.bmap就是正常桶,绿色的 runtime.bmap使用 C 语言实现时使用的设计3,由于它能够减少扩容的频率所以一直使用至今。

桶的结构体 runtime.bmap在 Go 语言源代码中的定义只包含一个简单的 tophash 字段,tophash 存储了键的哈希的高 8 位,通过比较不同键的哈希的高 8 位可以减少访问键值对次数以提高性能:

type bmap struct {
tophash [bucketCnt]uint8
}

在运行期间,runtime.bmap段,因为哈希表中可能存储不同类型的键值对,而且 Go 语言也不支持泛型,所以键值对占据的内存空间大小只能在编译时进行推导。runtime.bmap就不包含这些字段,不过我们能根据编译期间的 cmd/compile/internal/gc.bmap

type bmap struct {
topbits [8]uint8
keys [8]keytype
values [8]valuetype
pad uintptr
overflow uintptr
}

随着哈希表存储的数据逐渐增多,我们会扩容哈希表或者使用额外的桶存储溢出的数据,不会让单个桶中的数据超过 8 个,不过溢出桶只是临时的解决方案,创建过多的溢出桶最终也会导致哈希的扩容。

从 Go 语言哈希的定义中可以发现,改进元素比数组和切片复杂得多,它的结构体中不仅包含大量字段,还使用复杂的嵌套结构

初始化

Go 语言初始化哈希的两种方法 — 通过字面量和运行时

字面量

目前的现代编程语言基本都支持使用字面量的方式初始化哈希,一般都会使用 key: value 的语法来表示键值对,Go 语言中也不例外:

hash := map[string]int{
"1": 2,
"3": 4,
"5": 6,
}

我们需要在初始化哈希时声明键值对的类型,这种使用字面量初始化的方式最终都会通过cmd/compile/internal/gc.maplit

func maplit(n *Node, m *Node, init *Nodes) {
a := nod(OMAKE, nil, nil)
a.Esc = n.Esc
a.List.Set2(typenod(n.Type), nodintconst(int64(n.List.Len())))
litas(m, a, init)

entries := n.List.Slice()
if len(entries) > 25 {
...
return
}

// Build list of var[c] = expr.
// Use temporaries so that mapassign1 can have addressable key, elem.
...
}

当哈希表中的元素数量少于或者等于 25 个时,编译器会将字面量初始化的结构体转换成以下的代码,将所有的键值对一次加入到哈希表中:

hash := make(map[string]int, 3)
hash["1"] = 2
hash["3"] = 4
hash["5"] = 6

这种初始化的方式与的数组切片几乎完全相同,由此看来集合类型的初始化在 Go 语言中有着相同的处理逻辑。

一旦哈希表中元素的数量超过了 25 个,编译器会创建两个数组分别存储键和值,这些键值对会通过如下所示的 for 循环加入哈希:

hash := make(map[string]int, 26)
vstatk := []string{"1", "2", "3", ... , "26"}
vstatv := []int{1, 2, 3, ... , 26}
for i := 0; i < len(vstak); i++ {
hash[vstatk[i]] = vstatv[i]
}

这里展开的两个切片 vstatkvstatv 还会被编辑器继续展开,具体的展开方式可以阅读上一节了解,不过无论使用哪种方法,使用字面量初始化的过程都会使用 Go 语言中的关键字 make 来创建新的哈希并通过最原始的 [] 语法向哈希追加元素。

运行时

当创建的哈希被分配到栈上并且其容量小于 BUCKETSIZE = 8 时,Go 语言在编译阶段会使用如下方式快速初始化哈希,这也是编译器对小容量的哈希做的优化:

var h *hmap
var hv hmap
var bv bmap
h := &hv
b := &bv
h.buckets = b
h.hash0 = fashtrand0()

除了上述特定的优化之外,无论 make 是从哪里来的,只要我们使用 make 创建哈希,Go 语言编译器都会在类型检查期间将它们转换成 runtime.makemap最后调用的都是 runtime.makemap

func makemap(t *maptype, hint int, h *hmap) *hmap {
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}

if h == nil {
h = new(hmap)
}
h.hash0 = fastrand()

B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B

if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}

这个函数会按照下面的步骤执行:

  1. 计算哈希占用的内存是否溢出或者超出能分配的最大值;
  2. 调用runtime.fastrand]获取一个随机的哈希种子;
  3. 根据传入的 hint 计算出需要的最小需要的桶的数量;
  4. 使用 runtime.makeBucketArray 创建用于保存桶的数组;

runtime.makeBucketArray会根据传入的 B 计算出的需要创建的桶数量并在内存中分配一片连续的空间用于存储数据:

func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
base := bucketShift(b)
nbuckets := base
if b >= 4 {
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}

buckets = newarray(t.bucket, int(nbuckets))
if base != nbuckets {
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow

  • 当桶的数量小于 2424 时,由于数据较少、使用溢出桶的可能性较低,会省略创建的过程以减少额外开销;
  • 当桶的数量多于 2424 时,会额外创建 2B−42B−4 个溢出桶;

根据上述代码,我们能确定在正常情况下,正常桶和溢出桶在内存中的存储空间是连续的,只是被runtime.hmap 中的不同字段引用,当溢出桶数量较多时会通过 runtime.newobject

读写操作

哈希表的访问一般都是通过下标或者遍历进行的:

_ = hash[key]

for k, v := range hash {
// k, v
}

这两种方式虽然都能读取哈希表的数据,但是使用的函数和底层原理完全不同。前者需要知道哈希的键并且一次只能获取单个键对应的值,而后者可以遍历哈希中的全部键值对,访问数据时也不需要预先知道哈希的键。

数据结构的写一般指的都是增加、删除和修改,增加和修改字段都使用索引和赋值语句,而删除字典中的数据需要使用关键字 delete

hash[key] = value
hash[key] = newValue
delete(hash, key)

扩容

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again
}
...
}

mapassign

函数会在以下两种情况发生时触发哈希的扩容:

  1. 装载因子已经超过 6.5;
  2. 哈希使用了太多溢出桶;

不过因为 Go 语言哈希的扩容不是一个原子的过程,所以 runtime.mapassign]还需要判断当前哈希是否已经处于扩容状态,避免二次扩容造成混乱。

根据触发的条件不同扩容的方式分成两种,如果这次扩容是溢出的桶太多导致的,那么这次扩容就是等量扩容 sameSizeGrowsameSizeGrow 是一种特殊情况下发生的扩容,当我们持续向哈希中插入数据并将它们全部删除时,如果哈希表中的数据量没有超过阈值,就会不断积累溢出桶造成缓慢的内存泄漏runtime: limit the number of map overflow buckets引入了 sameSizeGrow 通过复用已有的哈希扩容机制解决该问题,一旦哈希中出现了过多的溢出桶,它会创建新桶保存数据,垃圾回收会清理老的溢出桶并释放内存

func hashGrow(t *maptype, h *hmap) {
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0

h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
h.extra.nextOverflow = nextOverflow
}

哈希在扩容的过程中会通过 runtime.makeBucketArray了相同的逻辑更新,下图展示了触发扩容后的哈希:

我们在 runtime.hashGrow 中还看不出来等量扩容和翻倍扩容的太多区别,等量扩容创建的新桶数量只是和旧桶一样,该函数中只是创建了新的桶,并没有对数据进行拷贝和转移。哈希表的数据迁移的过程在是 runtime.evacuate

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
newbit := h.noldbuckets()
if !evacuated(b) {
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.v = add(x.k, bucketCnt*uintptr(t.keysize))

y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.v = add(y.k, bucketCnt*uintptr(t.keysize))

runtime.evacuate]会将一个旧桶中的数据分流到两个新桶,所以它会创建两个用于保存分配上下文的 runtime.evacDst

image-20210530140631149

哈希表扩容目的

如果这是等量扩容,那么旧桶与新桶之间是一对一的关系,所以两个 runtime.evacDst 只会初始化一个。而当哈希表的容量翻倍时,每个旧桶的元素会都分流到新创建的两个桶中,这里仔细分析一下分流元素的逻辑:

		for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
v := add(k, bucketCnt*uintptr(t.keysize))
for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
top := b.tophash[i]
k2 := k
var useY uint8
hash := t.key.alg.hash(k2, uintptr(h.hash0))
if hash&newbit != 0 {
useY = 1
}
b.tophash[i] = evacuatedX + useY
dst := &xy[useY]

if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.v = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top
typedmemmove(t.key, dst.k, k)
typedmemmove(t.elem, dst.v, v)
dst.i++
dst.k = add(dst.k, uintptr(t.keysize))
dst.v = add(dst.v, uintptr(t.valuesize))
}
}
...
}

只使用哈希函数是不能定位到具体某一个桶的,哈希函数只会返回很长的哈希,例如:b72bfae3f3285244c4732ce457cca823bc189e0b,我们还需一些方法将哈希映射到具体的桶上。我们一般都会使用取模或者位操作来获取桶的编号,假如当前哈希中包含 4 个桶,那么它的桶掩码就是 0b11(3),使用位操作就会得到 3, 我们就会在 3 号桶中存储该数据:

0xb72bfae3f3285244c4732ce457cca823bc189e0b & 0b11 #=> 0

如果新的哈希表有 8 个桶,在大多数情况下,原来经过桶掩码 0b11 结果为 3 的数据会因为桶掩码增加了一位变成 0b111 而分流到新的 3 号和 7 号桶,所有数据也都会被 runtime.typedmemmove拷贝到目标桶中

哈希表桶数据的分流

runtime.evacuate增加哈希的 nevacuate 计数器并在所有的旧桶都被分流后清空哈希的 oldbucketsoldoverflow

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
h.nevacuate++
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
if h.nevacuate == newbit { // newbit == # of oldbuckets
h.oldbuckets = nil
if h.extra != nil {
h.extra.oldoverflow = nil
}
h.flags &^= sameSizeGrow
}
}

Go

之前在分析哈希表访问函数 runtime.mapaccess1时其实省略了扩容期间获取键值对的逻辑,当哈希表的 oldbuckets 存在时,会先定位到旧桶并在该桶没有被分流时从中获取键值对。

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
alg := t.key.alg
hash := alg.hash(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
b = oldb
}
}
bucketloop:
...
}

因为旧桶中的元素还没有被 runtime.evacuate函数分流,其中还保存着我们需要使用的数据,所以旧桶会替代新创建的空桶提供数据。

我们在 runtime.mapassign函数中也省略了一段逻辑,当哈希表正在处于扩容状态时,每次向哈希表写入值时都会触发runtime.growWork

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
again:
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
...
}

当然除了写入操作之外,删除操作也会在哈希表扩容期间触发 runtime.growWork

哈希在存储元素过多时会触发扩容操作,每次都会将桶的数量翻倍,扩容过程不是原子的,而是通过runtime.growWork希表写入数据时会触发旧桶元素的分流。除了这种正常的扩容之外,为了解决大量写入、删除造成的内存泄漏问题,哈希引入了 sameSizeGrow 这一机制,在出现较多溢出桶时会整理哈希的内存减少空间的占用。