nsq源码阅读(2)-TCPServer封装

TCPHandler

上文理解了nsq怎么做启动和退出, 并且知道nsqlookup 启动了tcp 和 http 两个服务进程.

现在我们直接看 nsqd.Main()

func (n *NSQD) Main() {
	...

	// 启动tcp listener
	tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)

	//实现了具体的Handler
	tcpServer := &tcpServer{ctx: ctx}

	//上一篇说的, 用以优雅退出
	n.waitGroup.Wrap(func() {
		protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
	})

	...
}

nsq 封装了一个TCPHandler, 当 listener.Accept() 接到client 的连接获取到connect 时, 交给 TCPHandler.Handle(net.Conn) 函数处理

所有的 tcp 服务都可以复用此代码, 不同的服务内容, 只需要实现不同的TCPHandler即可


type TCPHandler interface {
	Handle(net.Conn)
}

func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
	l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

	for {
		// 有客户端连接
		clientConn, err := listener.Accept()
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
				l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
				runtime.Gosched()       // 是临时的错误, 暂停一下继续
				continue
			}
			// theres no direct way to detect this error because it is not exposed
			if !strings.Contains(err.Error(), "use of closed network connection") {
				l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
			}
			break
		}
		//启动一个线程, 交给 handler 处理, 这里使用的是 one connect per thread 模式
		//因为golang的特性, one connect per thread 模式 实际上是  one connect per goroutine
		//再加上golang将io操作都做了封装, 那么实际上这里是 one connect per event loop 模式
		go handler.Handle(clientConn)
	}

	l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

协议不同版本 的 封装处理

接到连接之后, 交给TCPHandler 处理, 这里是由tcpServer 实现:


func (p *tcpServer) Handle(clientConn net.Conn) {
	p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

	// 先读取4个字节 作为 协议版本号
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	if err != nil {
		p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
		return
	}
	protocolMagic := string(buf)

	p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		// protocolV2 实现了 V2 这个版本的协议
		prot = &protocolV2{ctx: p.ctx}
	//假如有另外一个版本的协议
	//case "  V3":
	//	prot = &protocolV3{ctx: p.ctx}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	//交给具体protocol实现类处理每个连接
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
		return
	}
}

Handle 函数先读取4个字节的字符串作为版本号, 根据版本号选用具体的 protocol 实现, 这里的 “ V2”这个版本的协议由protocolV2 实现

将协议封装, 方便以后不同协议的扩展

type Protocol interface {
	IOLoop(conn net.Conn) error
}

精简总流程:

func (n *NSQD) Main() {
	tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
	tcpServer := &tcpServer{ctx: ctx}
	n.waitGroup.Wrap(func() {
		protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
	})
}

func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
	for {
		clientConn, err := listener.Accept()
		go handler.Handle(clientConn)
	}
}

func (p *tcpServer) Handle(clientConn net.Conn) {
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	protocolMagic := string(buf)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{ctx: p.ctx}
	}

	err = prot.IOLoop(clientConn)
}