Golang 原生支持多线程开发,因此多线程开发是学习 Golang 不可少的环节。

多线程相关语法

多线程 Golang 解决方案

等待子线程结束

众所周知,主线程结束将结束所有子线程。其他语言提供了 waitjoin 等方法等待子线程正常结束,在 Golang 中可以使用 channel 等待线程结束。

未等待结果

在下面这个案例中,你将会看到未等待子线程结束时,子线程未执行任何代码。因为在此情况下,主线程结束将立即杀死子线程。

1
2
3
4
5
6
7
8
9
10
11
func thread() {
for i := 0; i < 5; i++ {
fmt.Println("Thread", i)
time.Sleep(time.Microsecond * 50)
}
}

func main() {
go thread()
fmt.Println("Main ended.")
}
1
Main ended.

等待的结果

通过管道阻塞,我们可以做到使主线程等待子线程结束。

需要注意的是,这个案例中多线程复用无缓冲通道,如果情况更复杂可能需要考虑使用多通道或者有缓冲。通常此案例的方法是足够的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var wait = make(chan any)

func thread() {
defer func() { wait <- nil }()
// 使用 defer 可以避免 忘记发出信号 / 线程提前返回 / 线程异常终止 导致的信号未发出, 信号未发出将导致主线程阻塞
for i := 0; i < 5; i++ {
fmt.Println("Thread", i)
time.Sleep(time.Microsecond * 50)
}
}

func main() {
go thread()
time.Sleep(time.Second)
go thread()
<-wait
<-wait
fmt.Println("Main ended.")
}
1
2
3
4
5
6
7
8
9
10
11
Thread 0
Thread 1
Thread 2
Thread 3
Thread 4
Thread 0
Thread 1
Thread 2
Thread 3
Thread 4
Main ended.

更好的解决方案 sync.WaitGroup

我们也可以使用 WaitGroup,它位于 sync 包中,使用 WaitGroup 相对于上一种方法可以使代码逻辑更加清晰。直接上案例即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"sync"
"time"
)

var wait sync.WaitGroup // 本质是一个计数器

func hello() {
defer wait.Done() // 计数器 -1
time.Sleep(time.Second)
fmt.Println("Hello World!")
}

func main() {
wait.Add(2) // 计数器 +2
go hello()
go hello()
wait.Wait() // 等待计数器为 0
}

无缓冲线程同步

无缓冲 channel 可以保证两个线程高度同步,但频繁使用该操作会降低多线程的效率,甚至劣于单线程。

线程对话

在线程对话的案例中我们可以使用阻塞来同步消息,使两个线程高度同步,这两个线程总是只有一个线程在工作,因此效率是更劣的,但有时我们需要这样的交互。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
var wait = make(chan any)

func thread1(info []string, from chan string, to chan string) {
defer func() { wait <- nil }()
for _, send := range info {
recv := <-from
fmt.Println("Thread 1 Receive", recv)

fmt.Println("Thread 1 Send ", send)
to <- send

time.Sleep(time.Millisecond * 500)
}
fmt.Println("Thread 1 Finished.")
}

func thread2(info []string, from chan string, to chan string) {
defer func() { wait <- nil }()
for _, send := range info {
fmt.Println("Thread 2 Send ", send)
to <- send

recv := <-from
fmt.Println("Thread 2 Receive", recv)

time.Sleep(time.Millisecond * 500)
}
fmt.Println("Thread 2 Finished.")
}

