文章

nsq阅读(2)——diskqueue

nsq阅读(2)——diskqueue

开篇

DiskQueue 是 NSQ 中的持久化组件,承担着将消息存储到磁盘的任务,它以高效、可靠的方式在内存与磁盘之间进行切换。在开始分析 nsq 那几个大件之前,先深入分析下 DiskQueue 的核心设计和实现,解密其背后的关键技术点,包括写入机制、读取逻辑以及在异常场景下的恢复策略。

设计

nsq将diskqueue抽取出来作为一个库来维护:https://github.com/nsqio/go-diskqueue

里面只有一个源码文件diskqueue.go,里面核心结构体是基于磁盘的消息队列diskQueue,实现了nsqd中的BackendQueue接口,比较简单:

1
2
3
4
5
6
7
8
type BackendQueue interface {
	Put([]byte) error
	ReadChan() <-chan []byte
	Close() error
	Delete() error
	Depth() int64
	Empty() error
}

消息以字节数组的方式传递,接收端需要对字节数组进行解码,存储端的实现就能相对简单。

diskqueue在创建时会运行一个叫做ioLoop的协程用于处理io相关的事务,并与主协程通过以下各个chan进行通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type diskQueue struct {
    ...
    
    // 队列大小(即depth在nsq中表示的就是可读消息数)
	depthChan         chan int64
    // 写磁盘的数据
	writeChan         chan []byte
    // 写磁盘的结果
	writeResponseChan chan error
    // 清空队列
	emptyChan         chan int
    // 清空队列的结果
	emptyResponseChan chan error
    // 退出
	exitChan          chan int
    // 主协程等待ioLoop退出
	exitSyncChan      chan int
}

logrotation与metadata

diskqueue还用了rotation的方式来限制单个文件的大小(拓展:在linux中有一个实现logrotation命令叫logrotate),这样便于在消息读出之后,通过直接删除文件的方式来删除不再需要的数据。为了实现logrotation,还会维护一些元数据:

  1. 队列大小(消息数)
  2. 正在读第几个文件中的第几个字节
  3. 正在写第几个文件中的第几个字节

代码中通过persistMetaData方法来原子性地持久化这些元数据。元数据非常重要,如果不是原子性地写入,比如写一半就宕机了,那么所有数据将会不可用。原子性是通过写临时文件、将临时文件重命名为最终文件名来保证的

写数据

使用接口方法Put写入:

1
2
3
4
5
6
7
8
9
10
11
func (d *diskQueue) Put(data []byte) error {
	d.RLock()
	defer d.RUnlock()

	if d.exitFlag == 1 {
		return errors.New("exiting")
	}

	d.writeChan <- data
	return <-d.writeResponseChan
}

ioLoop协程处理数据的写入,调用writeOne进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func (d *diskQueue) writeOne(data []byte) error {
	var err error

	dataLen := int32(len(data))
	totalBytes := int64(4 + dataLen)

	if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
        // 写入的大小将超出单个文件大小限制,触发logrotation,即写入下一个新文件
		if d.readFileNum == d.writeFileNum {
			d.maxBytesPerFileRead = d.writePos
		}

		d.writeFileNum++
		d.writePos = 0

        // 每次logrotation前,都sync将最后一个文件以及元数据落盘
		err = d.sync()
		if err != nil {
			d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
		}

        // 关闭序号为i的数据文件
		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
	}
    
    // 创建序号为i+1的数据文件
	if d.writeFile == nil {
		curFileName := d.fileName(d.writeFileNum)
		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			return err
		}

		d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)

		if d.writePos > 0 {
			_, err = d.writeFile.Seek(d.writePos, 0)
			if err != nil {
				d.writeFile.Close()
				d.writeFile = nil
				return err
			}
		}
	}

    // 数据大小写入缓存
	d.writeBuf.Reset()
	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
	if err != nil {
		return err
	}

    // 数据写入缓存
	_, err = d.writeBuf.Write(data)
	if err != nil {
		return err
	}

    // 缓存写入文件
	_, err = d.writeFile.Write(d.writeBuf.Bytes())
	if err != nil {
		d.writeFile.Close()
		d.writeFile = nil
		return err
	}

	d.writePos += totalBytes
	d.depth += 1

	return err
}

