一些常见的并发编程错误

本文译自 Some Common Concurrent Programming Mistakes 版权@归原文所有.

Go 是内置了并发编程支持的语言. 通过使用关键字 go 来创建 goroutines (轻量级线程), 并且使用 channels 和 Go 提供的其他并发同步的技术, 并发编程变得简单, 灵活, 并且富有乐趣.

另一方面, Go 不会阻止 Go 程序员由于粗心或者缺乏经验导致的一些并发编程的错误. 本文接下来的部分将展示一些常见的 Go 并发编程错误, 来帮助 Go 程序员避免类似的错误.

需要同步的时候没有同步

代码行可能不会按书写代码的顺序执行.

如下的程序有两个错误:

  • 首先, 主 goroutine 中 b 的读取和新 goroutine 中 b 的写入可能存在数据竞争.
  • 其次, 条件 b == true 无法确保主 goroutine 满足 a != nil. 编译器和 CPU 在新 goroutine 中通过重排序进行优化, 所以运行时 b 的赋值可能发生在 a 的赋值之前, 导致当主 goroutine 中 a 的元素被修改的时候, a 仍旧是 nil.
package main

import (
	"time"
	"runtime"
)

func main() {
	var a []int // nil
	var b bool  // false

	// a new goroutine
	go func () {
		a = make([]int, 3)
		b = true // write b
	}()

	for !b { // read b
		time.Sleep(time.Second)
		runtime.Gosched()
	}
	a[0], a[1], a[2] = 0, 1, 2 // might panic
}

上面的程序可能在一台电脑上运行良好, 在另一台电脑上 panic. 或者它可能 N 次运行良好, N+1 次 panic.

我们应该使用 channels 或者 sync 标准库提供的同步技术来确保内存序. 比如,

package main

func main() {
	var a []int = nil
	c := make(chan struct{})

	// a new goroutine
	go func () {
		a = make([]int, 3)
		c <- struct{}{}
	}()

	<-c
	a[0], a[1], a[2] = 0, 1, 2
}

使用 time.Sleep 做同步

让我们先看一个简单的例子.

package main

import (
	"fmt"
	"time"
)

func main() {
	var x = 123

	go func() {
		x = 789 // write x
	}()

	time.Sleep(time.Second)
	fmt.Println(x) // read x
}

我们期望程序打印 789. 如果我们运行它, 它确实打印 789, 差不多总是这样. 但是它是否是一个具有良好同步的程序 ? 不 ! 原因是 Go 运行时不保证 x 的写入发生在 x 的读取之前. 某些条件下, 比如大部分 CPU 资源被运行在同一 OS 的程序消费, x 的写入可能发生在 x 的读取之后. 这就是为什么我们永远不要使用 time.Sleep 在正式项目中做同步.

让我们再看另一个例子.

package main

import (
	"fmt"
	"time"
)

var x = 0

func main() {
	var num = 123
	var p = &num

	c := make(chan int)

	go func() {
		c <- *p + x
	}()

	time.Sleep(time.Second)
	num = 789
	fmt.Println(<-c)
}

你期望程序输出什么 ? 123 还是 789 ? 事实上, 该输出是编译器相关的. 对于标准 Go 编译器 1.11, 程序非常有可能输出 123. 但是理论上, 它也可能输出 789, 或者另一个不期望的数.

现在, 让我们把 c <- *p + x 改成 c <- *p, 并且重新运行程序. 你会发现输出变成 789 (对于标准 Go 编译器 1.11). 再说一次, 该输出是编译器相关的.

是的, 上面的程序有数据竞争. 表达式 *p 可能会在 num = 789 赋值操作之前,之后抑或同时求值. time.Sleep 无法确保 *p 的求值发生在赋值操作之前.

对于这个特殊的例子, 我们应该在创建新的 goroutine 之前存储要发送的值到一个临时的值里面, 并且在新 goroutine 里发送该临时值来移除数据竞争.

...
	tmp := *p + x
	go func() {
		c <- tmp
	}()
...

任由 Goroutines 挂起(Hanging)