func main() {
chan1 := make(chan string)
chan2 := make(chan string)
go thread1([]string{
"I'm doing well, thank you! How about you, Jamhus?",
"Well, first I need to finish up some reports for the meeting this afternoon...",
"Thanks for offering, but I think I can handle it.",
}, chan1, chan2)
go thread2([]string{
"Good morning, Mr.Smith! How are you today?",
"I'm great, thanks for asking. So, what's on the agenda for today?",
"Sounds busy! Do you need any help with the reports?",
}, chan2, chan1)
<-wait
<-wait
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Thread 2 Send    Good morning, Mr.Smith! How are you today?
Thread 1 Receive Good morning, Mr.Smith! How are you today?
Thread 1 Send I'm doing well, thank you! How about you, Jamhus?
Thread 2 Receive I'm doing well, thank you! How about you, Jamhus?
Thread 2 Send I'm great, thanks for asking. So, what's on the agenda for today?
Thread 1 Receive I'm great, thanks for asking. So, what's on the agenda for today?
Thread 1 Send Well, first I need to finish up some reports for the meeting this afternoon...
Thread 2 Receive Well, first I need to finish up some reports for the meeting this afternoon...
Thread 2 Send Sounds busy! Do you need any help with the reports?
Thread 1 Receive Sounds busy! Do you need any help with the reports?
Thread 1 Send Thanks for offering, but I think I can handle it.
Thread 2 Receive Thanks for offering, but I think I can handle it.
Thread 2 Finished.
Thread 1 Finished.

缓冲任务分发

使用 channel 进行任务分发可以避免资源的重复占用或任务的重复执行,在下面这个案例,我们通过将消息放入队列用于缓冲,两个线程同时工作。

景区售票模型

在这个案例中,我们实现了两个功能,并且发现了两个疑点:

  • 两个功能:
    • 每张票只销售一次。程序通过 manager 线程向子线程发送 ,从而保证了每张票只销售一次。
    • 发出售罄信号。manager 线程在完成 的生产任务后会设置 noTicketRest 值,如果通道为空且该值为真,salesman 线程可以认为票售罄。
  • 两个疑点:
    • 票的销售不是连续的。我们理想的情况应该是 salesman 线程交替销售连续的票号。我们可以将售票的过程抽象为 取票交易报告 三个过程,而打印结果显示的是 报告 的顺序。由于在此案例中售票过程高速进行,可能会出现 salesman1 取票但尚未完成 交易报告 环节时,salesman2 完成了售票的全过程。至于 salesman2 为何可以晚于 salesman1 开始而提前完成全过程,需用时间片轮转原理解释。
    • 票的断供情况。很显然票的断供是由 manager 引起的,salesman 销售完了缓冲区全部的票而 manager 没有及时补充,解决这个问题可以扩大缓冲区。在凉皮的 Laptop 上,将缓冲区设置为 20 基本上解决了这个问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
var wait = make(chan any)         // 等待线程结束
var tickets = make(chan int, 10) // 售票缓存
var noTicketsRest = false // 票已全入缓存

func manager() {
defer func() { wait <- nil }()
for i := 1; i <= 100; i++ {
tickets <- i // 如果缓存满了此处会阻塞
}
noTicketsRest = true
}

func salesman(id int) {
defer func() { wait <- nil }() // 使用 defer 可以避免忘记发送信号或提前 return
for {
select {
case ticketId := <-tickets:
fmt.Printf("Salesman%v sales ticket No.%v.\n", id, ticketId)
default:
if noTicketsRest {
return // 售票缓存为空且票已全入缓存
}
fmt.Printf("Salesman%v waits for the next ticket.\n", id)
}
// time.Sleep(time.Millisecond)
}
}

func main() {
go salesman(1)
go salesman(2)
go manager()
<-wait
<-wait
<-wait
}

爬虫任务分发

  • 爬虫经常使用多进程、多线程等进行加速。将上面的景区售票模型中的 票Id 改为 url 即是一个爬虫任务分发模型。manager 负责解析并获取 urlsalesman 负责下载 url 并解析,而 salesman 角色通常不是一个而是多个下载器,因为 url 分发的速度通常比解析速度快。
  • 由于 票Id 是连续的,可以使用互斥量实现更优,而爬虫任务分发使用通道实现更佳,除非 url 具有一定逻辑性。

消息广播

开放式广播

  • 开放式广播的特点是高效且节约内存,但无法指定广播对象。
  • 开放式广播有两种实现方式:
    • 使用全局变量存储状态,接收方定时查看变量状态。5.2.3.1 景区售票模型 中的 noTicketRest 即是这种方式。局限性是消息状态有限,消息状态可能未被接收方捕获又被更改,仅用于简单广播。
    • 使用链表存储消息序列,接收方持有指针向后查看链表。局限性是代码实现相对复杂,历史消息不处理会一直占用内存。
通过变量广播
  • 即使用一个公共变量进行消息广播,这个变量通常与一个读写锁绑定。
  • 所谓读写锁允许多线程同时访问数据,但只允许单线程同时访问数据,读与写操作互斥。
通过链表广播

使用链表存储消息序列,接收方持有指针向后查看链表,当发现指针后方出现新元素时获取消息。局限性是代码实现相对复杂,历史消息不处理会一直占用内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"container/list"
"fmt"
"time"
)

