Goroutine 最佳实践

2021-09-08 Golang Go 进阶

# 一、不要做一个旁观者

一般情况下,不要让主进程成为一个旁观者,明明可以干活,但是最后使用了一个 select 在那儿空跑。

# 1.1 原始代码

首先来看下面的代码,这里使用一个 goroutine 进行 Http 端口的监听,为了防止主线程退出,在 main 函数中使用 select {} 保持永远阻塞。

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello World")
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	select {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 1.2 优化代码

通常情况下,如果你的 goroutine 在从另一个 goroutine 获得结果之前无法取得进展,那么你自己去做这项工作比委托它 go func() 更简单。这可以消除将结果从 goroutine 返回到其启动器所需的大量状态跟踪和 chan 操作。我们对上面的代码进行优化,如下所示:

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello World")
	})
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 二、将选择权留给调用者

# 2.1 原始代码

package main

import (
	"fmt"
	"log"
	"net/http"
)

func serve()  {
	go func() {
		http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintln(w, "Hello World")
		})
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()
	select {}
}

func main() {
	serve()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 2.2 优化代码

上面代码在 serve 中开启了一个 gorountine,如果 server 是在其他包里面,如果没有特殊说明,你知道这是一个异步调用么?因此,把是否并发的选择权交给调用者,而不是自己就直接悄悄的用上了 goroutine。优化后代码如下:

package main

import (
	"fmt"
	"log"
	"net/http"
)

func serve()  {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello World")
	})
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

func main() {
	go serve()
	select {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 三、不要创建一个不知道什么时候会退出的 goroutine

# 3.1 原始代码

下面代码中,8080 端口代表正常的 API 请求,8001 端口用于 debug,为了保证两个端口同时监听,这里将 debug 端口的监听委派出去,让其在后台执行。

package main

import (
	"fmt"
	"net/http"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello World")
	})
	go http.ListenAndServe(":8001", http.DefaultServeMux)
	http.ListenAndServe(":8080", mux)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

但这里会存在一个问题,那就是我们不知道这个 goroutine 什么时候会结束。实际场景中,如果监听 debug 端口的 goroutine 异常退出,此时 API 请求还可以正常处理,当我们需要诊断问题时就会出问题,我们通常希望,这两个服务有一个挂了,整个程序应该退出。因此,我们需要有一种机制能够管住它的生命周期。

# 3.2 优化代码 - 1

首先,我们通过 serveApp 和 serveDebug 将处理程序分解为各自的函数,让它们与 main.main 解耦,使 main 函数的主干逻辑看起来比较简单,代码如下:

package main

import (
	"fmt"
	"net/http"
)

func serveApp() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello World")
	})
	http.ListenAndServe(":8080", mux)
}

func serveDebug() {
	http.ListenAndServe(":8001", http.DefaultServeMux)
}

func main() {
	go serveDebug()
	serveApp()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

这里仍会存在原始代码中提到的问题:

  • 如果 serveApp 返回,则 main.main 将返回导致程序关闭,只能靠类似 supervisor 进程管理来重新启动;
  • 然而,serveDebug 是在一个单独的 goroutine 中运行的,如果它返回,那么所在的 goroutine 将退出,而程序的其余部分继续运行。

# 3.3 优化代码 - 2

下面我们将两个 server 都单独使用 goroutine 运行,当运行报错时直接 log.Fatal(err) 退出程序,具体代码如下:

package main

import (
	"fmt"
	"log"
	"net/http"
)

func serveApp() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello World")
	})
	if err := http.ListenAndServe(":8080", mux); err != nil {
		log.Fatal(err)
	}
}

func serveDebug() {
	if err := http.ListenAndServe(":8001", http.DefaultServeMux); err != nil {
		log.Fatal(err)
	}
}

func main() {
	go serveDebug()
	go serveApp()
	select {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

这样可以解决之前提到的保证同时退出的问题,但是 log.Fatal 这种写法不够友好,它调用了 os.Exit,会无条件终止程序,会使 defer 无法正常运行,导致可能会有些资源无法安全释放。

注意

通常,我们只在 main 函数或者 init 函数中使用 log.Fatal

# 3.4 优化代码 - 3

下面,我们通过 channel 来巧妙实现服务的平滑退出,代码如下:

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"time"
)

func serveApp(stop <-chan struct{}) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello World")
	})
	return server(mux, ":8080", stop)
}

func serveDebug(stop <-chan struct{}) error {
	// 注意这里主要是为了模拟服务意外退出,用于验证一个服务退出,其他服务同时退出的场景
	go func() {
		server(http.DefaultServeMux, ":8001", stop)
	}()

	time.Sleep(5 * time.Second)
	return errors.New("mock debug error")
}

func server(handler http.Handler, addr string, stop <-chan struct{}) error {
	s := http.Server{
		Handler: handler,
		Addr:    addr,
	}

	// 这个 goroutine 我们可以控制退出,因为只要 stop 这个 channel close 或者是写入数据,这里就会退出
	// 同时因为调用了 s.Shutdown 调用之后,这个函数启动的 http server 也会优雅退出
	go func() {
		<-stop
		log.Printf("server will exiting, addr: %s", addr)
		s.Shutdown(context.Background())
	}()

	return s.ListenAndServe()
}

