Goroutine 最佳实践
# 一、不要做一个旁观者
一般情况下,不要让主进程成为一个旁观者,明明可以干活,但是最后使用了一个 select 在那儿空跑。
# 1.1 原始代码
首先来看下面的代码,这里使用一个 goroutine 进行 Http 端口的监听,为了防止主线程退出,在 main 函数中使用 select {}
保持永远阻塞。
package main
import (
"fmt"
"log"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello World")
})
go func() {
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}()
select {}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 1.2 优化代码
通常情况下,如果你的 goroutine 在从另一个 goroutine 获得结果之前无法取得进展,那么你自己去做这项工作比委托它 go func()
更简单。这可以消除将结果从 goroutine 返回到其启动器所需的大量状态跟踪和 chan 操作。我们对上面的代码进行优化,如下所示:
package main
import (
"fmt"
"log"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello World")
})
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 二、将选择权留给调用者
# 2.1 原始代码
package main
import (
"fmt"
"log"
"net/http"
)
func serve() {
go func() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello World")
})
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}()
select {}
}
func main() {
serve()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 2.2 优化代码
上面代码在 serve 中开启了一个 gorountine,如果 server 是在其他包里面,如果没有特殊说明,你知道这是一个异步调用么?因此,把是否并发的选择权交给调用者,而不是自己就直接悄悄的用上了 goroutine。优化后代码如下:
package main
import (
"fmt"
"log"
"net/http"
)
func serve() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello World")
})
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}
func main() {
go serve()
select {}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 三、不要创建一个不知道什么时候会退出的 goroutine
# 3.1 原始代码
下面代码中,8080 端口代表正常的 API 请求,8001 端口用于 debug,为了保证两个端口同时监听,这里将 debug 端口的监听委派出去,让其在后台执行。
package main
import (
"fmt"
"net/http"
)
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello World")
})
go http.ListenAndServe(":8001", http.DefaultServeMux)
http.ListenAndServe(":8080", mux)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
但这里会存在一个问题,那就是我们不知道这个 goroutine 什么时候会结束。实际场景中,如果监听 debug 端口的 goroutine 异常退出,此时 API 请求还可以正常处理,当我们需要诊断问题时就会出问题,我们通常希望,这两个服务有一个挂了,整个程序应该退出。因此,我们需要有一种机制能够管住它的生命周期。
# 3.2 优化代码 - 1
首先,我们通过 serveApp 和 serveDebug 将处理程序分解为各自的函数,让它们与 main.main 解耦,使 main 函数的主干逻辑看起来比较简单,代码如下:
package main
import (
"fmt"
"net/http"
)
func serveApp() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello World")
})
http.ListenAndServe(":8080", mux)
}
func serveDebug() {
http.ListenAndServe(":8001", http.DefaultServeMux)
}
func main() {
go serveDebug()
serveApp()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
这里仍会存在原始代码中提到的问题:
- 如果 serveApp 返回,则 main.main 将返回导致程序关闭,只能靠类似 supervisor 进程管理来重新启动;
- 然而,serveDebug 是在一个单独的 goroutine 中运行的,如果它返回,那么所在的 goroutine 将退出,而程序的其余部分继续运行。
# 3.3 优化代码 - 2
下面我们将两个 server 都单独使用 goroutine 运行,当运行报错时直接 log.Fatal(err)
退出程序,具体代码如下:
package main
import (
"fmt"
"log"
"net/http"
)
func serveApp() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello World")
})
if err := http.ListenAndServe(":8080", mux); err != nil {
log.Fatal(err)
}
}
func serveDebug() {
if err := http.ListenAndServe(":8001", http.DefaultServeMux); err != nil {
log.Fatal(err)
}
}
func main() {
go serveDebug()
go serveApp()
select {}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
这样可以解决之前提到的保证同时退出的问题,但是 log.Fatal
这种写法不够友好,它调用了 os.Exit,会无条件终止程序,会使 defer 无法正常运行,导致可能会有些资源无法安全释放。
注意
通常,我们只在 main 函数或者 init 函数中使用 log.Fatal
。
# 3.4 优化代码 - 3
下面,我们通过 channel 来巧妙实现服务的平滑退出,代码如下:
package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"time"
)
func serveApp(stop <-chan struct{}) error {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello World")
})
return server(mux, ":8080", stop)
}
func serveDebug(stop <-chan struct{}) error {
// 注意这里主要是为了模拟服务意外退出,用于验证一个服务退出,其他服务同时退出的场景
go func() {
server(http.DefaultServeMux, ":8001", stop)
}()
time.Sleep(5 * time.Second)
return errors.New("mock debug error")
}
func server(handler http.Handler, addr string, stop <-chan struct{}) error {
s := http.Server{
Handler: handler,
Addr: addr,
}
// 这个 goroutine 我们可以控制退出,因为只要 stop 这个 channel close 或者是写入数据,这里就会退出
// 同时因为调用了 s.Shutdown 调用之后,这个函数启动的 http server 也会优雅退出
go func() {
<-stop
log.Printf("server will exiting, addr: %s", addr)
s.Shutdown(context.Background())
}()
return s.ListenAndServe()
}
func main() {
// 用于监听服务退出
done := make(chan error, 2)
// 用于控制服务退出,传入同一个 stop,做到只要有一个服务退出了那么另外一个服务也会随之退出
stop := make(chan struct{})
// debug 服务
go func() {
done <- serveDebug(stop)
}()
// 主服务
go func() {
done <- serveApp(stop)
}()
// stopped 用于判断当前 stop 的状态
var stopped bool
// 这里循环读取 done 这个 channel
// 只要有一个退出了,我们就关闭 stop channel
for i:= 0; i < cap(done); i++ {
if err := <-done; err != nil {
fmt.Printf("error: %v\n", err)
}
if !stopped {
stopped = true
close(stop)
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
启动后 5s,服务全部平滑退出,运行结果如下:
error: mock debug error
2021/09/08 02:02:21 server will exiting, addr: :8001
2021/09/08 02:02:21 server will exiting, addr: :8080
error: http: Server closed
# 四、「goroutine 泄漏」不要创建一个永远都无法退出的 goroutine
在下面这个例子中,goroutine 泄漏可以在 code review 快速识别出来。不幸的是,生产代码中的 goroutine 泄漏通常更难找到。
func leak() {
ch := make(chan int)
go func() {
val := <-ch
fmt.Println("We received a value:", val)
}()
}
2
3
4
5
6
7
8
# 五、「goroutine 泄漏」确保创建出的 goroutine 的工作已经完成
# 5.1 原始代码
我们通常使用服务端埋点来跟踪记录一些事件,一种写法如下:
package main
import (
"log"
"net/http"
"time"
)
type Tracker struct{}
func (t *Tracker) Event(data string) {
time.Sleep(time.Millisecond)
log.Println(data)
}
type App struct {
track Tracker
}
func (a *App) Handle(w http.ResponseWriter, r http.Request) {
// do some work...
w.WriteHeader(http.StatusCreated)
// BUG:这里不能管控 goroutine 的生命周期
go a.track.Event("this event")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
上面的代码无法保证创建的 goroutine 生命周期管理,可能导致的问题:在服务关闭时候,有一些未处理完的埋点事件丢失。
# 5.2 优化代码 - 1
下面使用 sync.WaitGroup
来追踪每一个创建的 goroutine。
package main
import (
"log"
"net/http"
"sync"
"time"
)
func main() {
var a App
// handle ...
a.track.Shutdown()
}
type Tracker struct{
wg sync.WaitGroup
}
func (t *Tracker) Event(data string) {
t.wg.Add(1)
go func() {
defer t.wg.Done()
time.Sleep(time.Millisecond)
log.Println(data)
}()
}
func (t *Tracker) Shutdown() {
t.wg.Wait()
}
type App struct {
track Tracker
}
func (a *App) Handle(w http.ResponseWriter, r http.Request) {
// do some work...
w.WriteHeader(http.StatusCreated)
a.track.Event("this event")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
上面代码的关键点如下:
- 改造 Track 的 Event 方法,每次调用使用 WaitGroup 计数;
- main 函数最后调用 Track 的 Shutdown 方法,保证 Event 创建的 goroutine 全部运行结束。
# 5.3 优化代码 - 2
上面的代码还存在一个问题,如果 Event 运行时间过长,可能导致 Shutdown 方法被阻塞,甚至永远不能退出,因此我们可以将 wg.Wait()
托管到其他 goroutine,并使用 context 来处理超时。具体代码如下:
package main
import (
"context"
"errors"
"log"
"net/http"
"sync"
"time"
)
func main() {
var a App
// handle ...
const timeout = 3 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := a.track.Shutdown(ctx); err != nil {
log.Println(err)
}
}
type Tracker struct{
wg sync.WaitGroup
}
func (t *Tracker) Event(data string) {
t.wg.Add(1)
go func() {
defer t.wg.Done()
time.Sleep(time.Millisecond)
log.Println(data)
}()
}
func (t *Tracker) Shutdown(ctx context.Context) error {
ch := make(chan struct{})
go func() {
t.wg.Wait()
close(ch)
}()
select {
case <-ch:
return nil
case <-ctx.Done():
return errors.New("timeout")
}
}
type App struct {
track Tracker
}
func (a *App) Handle(w http.ResponseWriter, r http.Request) {
// do some work...
w.WriteHeader(http.StatusCreated)
a.track.Event("this event")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# 5.3 优化代码 - 3
上面的两种优化代码都存在同一个问题,就是每次调用 Event 方法都会开启一个 goroutine 去处理,由于埋点数据的调用频率高,这样就会创建大量的 goroutine 来处理任务,代价过大。下面再对其进行优化:
package main
import (
"context"
"fmt"
"time"
)
func main() {
tr := NewTracker()
go tr.Run()
_ = tr.Event(context.Background(), "test")
_ = tr.Event(context.Background(), "test")
_ = tr.Event(context.Background(), "test")
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
defer cancel()
tr.Shutdown(ctx)
}
func NewTracker() *Tracker {
return &Tracker{
ch: make(chan string, 10),
}
}
type Tracker struct {
ch chan string
stop chan struct{}
}
func (t *Tracker) Event(ctx context.Context, data string) error {
select {
case t.ch <- data:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (t *Tracker) Run() {
for data := range t.ch {
time.Sleep(1 * time.Second)
fmt.Println(data)
}
t.stop <- struct{}{}
}
func (t *Tracker) Shutdown(ctx context.Context) {
close(t.ch)
select {
case <-t.stop:
case <-ctx.Done():
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
这段代码的关键思想就是引入了 channel 来处理并发,即:使用一个后台的 goroutine 去消费 channel 中的消息,而不是每一个消息都去创建一个 goroutine。
# 六、总结
- 把并发扔给调用者,即调用者决定后台执行还是前台执行;
- 搞清楚 goroutine 什么时候退出,要管控它的生命周期;
- 能够控制 goroutine 什么时候退出,通过 channel、context 等都可以。