var wait = make(chan any)
var msg = list.New()

func reciever(name string) {
defer func() { wait <- nil }()
msgIterator := msg.Front()
for {
if msgIterator != msg.Back() {
msgIterator = msgIterator.Next()
x := msgIterator.Value
if x == nil {
break
}
fmt.Println(name, x)
}
}
}

func sender() {
defer func() { wait <- nil }()
msg.PushBack("123")
msg.PushBack("abc")
msg.PushBack("XYZ")
msg.PushBack(nil)
}

func main() {
msg.PushBack(nil)
go reciever("Thread1")
go sender()
go reciever("Thread2")
time.Sleep(time.Second)
go reciever("Thread3")
<-wait
<-wait
<-wait
<-wait
}
1
2
3
4
5
6
7
8
9
Thread2 123
Thread2 abc
Thread2 XYZ
Thread1 123
Thread1 abc
Thread1 XYZ
Thread3 123
Thread3 abc
Thread3 XYZ

一对一广播

一对一广播为每个线程创建通道,所有发送方消息通常都由广播线程代理发送到所有或指定接收方,这样可以减少通道的数量。消息转发具有一定开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"fmt"
"time"
)

type aa struct {
message chan any
name string
}

func (p *aa) Run() {
defer close(p.message)
for {
select {
case ss := <-p.message:
fmt.Println(p.name, ss)
case <-Exitp:
fmt.Println("exit :", p.name)
return
}
}
}

func broadcast() {
for event := range Eventbus { //不建议此种方式接收chan数据,建议用select
for _, v := range Reicevermap {
v.message <- event
}
}
}

var Eventbus chan any
var Exitp chan any
var Reicevermap map[string]*aa

func main() {
Exitp = make(chan any) //控制go程退出
Eventbus = make(chan any, 1)
Reicevermap = make(map[string]*aa)
a := &aa{make(chan any, 3), "aa"} //aa接收者
go a.Run() //aa接收广播线程
Reicevermap[a.name] = a
b := &aa{make(chan any, 3), "bb"} //bb接收者
go b.Run() //bb接收广播线程
Reicevermap[b.name] = b
fmt.Println("111")
go broadcast()
time.Sleep(time.Duration(1) * time.Second)
Eventbus <- "第一个广播"
time.Sleep(time.Duration(3) * time.Second)
close(Exitp)
time.Sleep(time.Duration(3) * time.Second)
}
1
2
3
4
5
111
bb 第一个广播
aa 第一个广播
exit : bb
exit : aa

互斥量

  • 虽然 Golang 官方推荐尽量使用 channel 实现线程间通信,并且互斥量可以使用缓冲为 1 的 channel 实现。但互斥量一般使用 实现,没必要滥用 channel
  • Golang提供了 sync.Mutex 用于维护互斥量。另外,Golang 提供了 atomic 可以更加高效的维护互斥量,但无法维护原子操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync"
"time"
)

var (
val int
lock sync.Mutex // 互斥锁
)

func addWithLock() {
for i := 0; i < 2000; i++ {
lock.Lock() // 加锁
val++
lock.Unlock() // 解锁
}
}

func addWithoutLock() {
for i := 0; i < 2000; i++ {
val++
}
}

func main() {
val = 0
for i := 0; i < 5; i++ {
go addWithoutLock()
}
time.Sleep(time.Second)
fmt.Printf("AddWithoutLock: %d\n", val) // AddWithoutLock: 8512

val = 0
for i := 0; i < 5; i++ {
go addWithLock()
}
time.Sleep(time.Second)
fmt.Printf("AddWithLock: %d\n", val) // AddWithLock: 10000
}

单通道多线程读写

  • 单通道一读一写:无缓冲线程同步 中的案例即为这种模式。

  • 单通道多读一写:等待子线程结束 即为这种模式。

  • 单通道一读多写:缓冲任务分发 中的两个案例即为这种模式。

  • 单通道多读多写:最复杂且难以控制的模式,应该尽量避免。难以避免需要谨慎使用,也可考虑使用中转线程对通道内消息筛选后转发。此处不作案例。