func main() {
	// 用于监听服务退出
	done := make(chan error, 2)

	// 用于控制服务退出,传入同一个 stop,做到只要有一个服务退出了那么另外一个服务也会随之退出
	stop := make(chan struct{})

	// debug 服务
	go func() {
		done <- serveDebug(stop)
	}()
	// 主服务
	go func() {
		done <- serveApp(stop)
	}()

	// stopped 用于判断当前 stop 的状态
	var stopped bool

	// 这里循环读取 done 这个 channel
	// 只要有一个退出了,我们就关闭 stop channel
	for i:= 0; i < cap(done); i++ {
		if err := <-done; err != nil {
			fmt.Printf("error: %v\n", err)
		}
		if !stopped {
			stopped = true
			close(stop)
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

启动后 5s,服务全部平滑退出,运行结果如下:

error: mock debug error

2021/09/08 02:02:21 server will exiting, addr: :8001

2021/09/08 02:02:21 server will exiting, addr: :8080

error: http: Server closed

# 四、「goroutine 泄漏」不要创建一个永远都无法退出的 goroutine

在下面这个例子中,goroutine 泄漏可以在 code review 快速识别出来。不幸的是,生产代码中的 goroutine 泄漏通常更难找到。

func leak()  {
	ch := make(chan int)

	go func() {
		val := <-ch
		fmt.Println("We received a value:", val)
	}()
}
1
2
3
4
5
6
7
8

# 五、「goroutine 泄漏」确保创建出的 goroutine 的工作已经完成

# 5.1 原始代码

我们通常使用服务端埋点来跟踪记录一些事件,一种写法如下:

package main

import (
	"log"
	"net/http"
	"time"
)

type Tracker struct{}

func (t *Tracker) Event(data string) {
	time.Sleep(time.Millisecond)
	log.Println(data)
}

type App struct {
	track Tracker
}

func (a *App) Handle(w http.ResponseWriter, r http.Request) {
	// do some work...

	w.WriteHeader(http.StatusCreated)

	// BUG:这里不能管控 goroutine 的生命周期
	go a.track.Event("this event")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

上面的代码无法保证创建的 goroutine 生命周期管理,可能导致的问题:在服务关闭时候,有一些未处理完的埋点事件丢失。

# 5.2 优化代码 - 1

下面使用 sync.WaitGroup 来追踪每一个创建的 goroutine。

package main

import (
	"log"
	"net/http"
	"sync"
	"time"
)

func main() {
	var a App

	// handle ...

	a.track.Shutdown()
}

type Tracker struct{
	wg sync.WaitGroup
}

func (t *Tracker) Event(data string) {
	t.wg.Add(1)

	go func() {
		defer t.wg.Done()

		time.Sleep(time.Millisecond)
		log.Println(data)
	}()
}

func (t *Tracker) Shutdown() {
	t.wg.Wait()
}

type App struct {
	track Tracker
}

func (a *App) Handle(w http.ResponseWriter, r http.Request) {
	// do some work...

	w.WriteHeader(http.StatusCreated)

	a.track.Event("this event")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

上面代码的关键点如下:

  • 改造 Track 的 Event 方法,每次调用使用 WaitGroup 计数;
  • main 函数最后调用 Track 的 Shutdown 方法,保证 Event 创建的 goroutine 全部运行结束。

# 5.3 优化代码 - 2

上面的代码还存在一个问题,如果 Event 运行时间过长,可能导致 Shutdown 方法被阻塞,甚至永远不能退出,因此我们可以将 wg.Wait() 托管到其他 goroutine,并使用 context 来处理超时。具体代码如下:

package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"sync"
	"time"
)

func main() {
	var a App

	// handle ...

	const timeout = 3 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	if err := a.track.Shutdown(ctx); err != nil {
		log.Println(err)
	}
}

type Tracker struct{
	wg sync.WaitGroup
}

func (t *Tracker) Event(data string) {
	t.wg.Add(1)

	go func() {
		defer t.wg.Done()

		time.Sleep(time.Millisecond)
		log.Println(data)
	}()
}

func (t *Tracker) Shutdown(ctx context.Context) error {
	ch := make(chan struct{})

	go func() {
		t.wg.Wait()
		close(ch)
	}()

	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return errors.New("timeout")
	}
}

type App struct {
	track Tracker
}

func (a *App) Handle(w http.ResponseWriter, r http.Request) {
	// do some work...

	w.WriteHeader(http.StatusCreated)

	a.track.Event("this event")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

# 5.3 优化代码 - 3

上面的两种优化代码都存在同一个问题,就是每次调用 Event 方法都会开启一个 goroutine 去处理,由于埋点数据的调用频率高,这样就会创建大量的 goroutine 来处理任务,代价过大。下面再对其进行优化:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	tr := NewTracker()
	go tr.Run()
	_ = tr.Event(context.Background(), "test")
	_ = tr.Event(context.Background(), "test")
	_ = tr.Event(context.Background(), "test")
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
	defer cancel()
	tr.Shutdown(ctx)
}

func NewTracker() *Tracker {
	return &Tracker{
		ch: make(chan string, 10),
	}
}

type Tracker struct {
	ch   chan string
	stop chan struct{}
}

func (t *Tracker) Event(ctx context.Context, data string) error {
	select {
	case t.ch <- data:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (t *Tracker) Run() {
	for data := range t.ch {
		time.Sleep(1 * time.Second)
		fmt.Println(data)
	}
	t.stop <- struct{}{}
}

func (t *Tracker) Shutdown(ctx context.Context) {
	close(t.ch)
	select {
	case <-t.stop:
	case <-ctx.Done():
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

这段代码的关键思想就是引入了 channel 来处理并发,即:使用一个后台的 goroutine 去消费 channel 中的消息,而不是每一个消息都去创建一个 goroutine。

# 六、总结

  • 把并发扔给调用者,即调用者决定后台执行还是前台执行;
  • 搞清楚 goroutine 什么时候退出,要管控它的生命周期;
  • 能够控制 goroutine 什么时候退出,通过 channel、context 等都可以。
Last Updated: 2023-01-28 4:31:25