示例: 并发的非阻塞缓存

本节中我们会做一个无阻塞的缓存,这种工具可以帮助我们来解决现实世界中并发程序出现但没有现成的库可以解决的问题。这个问题叫作缓存(memoizing)函数(译注:Memoization的定义: memoization 一词是Donald Michie 根据拉丁语memorandum杜撰的一个词。相应的动词、过去分词、ing形式有memoiz、memoized、memoizing),也就是说,我们需要缓存函数的返回结果,这样在对函数进行调用的时候,我们就只需要一次计算,之后只要返回计算的结果就可以了。我们的解决方案会是并发安全且会避免对整个缓存加锁而导致所有操作都去争一个锁的设计。

我们将使用下面的httpGetBody函数作为我们需要缓存的函数的一个样例。这个函数会去进行HTTP GET请求并且获取http响应body。对这个函数的调用本身开销是比较大的,所以我们尽量避免在不必要的时候反复调用。

func httpGetBody(url string) (interface{}, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}

最后一行稍微隐藏了一些细节。ReadAll会返回两个结果,一个[]byte数组和一个错误,不过这两个对象可以被赋值给httpGetBody的返回声明里的interface{}和error类型,所以我们也就可以这样返回结果并且不需要额外的工作了。我们在httpGetBody中选用这种返回类型是为了使其可以与缓存匹配。

下面是我们要设计的cache的第一个“草稿”:

gopl.io/ch9/memo1

// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo

// A Memo caches the results of calling a Func.
type Memo struct {
	f     Func
	cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

Memo实例会记录需要缓存的函数f(类型为Func),以及缓存内容(里面是一个string到result映射的map)。每一个result都是简单的函数返回的值对儿——一个值和一个错误值。继续下去我们会展示一些Memo的变种,不过所有的例子都会遵循上面的这些方面。

下面是一个使用Memo的例子。对于流入的URL的每一个元素我们都会调用Get,并打印调用延时以及其返回的数据大小的log:

m := memo.New(httpGetBody)
for url := range incomingURLs() {
	start := time.Now()
	value, err := m.Get(url)
	if err != nil {
		log.Print(err)
	}
	fmt.Printf("%s, %s, %d bytes\n",
	url, time.Since(start), len(value.([]byte)))
}

我们可以使用测试包(第11章的主题)来系统地鉴定缓存的效果。从下面的测试输出,我们可以看到URL流包含了一些重复的情况,尽管我们第一次对每一个URL的(*Memo).Get的调用都会花上几百毫秒,但第二次就只需要花1毫秒就可以返回完整的数据了。

$ go test -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok  gopl.io/ch9/memo1   1.257s

这个测试是顺序地去做所有的调用的。

由于这种彼此独立的HTTP请求可以很好地并发,我们可以把这个测试改成并发形式。可以使用sync.WaitGroup来等待所有的请求都完成之后再返回。

m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
	n.Add(1)
	go func(url string) {
		start := time.Now()
		value, err := m.Get(url)
		if err != nil {
			log.Print(err)
		}
		fmt.Printf("%s, %s, %d bytes\n",
		url, time.Since(start), len(value.([]byte)))
		n.Done()
	}(url)
}
n.Wait()

这次测试跑起来更快了,然而不幸的是貌似这个测试不是每次都能够正常工作。我们注意到有一些意料之外的cache miss(缓存未命中),或者命中了缓存但却返回了错误的值,或者甚至会直接崩溃。

但更糟糕的是,有时候这个程序还是能正确的运行(译:也就是最让人崩溃的偶发bug),所以我们甚至可能都不会意识到这个程序有bug。但是我们可以使用-race这个flag来运行程序,竞争检测器(§9.6)会打印像下面这样的报告:

$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s

memo.go的32行出现了两次,说明有两个goroutine在没有同步干预的情况下更新了cache map。这表明Get不是并发安全的,存在数据竞争。

28  func (memo *Memo) Get(key string) (interface{}, error) {
29      res, ok := memo.cache(key)
30      if !ok {
31          res.value, res.err = memo.f(key)
32          memo.cache[key] = res
33      }
34      return res.value, res.err
35  }

最简单的使cache并发安全的方式是使用基于监控的同步。只要给Memo加上一个mutex,在Get的一开始获取互斥锁,return的时候释放锁,就可以让cache的操作发生在临界区内了:

gopl.io/ch9/memo2

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
    if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	memo.mu.Unlock()
	return res.value, res.err
}

测试依然并发进行,但这回竞争检查器“沉默”了。不幸的是对于Memo的这一点改变使我们完全丧失了并发的性能优点。每次对f的调用期间都会持有锁,Get将本来可以并行运行的I/O操作串行化了。我们本章的目的是完成一个无锁缓存,而不是现在这样的将所有请求串行化的函数的缓存。

下一个Get的实现,调用Get的goroutine会两次获取锁:查找阶段获取一次,如果查找没有返回任何内容,那么进入更新阶段会再次获取。在这两次获取锁的中间阶段,其它goroutine可以随意使用cache。

gopl.io/ch9/memo3

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	memo.mu.Unlock()
	if !ok {
		res.value, res.err = memo.f(key)

		// Between the two critical sections, several goroutines
		// may race to compute f(key) and update the map.
		memo.mu.Lock()
		memo.cache[key] = res
		memo.mu.Unlock()
	}
	return res.value, res.err
}

