Go协程学习

  1. 1. 0x01 Go协程
    1. 1.1. Go协程概念
    2. 1.2. Go协程基础
  2. 2. 0x2 通道
    1. 2.1. Go协程间的通道
    2. 2.2. 通道阻塞
    3. 2.3. 带缓冲通道
    4. 2.4. 信号量模式
    5. 2.5. 通道方向
    6. 2.6. 通道关闭
  3. 3. 0x03 Go协程常见操作
    1. 3.1. select{}
    2. 3.2. 限制同时处理的请求数
    3. 3.3. 通道 “实现” “对象锁” 效果
    4. 3.4. 一种并行结构

0x01 Go协程

Go协程概念

对于协程的解释:协程,英文Coroutines,是一种基于线程之上,但又比线程更加轻量级的存在,这种由程序员自己写程序来管理的轻量级线程叫做『用户空间线程』,具有对内核来说不可见的特性。

=>

  1. 协程!=线程,协程是比线程更轻量的存在.
  2. 协程一般是用户自主开辟的异步任务.
  3. 一个线程也可以拥有多个协程

为什么需要协程?

  • 等待IO时,线程根本没有利用CPU去做运算,而是处于空转状态.
  • 协程的目的就是当出现长时间的I/O操作时,通过让出目前的协程调度,执行下一个任务的方式,来消除ContextSwitch上的开销。

协程的特点:

  • 线程的切换由操作系统负责调度,协程由用户自己进行调度,因此减少了上下文切换,提高了效率
  • 线程的默认Stack大小是1M,而协程更轻量,接近1K。因此可以在相同的内存中开启更多的协程。
  • 由于在同一个线程上,因此可以避免竞争关系而使用锁。
  • 适用于被阻塞的,且需要大量并发的场景。但不适用于大量计算的多线程,遇到此种情况,更好实用线程去解决。

goroutine – Go语言中的协程

  • Go 语言为构建并发程序的基本代码块是协程 (goroutine) 与通道 (channel)
  • 强调: 在Go协程中,不要通过共享内存来通信,而通过通信来共享内存

Go协程特殊之处:

  • Go的协程是根据一个或多个线程的可用性,映射(多路复用,执行于)在他们之上的;协程调度器在 Go 运行时很好的完成了这个工作
  • 协程工作在相同的地址空间中,所以共享内存的方式一定是同步的
  • 协程调度器

Go协程基础

使用 go关键词 启动一个新的协程. 对于main()函数本身它也是一个协程

Demo:

1
2
// go + function
go longWait()

在进行密集运算时,可以使用runtime.Gosched(),允许允许其它协程->会不时让出处理器和自动恢复.使用Gosched()可以使计算均匀分布

同样可以使用runtime.Goexit()使得协程停止.

Go的协程是并发的,也是可以并行的. 可以设置GOMAXPROCS,调整最多使用的操作系统线程数量

代码中可以通过如下代码进行设置:

1
2
3
var numCores = flag.Int("n", 2, "number of CPU cores to use")
flag.Parse()
runtime.GOMAXPROCS(*numCores)

假设 n 是机器上处理器或者核心的数量,若GOMAXPROCS>=n,则接下来协程会被分割(分散)到 n 个处理器上。而不是GOMAXPROCS。

GOMAXPROCS不是越大越好, 法则: 对于 n 个核心的情况设置 GOMAXPROCS 为 n-1 以获得最佳性能

0x2 通道

Go协程间的通道

Go中协程之间的同步主要依赖于通道,而不是共享变量.(可以使用共享变量,但是需要加锁.Go不提倡这种做法,直接使用通道即可).

通道(channel)是Go中的一种特殊类型.它是一种可以发送数据的管道,且一般只适用于固定类型数据传输.

通道中的数据本身实现了”在任何给定时间,一个数据被设计为只有一个协程可以对其访问”,所以不需要考虑数据竞争现象.

另外:通道本身就是一个队列,是FIFO的结构.

通道的声明:

1
2
3
4
var identifier chan datatype
// demo:
var chan1 chan int
vat chan2 chan interface{}