挂起的 goroutines 是指永远保持阻塞的 goroutines. 有许多原因导致 goroutines 挂起. 比如,

  • 一个 goroutine 尝试从一个 nil channel 接收值或者从一个没有任何 goroutine 发送数据的 channel 接收值.
  • 一个 goroutine 尝试发送值到 nil channel 或者发送值到一个没有任何 goroutine 从中读取值的 channel.
  • 一个 goroutine 自己死锁.
  • 一组 goroutines 相互死锁.
  • 一个 goroutine 在执行没有 default 分支的 select 代码块时阻塞, 并且 select 代码块中 case 关键字之后的所有 channel 操作永远阻塞.

除了有时候我们故意让主 goroutine 挂起来避免程序退出, 大多数其他的挂起 goroutine 案例都是不期望的. 对于 Go 运行时来说很难判断一个处于阻塞状态的 goroutine 是否挂起还是处于临时的阻塞状态. 所以 Go 运行时从来不会释放由挂起的 goroutine 消费的资源.

谁先响应谁赢的 channel 使用案例中, 如果之后使用的 channel 容量不够大, 那么当尝试将结果发送到之后的 channel 时, 一些较慢的响应 goroutines 将会挂起. 例如, 如果调用以下函数, 将会有 4 个 goroutines 永远处于阻塞状态.

func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			c <- i // 4 goroutines will hang here.
		}()
	}
	return <-c
}

为避免这四个 goroutines 挂起, channel c 的容量必须至少为 4.

谁先响应谁赢的第二种方式的 channel 使用案例中, 如果用的 channel 是无缓冲的,则 channel 接收者可能永远不会得到响应并挂起. 例如, 如果在 goroutine 中调用以下函数, 则 goroutine 可能会挂起. 原因是, 如果五个 try-send 操作都发生在接收操作 <-c 准备就绪之前, 则所有五个 try-send 操作都将无法发送值, 因此调用者 goroutine 将永远不会收到值.

func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			select {
			case c <- i:
			default:
			}
		}()
	}
	return <-c
}

将 channel c 更改为缓冲 channel 将保证五个尝试发送操作中的至少一个成功, 以便调用方 goroutine 永远不会挂起在上述函数中.

在 sync 标准库中复制类型的值

在实践中, sync 标准库中(Locker 接口值除外)类型的值不应该被拷贝. 我们应该只拷贝这些值的指针.

以下是(不好的并发编程)的例子. 这个例子中, 当调用 Counter.Value 时, 一个 Counter 接收值会被拷贝. 作为接收值的一个字段, Counter 接收值的 Mutex 字段也会被拷贝. 该拷贝不是同步的, 所以拷贝的 Mutex 可能已经被破坏. 即使它没有损坏, 它所保护的是复制的 Counter 接收者值的访问, 这通常是没有意义的.

import "sync"

type Counter struct {
	sync.Mutex
	n int64
}

// This method is okay.
func (c *Counter) Increase(d int64) (r int64) {
	c.Lock()
	c.n += d
	r = c.n
	c.Unlock()
	return
}

// The method is bad. When it is called, a Counter
// receiver value will be copied.
func (c Counter) Value() (r int64) {
	c.Lock()
	r = c.n
	c.Unlock()
	return
}

我们应该将 Value 方法的接收者类型更改为指针类型 *Counter, 以避免复制 Mutex 值.

官方 Go SDK 中提供的 go vet 命令将报告潜在的错误值拷贝.

在错误的地方调用 sync.WaitGroup

每个 sync.WaitGroup 值在内部维护一个计数器, 计数器的初始值为零. 如果 WaitGroup 值的计数器为零, 则对 WaitGroup 值的 Wait 方法的调用将不会阻塞, 否则调用将阻塞, 直到计数器值变为零.

要使 WaitGroup 值的使用有意义, 当 WaitGroup 值的计数器为零时, 对 WaitGroup 值的 Wait 方法的相应调用必须发生在调用 WaitGroup 值的 Add 方法之前.

例如, 在以下程序中, 在不正确的位置调用 Add 方法, 这使得最终打印的数字不总是 100. 实际上, 程序的最终打印数可以是 [0, 100] 范围内的任意数字. 原因是在 Wait 方法调用之前不保证发生 Add 方法调用.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup
	var x int32 = 0
	for i := 0; i < 100; i++ {
		go func() {
			wg.Add(1)
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}

	fmt.Println("To wait ...")
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&x))
}