这里要注意在整个rotation的过程中并不保证原子性,而是分为三步:

  1. 落盘序号为 i 的数据文件
  2. 落盘元数据文件(其中的writeFileNum为 i+1)
  3. 创建序号为 i+1 的数据文件

对于1、2之间发生宕机的情况,会导致数据文件大小比元数据的writePos要大,这里采取的措施是直接触发rotation

对于2、3之间发生宕机的话,重启之后读取序号为 i+1 的文件,报文件不存在错误但是会忽略这个错误。然后第一次写入数据时会创建这个文件,正常运行。

读数据

读数据是通过ioLoop在每次循环中调用readOne实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func (d *diskQueue) readOne() ([]byte, error) {
	var err error
	var msgSize int32

    // 创建读取文件,定位到上次读的地方
	if d.readFile == nil {
		curFileName := d.fileName(d.readFileNum)
		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
		if err != nil {
			return nil, err
		}

		d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)

		if d.readPos > 0 {
			_, err = d.readFile.Seek(d.readPos, 0)
			if err != nil {
				d.readFile.Close()
				d.readFile = nil
				return nil, err
			}
		}

		// 对于已经归档的文件,将最大可读大小设置为文件大小
		d.maxBytesPerFileRead = d.maxBytesPerFile
		if d.readFileNum < d.writeFileNum {
			stat, err := d.readFile.Stat()
			if err == nil {
				d.maxBytesPerFileRead = stat.Size()
			}
		}

        // 设置reader
		d.reader = bufio.NewReader(d.readFile)
	}

    // 读取数据大小
	err = binary.Read(d.reader, binary.BigEndian, &msgSize)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

    ...

    // 读入数据
	readBuf := make([]byte, msgSize)
	_, err = io.ReadFull(d.reader, readBuf)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

	totalBytes := int64(4 + msgSize)

	// 先将readPos和readFileNum先暂存到next*,等消息发送给消费者之后再更新他们
	d.nextReadPos = d.readPos + totalBytes
	d.nextReadFileNum = d.readFileNum

	// 这个文件已经读完,rotate到下一个文件
	if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
		if d.readFile != nil {
			d.readFile.Close()
			d.readFile = nil
		}

		d.nextReadFileNum++
		d.nextReadPos = 0
	}

	return readBuf, nil
}

读完之后,然后ioLoopselect其中一个分支会将数据发送到readChan中,给外界读取。

数据落盘

为了提高数据读写的效率,消息数据不会每次写入都落盘,数据只有以下几个时机会sync落盘:

  1. 触发rotation
  2. 读写次数到达syncEvery
  3. syncTimeout定期落盘

综上,高并发写入数据时,特别是maxBytesPerFile设置得比较大、syncEvery设置得比较大、syncTimeout设置得比较大时,宕机其实很容易会丢失一部分数据,这部分数据是存在操作系统的page cache当中而丢失的。

从侧面反映出,依赖于diskqueue的nsq消息队列,进行所谓的“持久化”操作时其实也会因为宕机而丢失数据。

其它细节

看一下用于查询队列大小的Depth方法:

1
2
3
4
5
6
7
8
func (d *diskQueue) Depth() int64 {
	depth, ok := <-d.depthChan
	if !ok {
		// ioLoop exited
		depth = d.depth
	}
	return depth
}

这里优先从depthChan中返回深度,而不是直接返回d.depth,原因是d.depth只会被ioLoop协程改写,而由于调用Depth的一定是其它协程,读写d.depth时势必要加锁避免并发读写冲突。为了避免加锁带来性能上的影响,因此优先从d.depthChan中读取,相应的,ioLoop的select有一个分支是写入d.depthChan

1
2
3
4
5
6
7
8
9
10
11
12
func (d *diskQueue) ioLoop() {
	for {
		...

		select {
            // 写入depthChan
		case d.depthChan <- d.depth:
            // 其它分支改写d.depth
        case ...
		}
	}
}

当整个diskQeueu被关闭,ioLoop结束时,d.depth不会再被改写,此时直接返回d.depth

本文由作者按照 CC BY 4.0 进行授权