同时通道是一个引用类型,需要使用make为它创建一个实际的引用: make(chan datatype)

1
2
3
4
var ch1 chan string
ch1 = make(chan string)
// Other:
funcChan := make(chan func())

通道操作符: <- 针对通道的操作符

数据流入通道: chan1 <- int1

从通道流出数据:int2 = <- chan2

也可以使用这种for循环的方法从通道取出数据

1
2
3
>for v := range ch {
fmt.Printf("The value is %v\n", v)
>}

<- ch返回并丢弃通道中的下一个值:

1
2
3
if <- ch != 1000{
...
}

通道的发送和接收都是原子操作

通道阻塞

通道具有同步且无缓冲的特性:

  • 必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者
  • 所以通道的发送/接收操作在对方准备好之前是阻塞的

阻塞特性可以很好的帮助我们实现Go之前协程的同步

一个比较好的理解阻塞的例子:

1
2
3
4
5
6
7
8
9
10
11
func main() {
c := make(chan int)
go func() {
time.Sleep(15 * 1e9)
x := <-c
fmt.Println("received", x)
}()
fmt.Println("sending", 10)
c <- 10
fmt.Println("sent", 10)
}
1
2
3
4
5
Output:
sending 10
(15 s later):
received 10
sent 10

阻塞导致的死锁: (上述代码稍微修改一下)

1
2
3
4
5
6
7
8
9
10
func main() {
c := make(chan int)
fmt.Println("receive!")
x := <-c
fmt.Println("received", x)
fmt.Println("sending", 10)
time.Sleep(15 * 1e9)
c <- 10
fmt.Println("sent", 10)
}

对于同一代码片段: (协程可调节)

  • 对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的
  • 对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用

-> 顺序,必须先创建Sender再创Receiver.
-> 同时,通道的关闭也只需要关闭Sender即可,Receiver不需要执行关闭操作

带缓冲通道

顾名思义,带缓冲的通道

1
2
buf := 100 // 通道可以同时容纳的元素个数
ch1 := make(chan int, buf)

如:

1
2
3
4
5
6
7
8
9
10
11
func main() {
c := make(chan int,100) // different here!
go func() {
time.Sleep(15 * 1e9)
x := <-c
fmt.Println("received", x)
}()
fmt.Println("sending", 10)
c <- 10
fmt.Println("sent", 10)
}

代码输出:

1
2
3
4
sending 10
sent 10

Process finished with the exit code 0

未received代码即结束

但从代码逻辑上,依然要先创建Sender再创Receiver.

信号量模式

通过信号量同步进程: 在协程中通过通道发送信号告知处理已经完成

如:

1
2
3
4
5
6
7
8
9
10
11
done := make(chan bool)
// doSort is a lambda function, so a closure which knows the channel done:
doSort := func(s []int){
sort(s)
done <- true
}
i := pivot(s)
go doSort(s[:i])
go doSort(s[i:])
<-done
<-done
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N)
...
for i, xi := range data {
go func (i int, xi float64) {
res[i] = doSomething(i, xi)
sem <- empty
} (i, xi)
}
// wait for goroutines to finish
for i := 0; i < N; i++ { <-sem }

通道方向

通道类型可以用注解来表示它只发送或者只接收:chan<- <-chan

1
2
var send_only chan<- int 		// channel can only receive data
var recv_only <-chan int // channel can only send data

通道关闭

只有发送者需要关闭通道,接收者永远不会需要。

close(chan1)

1
2
ch := make(chan float64)
defer close(ch)

判断通道是否关闭?

1
v, ok := <-ch   // ok is true if v received value

检测当前通道是否阻塞? -> 需要使用到select{}

1
2
3
4
5
6
7
8
9
10
select {
case v, ok := <-ch:
if ok {
process(v)
} else {
fmt.Println("The channel is closed")
}
default:
fmt.Println("The channel is blocked")
}

0x03 Go协程常见操作

select{}

你准备好了吗?你准备好了吗?你准备好了吗?你准备好了吗?你准备好了吗? …

select{}是一个监听操作,会监听case中的通道是否存在输出:

1
2
3
4
5
6
7
8
9
10
11
12
select {
case u:= <- ch1:
...
break
case v:= <- ch2:
...
...
break
default: // no value ready to be received
...
// TODO
}
  • 如果所有的通道都阻塞了,会等待直到其中一个可以处理
  • 如果多个通道可以处理,会随机选择一个
  • 没有通道可用则执行default操作

限制同时处理的请求数

即限制最大并发数量(同时处理请求的最大数量),可以通过使用带缓冲区的通道来实现。

根据带缓冲区的通道实现并发池。

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"fmt"
"time"
)

const MAXREQS = 50
var start time.Time
var sem = make(chan int, MAXREQS) // 并发数量

type Request struct {
i int
replyc chan int
}

func process(r *Request) {
end := time.Now()
time.Sleep(1e7)
delta := end.Sub(start)
fmt.Printf("i: %d longCalculation took this amount of time: %s\n", r.i,delta)
}

func handle(r *Request) {
sem <- 1 // doesn't matter what we put in it
process(r)
<-sem // one empty place in the buffer: the next request can start
}

func main() {
// 首先装载任务
service := make(chan *Request,2000)
for i:=0;i<2000;i++{
service <- &Request{
i:i,
}
}
start = time.Now()
for {
request := <-service
go handle(request)
}
time.Sleep(100e9)
}
1
2
3
4
5
MAXREQS与任务运行时间:
1-> 15.5300921s
8-> 3.8846014s
50-> 604.985ms
500-> 49.39ms

代码运行会存在死锁,不过不影响测试

(也可以在代码中调整runtime.GOMAXPROCS(n)来调整占用的处理器数量.)

通道 “实现” “对象锁” 效果

即通过通道控制对对象某个属性的修改操作.

在初始化对象的时候,启动一个该对象的守护协程. 协程中不停执行成员通道中的处理函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Person struct {
Name string
salary float64
chF chan func()
}

func NewPerson(name string, salary float64) *Person {
p := &Person{name, salary, make(chan func())}
go p.backend() // 后台协程
return p
}

func (p *Person) backend() {
for f := range p.chF {
f() // 守护进程不停执行通道中传来的函数
}
}

处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Set salary.
func (p *Person) SetSalary(sal float64) {
p.chF <- func() { p.salary = sal }
}

// Retrieve salary.
func (p *Person) Salary() float64 {
fChan := make(chan float64)
p.chF <- func() { fChan <- p.salary }
return <-fChan
}

func (p *Person) String() string {
return "Person - name is: " + p.Name + " - salary is: " + strconv.FormatFloat(p.Salary(), 'f', 2, 64)
}

代码demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
"strconv"
)
// 通过通道来实现类似”对象锁“的效果,即一个属性同时只有一个协程可以进行写操作

type Person struct {
Name string
salary float64
chF chan func()
}

func NewPerson(name string, salary float64) *Person {
p := &Person{name, salary, make(chan func())}
go p.backend() // 后台协程
return p
}

func (p *Person) backend() {
for f := range p.chF {
f() // 守护进程不停执行通道中传来的函数
}
}

// Set salary.
func (p *Person) SetSalary(sal float64) {
p.chF <- func() { p.salary = sal }
}

// Retrieve salary.
func (p *Person) Salary() float64 {
fChan := make(chan float64)
p.chF <- func() { fChan <- p.salary }
return <-fChan
}

func (p *Person) String() string {
return "Person - name is: " + p.Name + " - salary is: " + strconv.FormatFloat(p.Salary(), 'f', 2, 64)
}

func main() {
bs := NewPerson("Smith Bill", 2500.5)
fmt.Println(bs)
bs.SetSalary(4000.25)
fmt.Println("Salary changed:")
fmt.Println(bs)
}

一种并行结构

Core把任务分发给Handler处理,然后结果发送到通道,统一交给ResultHandler处理收集结果.

Channels中寻找可用通道可以通过select{}

image-20211214182149612

(在Handler和ResultHandler IO操作都较多的情况下以上结构运行效率会高一些)