为了使程序按预期运行, 我们应该将 Add 方法调用移出 for 循环中创建的新 goroutine, 如下面的代码所示.

...
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}
...

错误使用 Channels 作为 Futures

从文章channel 使用案例中, 我们知道一些函数会返回 channels 作为 futures. 假设 fafb 是两个这样的函数, 那么下面的调用将不正确地使用 future 参数.

doSomethingWithFutureArguments(<-fa(), <-fb())

在上面的代码行中, 两个 channel 接收操作是顺序处理的, 而不是同时处理的. 我们应该将其修改为以下内容以同时处理它们.

ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-ca, <-cb)

不是从最后一个活跃的发送者 Goroutine 关闭 Channels

Go 程序员常犯的一个错误就是当还有一些其他 goroutine 可能会在之后向该 channel 发送值时关闭一个 channel. 当这种潜在的发送(到关闭的 channel)真的发生时,会发生 panic.

一些有名的 Go 项目也犯过这样的错误, 比如 Kubernetes 项目中的这个以及这个 bug.

请阅读这篇文章, 了解如何安全, 优雅地关闭 channel.

64 位值的原子操作不保证 64 位对齐

到目前为止(Go 1.11), 对于标准的 Go 编译器, 64 位原子操作所涉及的值的地址需要 64 位对齐. 如果不这样做可能会导致 panic. 对于标准 Go 编译器, 此类故障只会发生在32位体系架构上. 请阅读内存布局, 了解如何保证 64 位字地址在 32 位操作系统上 64 位对齐.

不重视 time.After 调用带来的大量资源消耗

time 标准库中的 After 函数返回一个延迟通知的 channel. 该函数很方便, 但每次调用都会创建 time.Timer 类型的新值. 新创建的 Timer 值将在 After 函数传递的参数指定的持续时间内保持活跃状态. 如果在持续时间内多次调用该函数, 则会有许多 Timer 值存活并消耗大量内存和计算.

例如, 如果调用以下 longRunning 函数, 并且在一分钟内有数百万条消息进来, 那么在一定时期内将有数百万个 Timer 值存活, 即使这些 Timer 值中的大部分已经变得无用.

import (
	"fmt"
	"time"
)

// The function will return if a message arrival interval
// is larger than one minute.
func longRunning(messages <-chan string) {
	for {
		select {
		case <-time.After(time.Minute):
			return
		case msg := <-messages:
			fmt.Println(msg)
		}
	}
}

为避免在上面的代码中创建太多的 Timer 值, 我们应该使用单个 Timer 值来完成相同的工作.

func longRunning(messages <-chan string) {
	timer := time.NewTimer(time.Minute)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			return
		case msg := <-messages:
			fmt.Println(msg)
			if !timer.Stop() {
				<-timer.C
			}
		}

		// The above "if" block can also be put here.

		timer.Reset(time.Minute)
	}
}

错误的使用 time.Timer

上一节中展示了 time.Timer 值的惯用法的例子. 应该注意的一个细节是应该始终调用 Reset 方法来停止或者过期 time.Timer 值.

select 块的第一个 case 分支的末尾, time.Timer 值已经过期, 因此我们不需要停止它. 但是我们必须在第二个分支中停止计时器. 如果第二个分支缺失 if 代码块, 发送(通过 Go 运行时)到 channel (timer.C) 可能会和 Reset 方法调用产生竞争, 并且 longRunning 函数可能比预期早返回, 因为 Reset 方法仅会重设内部计时器为零, 它不会清除已经发送到 timer.C channel 的值.

比如, 如下的程序非常可能在 1 秒内退出而不是 10 秒. 而且更重要的是, 程序是没有数据竞争的.

package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()
	timer := time.NewTimer(time.Second/2)
	select {
	case <-timer.C:
	default:
		time.Sleep(time.Second) // go here
	}
	timer.Reset(time.Second * 10)
	<-timer.C
	fmt.Println(time.Since(start)) // 1.000188181s
}

time.Timer 值可以在不再使用时保持非停止状态, 但建议最后停止它.

它很容易出错, 不建议在多个 goroutine 中同时使用 time.Timer 值.

我们不应该依赖 Reset 方法调用的返回值. Reset 方法的返回结果仅用于兼容性目的.

comments powered by Disqus