这些修改使性能再次得到了提升,但有一些URL被获取了两次。这种情况在两个以上的goroutine同一时刻调用Get来请求同样的URL时会发生。多个goroutine一起查询cache,发现没有值,然后一起调用f这个慢不拉叽的函数。在得到结果后,也都会去更新map。其中一个获得的结果会覆盖掉另一个的结果。

理想情况下是应该避免掉多余的工作的。而这种“避免”工作一般被称为duplicate suppression(重复抑制/避免)。下面版本的Memo每一个map元素都是指向一个条目的指针。每一个条目包含对函数f调用结果的内容缓存。与之前不同的是这次entry还包含了一个叫ready的channel。在条目的结果被设置之后,这个channel就会被关闭,以向其它goroutine广播(§8.9)去读取该条目内的结果是安全的了。

gopl.io/ch9/memo4

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	e := memo.cache[key]
	if e == nil {
		// This is the first request for this key.
		// This goroutine becomes responsible for computing
		// the value and broadcasting the ready condition.
		e = &entry{ready: make(chan struct{})}
		memo.cache[key] = e
		memo.mu.Unlock()

		e.res.value, e.res.err = memo.f(key)

		close(e.ready) // broadcast ready condition
	} else {
		// This is a repeat request for this key.
		memo.mu.Unlock()

		<-e.ready // wait for ready condition
	}
	return e.res.value, e.res.err
}

现在Get函数包括下面这些步骤了:获取互斥锁来保护共享变量cache map,查询map中是否存在指定条目,如果没有找到那么分配空间插入一个新条目,释放互斥锁。如果存在条目的话且其值没有写入完成(也就是有其它的goroutine在调用f这个慢函数)时,goroutine必须等待值ready之后才能读到条目的结果。而想知道是否ready的话,可以直接从ready channel中读取,由于这个读取操作在channel关闭之前一直是阻塞。

如果没有条目的话,需要向map中插入一个没有准备好的条目,当前正在调用的goroutine就需要负责调用慢函数、更新条目以及向其它所有goroutine广播条目已经ready可读的消息了。

条目中的e.res.value和e.res.err变量是在多个goroutine之间共享的。创建条目的goroutine同时也会设置条目的值,其它goroutine在收到"ready"的广播消息之后立刻会去读取条目的值。尽管会被多个goroutine同时访问,但却并不需要互斥锁。ready channel的关闭一定会发生在其它goroutine接收到广播事件之前,因此第一个goroutine对这些变量的写操作是一定发生在这些读操作之前的。不会发生数据竞争。

这样并发、不重复、无阻塞的cache就完成了。

上面这样Memo的实现使用了一个互斥量来保护多个goroutine调用Get时的共享map变量。不妨把这种设计和前面提到的把map变量限制在一个单独的monitor goroutine的方案做一些对比,后者在调用Get时需要发消息。

Func、result和entry的声明和之前保持一致:

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

然而Memo类型现在包含了一个叫做requests的channel,Get的调用方用这个channel来和monitor goroutine来通信。requests channel中的元素类型是request。Get的调用方会把这个结构中的两组key都填充好,实际上用这两个变量来对函数进行缓存的。另一个叫response的channel会被拿来发送响应结果。这个channel只会传回一个单独的值。

gopl.io/ch9/memo5

// A request is a message requesting that the Func be applied to key.
type request struct {
	key      string
	response chan<- result // the client wants a single result
}

type Memo struct{ requests chan request }
// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
	memo := &Memo{requests: make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{key, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

上面的Get方法,会创建一个response channel,把它放进request结构中,然后发送给monitor goroutine,然后马上又会接收它。

cache变量被限制在了monitor goroutine ``(*Memo).server`中,下面会看到。monitor会在循环中一直读取请求,直到request channel被Close方法关闭。每一个请求都会去查询cache,如果没有找到条目的话,那么就会创建/插入一个新的条目。

func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry)
	for req := range memo.requests {
		e := cache[req.key]
		if e == nil {
			// This is the first request for this key.
			e = &entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key) // call f(key)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, key string) {
	// Evaluate the function.
	e.res.value, e.res.err = f(key)
	// Broadcast the ready condition.
	close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
	// Wait for the ready condition.
	<-e.ready
	// Send the result to the client.
	response <- e.res
}

和基于互斥量的版本类似,第一个对某个key的请求需要负责去调用函数f并传入这个key,将结果存在条目里,并关闭ready channel来广播条目的ready消息。使用(*entry).call来完成上述工作。

紧接着对同一个key的请求会发现map中已经有了存在的条目,然后会等待结果变为ready,并将结果从response发送给客户端的goroutien。上述工作是用(*entry).deliver来完成的。对call和deliver方法的调用必须让它们在自己的goroutine中进行以确保monitor goroutines不会因此而被阻塞住而没法处理新的请求。

这个例子说明我们无论用上锁,还是通信来建立并发程序都是可行的。

上面的两种方案并不好说特定情境下哪种更好,不过了解他们还是有价值的。有时候从一种方式切换到另一种可以使你的代码更为简洁。(译注:不是说好的golang推崇通信并发么。)

练习 9.3: 扩展Func类型和(*Memo).Get方法,支持调用方提供一个可选的done channel,使其具备通过该channel来取消整个操作的能力(§8.9)。一个被取消了的Func的调用结果不应该被缓存。

Last updated