Reading nsq (3): nsqd
Based on nsq v1.3.0
Execution flow
The first post gave a brief overview of nsq's main components and how data flows between them. In this post we follow the demo from the end of the previous post into the nsqd code and trace the execution flow.
At the end of the previous post we published a message with an HTTP request to port 4151:
curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic"
This call hits the HTTP endpoint /pub, which maps to the doPUB function in nsqd's http.go:
- call nsqd.GetTopic to fetch (or create) the topic
- call NewMessage to create the message
- call topic.PutMessage to deliver the message into the topic (sketched below)
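PutMessage is where the message lands in the topic's queues. Here is a minimal sketch of the idea behind Topic.put (the real function also keeps deferred messages in memory and handles the mem-queue-size=0 case; see topic.go): try the in-memory channel first, and spill to the disk-backed queue when it is full:

func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:
		// there was room in the in-memory queue
		return nil
	default:
		// memory queue full: persist to the disk-backed queue
		return writeMessageToBackend(m, t.backend)
	}
}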
When a topic is created, its messagePump goroutine is started to handle the topic's events, including:
- processing messages from the memory and disk queues
- watching for changes to the set of channels
- pausing/unpausing the topic
- exiting gracefully
func (t *Topic) messagePump() {
	var msg *Message
	var buf []byte
	var err error
	var chans []*Channel
	var memoryMsgChan chan *Message
	var backendChan <-chan []byte
	...
	for {
		select {
		case msg = <-memoryMsgChan:
			// got a message from the in-memory queue
		case buf = <-backendChan:
			// got a message from the disk queue; it needs decoding
			msg, err = decodeMessage(buf)
			...
		case <-t.channelUpdateChan:
			// the set of channels has changed
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.pauseChan:
			// pause/unpause the topic
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
			// graceful shutdown
			goto exit
		}
		// deliver the message to every channel
		for i, channel := range chans {
			// the first channel gets msg itself;
			// every other channel gets a copy
			chanMsg := msg
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			// deferred messages go into a priority queue
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			// regular messages go into the channel's memory or disk queue
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}
exit:
	t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
That covers all of the topic's processing logic; it is driven entirely by events arriving on these channels, and the step that really matters is delivering each message to every bound channel.
A channel can be created at three moments, each path ultimately going through Topic.GetChannel (sketched below):
- at startup, NSQD.LoadMetadata reads the local metadata file and re-creates the channels recorded there
- the HTTP endpoint /channel/create creates a channel directly
- a client creates one by sending a SUB request to subscribe to a topic and channel
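Whichever path creates it, the topic's messagePump has to learn about the new channel before it will deliver to it. Topic.GetChannel looks roughly like this (condensed from topic.go): on first creation it signals channelUpdateChan, which the messagePump select loop above reacts to by rebuilding its channel list:

func (t *Topic) GetChannel(channelName string) *Channel {
	t.Lock()
	channel, isNew := t.getOrCreateChannel(channelName)
	t.Unlock()

	if isNew {
		// a brand-new channel: wake the topic's messagePump
		// so it refreshes its channel list
		select {
		case t.channelUpdateChan <- 1:
		case <-t.exitChan:
		}
	}
	return channel
}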
The most common case is the last one: a client, i.e. a consumer connected to nsqd, creates the channel by subscribing with SUB:
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	...
	topicName := string(params[1])
	channelName := string(params[2])
	...
	var channel *Channel
	for i := 1; ; i++ {
		// get the topic, creating it if it doesn't exist
		topic := p.nsqd.GetTopic(topicName)
		// get the channel, creating it if it doesn't exist
		channel = topic.GetChannel(channelName)
		// register the client with the channel
		if err := channel.AddClient(client.ID, client); err != nil {
			return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error())
		}
		...
		break
	}
	atomic.StoreInt32(&client.State, stateSubscribed)
	client.Channel = channel
	// notify the client's messagePump that it has subscribed to a channel
	client.SubEventChan <- channel
	return okBytes, nil
}
Once a client is added to a channel, the channel's messages are load-balanced across all clients subscribed to it: each client's messagePump receives from the same channel queues, and whichever pump wins the receive delivers that message.
SUB itself only binds topic, channel, and client together; the final line hands the channel to the client's messagePump. That goroutine is started from IOLoop, and it handles the client-facing events, including:
- pacing message delivery
- managing the output buffer
- maintaining heartbeats
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	var memoryMsgChan chan *Message
	var backendMsgChan <-chan []byte
	var subChannel *Channel
	var flusherChan <-chan time.Time
	var sampleRate int32
	// when the client sends SUB to subscribe to a topic,
	// the new Channel arrives on this channel
	subEventChan := client.SubEventChan
	// when the client sends IDENTIFY to negotiate its configuration,
	// the negotiated data arrives on this channel
	identifyEventChan := client.IdentifyEventChan
	// minimum interval between buffer flushes:
	// 1. batching: buffer several messages and send them together,
	//    reducing write system calls
	// 2. forced flush: at low volume, make sure messages don't sit
	//    in the buffer too long
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
	// heartbeat interval, used to probe client liveness
	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
	// message-processing timeout:
	// 1. bounds how long a client may take to process a single message;
	//    on timeout, nsq redelivers the message
	// 2. ensures every message is eventually handled or retried
	msgTimeout := client.MsgTimeout
	// whether the buffer has been flushed; set back to false when new data
	// is written, after which the ticker triggers the next flush
	flushed := true
	// signal IOLoop that messagePump has finished initializing
	close(startedChan)
	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// flushed last iteration; no need to select on flusherChan
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// not flushed yet; select on flusherChan too
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}
		select {
		case <-flusherChan:
			// flush the output buffer
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
			// the client's RDY state changed; recompute the loop state
		case subChannel = <-subEventChan:
			// you can only SUB once
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			// you can only IDENTIFY once
			identifyEventChan = nil
			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}
			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}
			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}
			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg, err := decodeMessage(b)
			if err != nil {
				p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
			msg.Attempts++
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}
exit:
	p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
	heartbeatTicker.Stop()
	outputBufferTicker.Stop()
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
	}
}
Program entry: main
Let's now look at the program's entry file, apps/nsqd/main.go. It uses the go-svc library, a thin wrapper around OS signal notification that structures the program through callback interfaces.
// program implements the svc.Service interface
type program struct {
	once sync.Once
	nsqd *nsqd.NSQD
}
func main() {
	prg := &program{}
	// run the service
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		logFatal("%s", err)
	}
}
The Run method is straightforward:
func Run(service Service, sig ...os.Signal) error {
	env := environment{}
	// call back into Service.Init
	if err := service.Init(env); err != nil {
		return err
	}
	// call back into Service.Start
	if err := service.Start(); err != nil {
		return err
	}
	if len(sig) == 0 {
		sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
	}
	// listen for signals
	signalChan := make(chan os.Signal, 1)
	signalNotify(signalChan, sig...)
	var ctx context.Context
	if s, ok := service.(Context); ok {
		ctx = s.Context()
	} else {
		ctx = context.Background()
	}
	for {
		select {
		case s := <-signalChan:
			// a signal arrived
			if h, ok := service.(Handler); ok {
				// if the service also implements the signal-handling
				// interface, call back into Service.Handle
				if err := h.Handle(s); err == ErrStop {
					goto stop
				}
			} else {
				// otherwise, default to stopping
				goto stop
			}
		case <-ctx.Done():
			goto stop
		}
	}
stop:
	return service.Stop()
}
So Start should spawn a goroutine for the main work, leaving the main goroutine to watch for signals and drive graceful shutdown. The whole main function does just three things; the code is simple, so instead of pasting it in full, a condensed sketch follows the list:
- parse command-line arguments and initialize nsqd
- load the local metadata and start nsqd
- call nsqd.Exit for a graceful shutdown on error or stop
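A condensed sketch of those three steps, paraphrased from apps/nsqd/main.go (flag parsing and error handling mostly elided):

func (p *program) Start() error {
	opts := nsqd.NewOptions()
	// ... merge command-line flags and the config file into opts (elided) ...
	nsqd, err := nsqd.New(opts)
	if err != nil {
		logFatal("failed to instantiate nsqd - %s", err)
	}
	p.nsqd = nsqd

	// load topics/channels persisted by a previous run
	err = p.nsqd.LoadMetadata()
	...

	// run the main loop in its own goroutine so that Start returns
	// and svc.Run can sit in its signal loop
	go func() {
		err := p.nsqd.Main()
		if err != nil {
			p.Stop()
			os.Exit(1)
		}
	}()
	return nil
}

func (p *program) Stop() error {
	p.once.Do(func() {
		// graceful shutdown
		p.nsqd.Exit()
	})
	return nil
}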
Initialization: nsqd.New
It mainly does the following (a condensed excerpt follows the list):
- initializes configuration options
- initializes the client that talks to nsqlookupd, so the node can track cluster metadata at runtime
- listens on TCP port 4150
- listens on HTTP port 4151
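A condensed excerpt of the listener setup (paraphrased; the real code derives the network type from the configured address and handles errors):

n := &NSQD{
	// ... options, instance ID, topic map, exit channels (elided) ...
}
n.tcpServer = &tcpServer{nsqd: n}
// default 0.0.0.0:4150
n.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
...
// default 0.0.0.0:4151
n.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
...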
Startup: nsqd.Main
"Starting nsqd" above means running the nsqd.Main method:
func (n *NSQD) Main() error {
	exitCh := make(chan error)
	var once sync.Once
	exitFunc := func(err error) {
		once.Do(func() {
			// signal Main to return
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			exitCh <- err
		})
	}
	// start the TCP server
	n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	})
	if n.httpListener != nil {
		// start the HTTP server
		httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
		})
	}
	// https
	...
	// scan channels for in-flight/deferred messages
	n.waitGroup.Wrap(n.queueScanLoop)
	// keep in touch with nsqlookupd
	n.waitGroup.Wrap(n.lookupLoop)
	// push stats to statsd, if configured
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}
	err := <-exitCh
	return err
}
The TCP and HTTP listeners run side by side and expose the same functionality; the former speaks a custom packet format over raw TCP, the latter ordinary HTTP. By default they listen on ports 4150 and 4151 respectively.
Protocol | Use case | Example commands |
---|---|---|
TCP | persistent connections, high-frequency traffic, large-scale message streams | PUB, SUB, RDY, FIN |
HTTP | one-off operations, debugging, ops tooling, REST API calls | /pub, /stats, /ping |
TCP
An infinite loop accepts client connections, handing each one to its own goroutine:
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	var wg sync.WaitGroup
	for {
		// accept a connection
		clientConn, err := listener.Accept()
		...
		wg.Add(1)
		// handle the connection in its own goroutine
		go func() {
			handler.Handle(clientConn)
			wg.Done()
		}()
	}
	wg.Wait()
	...
}
When a connection is established, the first 4 bytes must be the protocol magic "  V2" (two spaces followed by V2); any other value gets a protocol error:
func (p *tcpServer) Handle(conn net.Conn) {
	// read the magic bytes that preface the protocol
	buf := make([]byte, 4)
	_, err := io.ReadFull(conn, buf)
	protocolMagic := string(buf)
	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{nsqd: p.nsqd}
	default:
		// unexpected magic: return a protocol error
		...
	}
	client := prot.NewClient(conn)
	p.conns.Store(conn.RemoteAddr(), client)
	// IOLoop reads and handles the client's requests in a loop
	err = prot.IOLoop(client)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
	}
	p.conns.Delete(conn.RemoteAddr())
	client.Close()
}
With the protocol confirmed, IOLoop takes over, reading the client's requests and executing them:
func (p *protocolV2) IOLoop(c protocol.Client) error {
	var err error
	var line []byte
	var zeroTime time.Time
	client := c.(*clientV2)
	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan
	for {
		// read one line of the client's request
		line, err = client.Reader.ReadSlice('\n')
		...
		params := bytes.Split(line, separatorBytes)
		var response []byte
		// execute the request
		response, err = p.Exec(client, params)
		...
		if response != nil {
			// send the result back to the client
			err = p.Send(client, frameTypeResponse, response)
			...
		}
	}
	p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}
	return err
}
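To make the framing concrete, here is a minimal hand-rolled producer speaking the V2 protocol. It is only an illustration of the documented wire format (magic, then "PUB <topic>\n", then a 4-byte big-endian body length and the body); real applications should use a client library such as go-nsq. Error handling is mostly elided for brevity:

package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"net"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:4150")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// 4-byte protocol magic: two spaces followed by "V2"
	conn.Write([]byte("  V2"))

	// PUB frame: "PUB <topic>\n" + 4-byte big-endian body size + body
	body := []byte("test message")
	fmt.Fprintf(conn, "PUB %s\n", "test_topic")
	binary.Write(conn, binary.BigEndian, int32(len(body)))
	conn.Write(body)

	// response frame: 4-byte size, 4-byte frame type, then (size-4) bytes of data
	var size, frameType int32
	binary.Read(conn, binary.BigEndian, &size)
	binary.Read(conn, binary.BigEndian, &frameType)
	data := make([]byte, size-4)
	io.ReadFull(conn, data)
	fmt.Printf("frameType=%d data=%q\n", frameType, data) // expect frameType=0 data="OK"
}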
HTTP
The HTTP side uses the httprouter library for routing:
func newHTTPServer(nsqd *NSQD, tlsEnabled bool, tlsRequired bool) *httpServer {
log := http_api.Log(nsqd.logf)
router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(nsqd.logf)
router.NotFound = http_api.LogNotFoundHandler(nsqd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf)
s := &httpServer{
nsqd: nsqd,
tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired,
router: router,
}
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
// v1 negotiate
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
...
// only v1
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
...
// debug
router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
...
return s
}
Every route's handler is additionally wrapped by http_api.Decorate:
type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)
type Decorator func(APIHandler) APIHandler
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
	decorated := f
	// assemble the call chain
	for _, decorate := range ds {
		decorated = decorate(decorated)
	}
	return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
		decorated(w, req, ps)
	}
}
Decorate attaches middleware to a handler, such as the Log middleware:
func Log(logf lg.AppLogFunc) Decorator {
	return func(f APIHandler) APIHandler {
		return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
			// record the time before the call
			start := time.Now()
			// invoke the wrapped handler
			response, err := f(w, req, ps)
			// record the elapsed time
			elapsed := time.Since(start)
			status := 200
			if e, ok := err.(Err); ok {
				status = e.Code
			}
			// log the call and how long it took
			logf(lg.INFO, "%d %s %s (%s) %s",
				status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)
			return response, err
		}
	}
}
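One detail worth noting: since each decorator wraps the previous result, the last decorator in the list ends up outermost. A tiny self-contained demo of the same composition pattern (toy types, not nsq's):

package main

import "fmt"

type Handler func() string
type Decorator func(Handler) Handler

// Decorate composes decorators the same way http_api.Decorate does:
// each one wraps the result of the previous.
func Decorate(f Handler, ds ...Decorator) Handler {
	decorated := f
	for _, d := range ds {
		decorated = d(decorated)
	}
	return decorated
}

// tag is a decorator factory that labels the call path.
func tag(name string) Decorator {
	return func(next Handler) Handler {
		return func() string { return name + "(" + next() + ")" }
	}
}

func main() {
	h := Decorate(func() string { return "handler" }, tag("log"), tag("v1"))
	fmt.Println(h()) // prints "v1(log(handler))": the last decorator is outermost
}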
queueScanLoop
This function manages deferred messages and messages awaiting acknowledgment, each kept in a time-ordered min-heap. It runs a pool of goroutines called queueScanWorkers, which process all channels concurrently.
Because a channel may sit idle with no messages, the workers should sleep and be woken only when there is work to do. Waking them precisely in real time would be complicated, so nsq instead borrows redis's probabilistic expiration algorithm and polls: at a fixed interval (100ms by default) the workers are woken and handed a random subset of the locally cached channels (20 by default); any channel found to have pending messages is flagged dirty and processed. If dirty channels exceed a configured fraction of the sample (25% by default), another round runs immediately; otherwise the loop sleeps until the next tick.
func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)
	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
	channels := n.channels()
	// size the worker pool
	n.resizePool(len(channels), workCh, responseCh, closeCh)
	for {
		select {
		case <-workTicker.C:
			// periodically wake the workers
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			// periodically refresh the channel list and resize the pool
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			// nsqd is exiting
			goto exit
		}
		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}
	loop:
		// pick a random subset of channels and hand them to the workers
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}
		// count the channels flagged as dirty
		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}
		// if enough channels were dirty, scan again without sleeping
		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}
exit:
	...
}
The selection of channels is random, using a classic trick for picking m distinct elements out of n (m <= n): swap the i-th element with a randomly chosen element at or after position i, and repeat for each of the first m positions; the first m elements then form a uniform random subset.
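Here is that trick in a self-contained form, a partial Fisher-Yates shuffle (this mirrors the shape of util.UniqRands, but the names and details here are illustrative):

package main

import (
	"fmt"
	"math/rand"
)

// uniqRands returns m distinct indices drawn uniformly from [0, n):
// swap position i with a random position in [i, n), then keep the
// first m elements.
func uniqRands(m, n int) []int {
	if m > n {
		m = n
	}
	idx := make([]int, n)
	for i := range idx {
		idx[i] = i
	}
	for i := 0; i < m; i++ {
		j := i + rand.Intn(n-i)
		idx[i], idx[j] = idx[j], idx[i]
	}
	return idx[:m]
}

func main() {
	// e.g. pick 20 of 100 channel indices
	fmt.Println(uniqRands(20, 100))
}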
Each worker's job is to process a channel's time-ordered min-heaps: messages that have timed out in flight, or whose deferral has come due, are re-queued into the channel's memory or disk queue to await consumption:
func (c *Channel) put(m *Message) error {
	select {
	// try the in-memory queue first
	case c.memoryMsgChan <- m:
	default:
		// fall back to the disk queue
		err := writeMessageToBackend(m, c.backend)
		c.nsqd.SetHealth(err)
		if err != nil {
			c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil
}
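The dirty flag comes from the workers themselves. Each queueScanWorker looks roughly like this (paraphrased from nsqd.go): it pops due entries off a channel's in-flight and deferred min-heaps, and both processing paths re-queue messages through the put shown above:

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
			// redeliver in-flight messages whose msgTimeout has expired
			if c.processInFlightQueue(now) {
				dirty = true
			}
			// release deferred messages that have come due
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
			return
		}
	}
}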
Receiving messages
The previous section analyzed how nsq handles messages it has already received: time-ordered min-heaps drive redelivery, and messages are parked in memory or on disk awaiting the next step. Now let's step back and look at how nsq receives messages in the first place.
In the demo we published the message over the HTTP endpoint:
curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic"
The corresponding handler is:
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
// TODO: one day I'd really like to just error on chunked requests
// to be able to fail "too big" requests before we even read
if req.ContentLength > s.nsqd.getOpts().MaxMsgSize {
return nil, http_api.Err{413, "MSG_TOO_BIG"}
}
// add 1 so that it's greater than our max when we test for it
// (LimitReader returns a "fake" EOF)
readMax := s.nsqd.getOpts().MaxMsgSize + 1
body, err := io.ReadAll(io.LimitReader(req.Body, readMax))
if err != nil {
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
if int64(len(body)) == readMax {
return nil, http_api.Err{413, "MSG_TOO_BIG"}
}
if len(body) == 0 {
return nil, http_api.Err{400, "MSG_EMPTY"}
}
reqParams, topic, err := s.getTopicFromQuery(req)
if err != nil {
return nil, err
}
var deferred time.Duration
if ds, ok := reqParams["defer"]; ok {
var di int64
di, err = strconv.ParseInt(ds[0], 10, 64)
if err != nil {
return nil, http_api.Err{400, "INVALID_DEFER"}
}
deferred = time.Duration(di) * time.Millisecond
if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout {
return nil, http_api.Err{400, "INVALID_DEFER"}
}
}
msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred
err = topic.PutMessage(msg)
if err != nil {
return nil, http_api.Err{503, "EXITING"}
}
return "OK", nil
}
That is the HTTP path. Over TCP, publishing goes through IOLoop: the PUB command line is read there and dispatched by p.Exec to the protocolV2.PUB handler.
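That handler, condensed from protocol_v2.go (abridged; parameter validation and error handling mostly elided), mirrors doPUB:

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
	...
	topicName := string(params[1])
	...
	// read the 4-byte body length, then the body itself
	bodyLen, err := readLen(client.Reader, client.lenSlice)
	...
	messageBody := make([]byte, bodyLen)
	_, err = io.ReadFull(client.Reader, messageBody)
	...
	// same final steps as the HTTP path
	topic := p.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	err = topic.PutMessage(msg)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
	}
	...
	return okBytes, nil
}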