nsq源码阅读(1)-启动和优雅退出

启动, 利用svc框架

nsq使用了svc框架来启动一个service, Run 时, 分别调用prg 实现的 Init 和 Start 方法 启动’program’,然后监听 后两个参数的信号量, 当信号量到达, 调用 prg 实现的 Stop 方法来退出

func main() {
	prg := &program{}
	// svc 框架的Run 方法启动一个service
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		log.Fatal(err)
	}
}
// svc 框架, 启动 service, 也可以理解为 daemon
//
// 方法会一直阻塞, 直到 指定的信号量到来
// 如果指定信号为空, 默认是 syscall.SIGINT and syscall.SIGTERM
func Run(service Service, sig ...os.Signal) error {
	env := environment{}

	//先调用Init 初始化
	if err := service.Init(env); err != nil {
		return err
	}

	//再调用Start
	//其实有时候根本分不太清楚哪里是init哪里是start
	//作为一个框架, 当然要做的优雅一点
	if err := service.Start(); err != nil {
		return err
	}

	//准备信号量处理
	if len(sig) == 0 {
		sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
	}

	signalChan := make(chan os.Signal, 1)
	signalNotify(signalChan, sig...)

	//整个程序会阻塞在这里, 等待系统信号量
	<-signalChan

	// 当信号来到, 调用stop 方法停掉
	return service.Stop()
}

svc 生命周期三个方法的实现:

func (p *program) Init(env svc.Environment) error {
	//svc 封装了不同操作系统的Deamon服务进程相关的东西
	//TODO window deamon的不同点
	if env.IsWindowsService() {
		dir := filepath.Dir(os.Args[0])
		return os.Chdir(dir)
	}
	return nil
}

func (p *program) Start() error {
	// 配置参数解析
	flagSet.Parse(os.Args[1:])

	if *showVersion {
		fmt.Println(version.String("nsqlookupd"))
		os.Exit(0)
	}

	var cfg map[string]interface{}
	if *config != "" {
		_, err := toml.DecodeFile(*config, &cfg)
		if err != nil {
			log.Fatalf("ERROR: failed to load config file %s - %s", *config, err.Error())
		}
	}

	opts := nsqlookupd.NewOptions()
	options.Resolve(opts, flagSet, cfg)
	daemon := nsqlookupd.New(opts)

	// 这里是主要的启动方法
	daemon.Main()
	p.nsqlookupd = daemon
	return nil
}

func (p *program) Stop() error {
	if p.nsqlookupd != nil {
		p.nsqlookupd.Exit()
	}
	return nil
}

去掉svc框架, 把启动流程整个精简拼凑:

func main() {
	p.nsqlookupd.Main()
	<-signalChan
	p.nsqlookupd.Exit()
}

优雅退出

NSQLookupd 的真正的启动函数是Main

lookup服务主要有通过两种方式提供服务, tcp 和 http


func (l *NSQLookupd) Main() {
	ctx := &Context{l}

	tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
	if err != nil {
		l.logf("FATAL: listen (%s) failed - %s", l.opts.TCPAddress, err)
		os.Exit(1)
	}
	// 这里为什么要加锁?
	// 当前场景不好理解, 换个场景:
	//if l.playerList.count() > 0 {
	//	如果这里l 不 lock的话, 多线程 执行了 l.playerList = anotherPlayerList
	//	然后再 执行delete 就不对了
	//	l.playerList.delete(n)
	//}
	l.Lock()
	l.tcpListener = tcpListener
	l.Unlock()
	tcpServer := &tcpServer{ctx: ctx} // 这个就是handler

	//起一个tcp处理进程, 主要处理 client 和nsq server的连接
	l.waitGroup.Wrap(func() {
		protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger)
	})

	//nsqlookup 还支持http操作, 所以再起一个线程
	httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
	if err != nil {
		l.logf("FATAL: listen (%s) failed - %s", l.opts.HTTPAddress, err)
		os.Exit(1)
	}
	l.Lock()
	l.httpListener = httpListener
	l.Unlock()
	httpServer := newHTTPServer(ctx)
	l.waitGroup.Wrap(func() {
		http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger)
	})
}

现在我们就有了3个线程:

假如Main线程在接收到系统信号量之后直接退出, 那么其他两个线程也会跟着销毁. 于是这里就会出现一个问题, tcp服务线程 和 http服务线程, 是业务逻辑实现的主要线程, 里面可能正在”干活”, 此时需要退出, 是否有数据需要持久化了? 事务是否正好执行到一半?

所以退出时需要做优雅退出处理. Main所在线程退出之前, 需要 ‘通知’ 并且 ‘等待’ 两个服务线程退出后才能退出

为了实现这一点, nsq 利用了 sync包中的 waitgroup 组件

nsq 为了方便使用, 做了一个封装:

type WaitGroupWrapper struct {
	sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
	w.Add(1) // +1
	// 起一个线程, 就是go 的 goroutine
	go func() {
		cb() //执行任务
		w.Done() // -1
	}()
}

那么比如nsq 启动 tcp 服务线程 的代码, 就相当于:

w.Add(1)
go func() {
	protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger)
	w.Done()
}()
w.Wait()  //Main 线程就会阻塞在这里; 等待TCPServer 结束时调用w.Done()后才退出

通知 goroutine 退出

好了, 现在Main 会等待TCPServer 退出之后再退出了, 那, Main要退出的时候, 如何通知 TCPServer 退出?

protocol.TCPServer 就是循环的监听 tcp listener的 accept 连接, 交给 handler 处理; 然后这个for 循环是一个死循环, 依靠 accept 返回 error 才退出

// ==================== 注意, 这里的 TCPServer 运行在一个新的 goroutine 中
// 这个TCPServer 也是一个可以复用的比较不错的 tcp 程序 accept 模式
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
	l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

	for {
		clientConn, err := listener.Accept()
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
				l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
				runtime.Gosched()       // 是临时的错误, 暂停一下继续
				continue
			}
			// theres no direct way to detect this error because it is not exposed
			if !strings.Contains(err.Error(), "use of closed network connection") {
				l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
			}
			break
		}
		go handler.Handle(clientConn)
	}

	l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

再看Exit函数:

// ==================== 注意, 这里 是 Main 所在线程
func (l *NSQLookupd) Exit() {
	if l.tcpListener != nil {
		//Close 之后, listener.Accept() 就会返回error,
		//从而退出循环, TCPServer 线程退出
		l.tcpListener.Close()
	}

	if l.httpListener != nil {
		l.httpListener.Close()
	}
	l.waitGroup.Wait()
}

把整个流程代码的主干拼凑在一起, 就比较明了了:

func (l *NSQLookupd) Main() {
	w.Add(1)
	go func() {
		//=== TCPServer 线程 start
		for {
			clientConn, err := listener.Accept()
			if err != nil {
				break
			}
			go handler.Handle(clientConn)
		}
		w.Done()
		//=== TCPServer 线程 end
	}()
}

func (l *NSQLookupd) Exit() {
	l.tcpListener.Close()
	l.waitGroup.Wait()
}

如果把 svc 的也套进来, 整个程序的 启动流程就是:

func main() {
	// 告诉wg 准备要启动一个goroutine
	waitGroup.Add(1)
	go func() {
		for {
			clientConn, err := listener.Accept()
			if err != nil {
				break
			}
			go handler.Handle(clientConn)
		}

		waitGroup.Done()
	}()

	// 当退出信号到达
	<-signalChan

	// 关闭listener
	tcpListener.Close()

	//检查并等待 所有的 goroutine 都结束, 而不是强制退出
	waitGroup.Wait()
}

这里信号量到达之后, 做了一个 tcpListener.Close(), 其实这个动作的目的是通知 tcpListener 服务所在的线程退出

这里直接利用了 listener 被 close 之后再 accept 调用会返回error的特性, 既退出了线程, 又关闭了 listener, 一举两得

nsq 里还有各种退出线程的方式

精简总流程:

这里的大框架是app 启动和优雅退出的总流程

func main() {
	// 告诉wg 准备要启动一个goroutine
	waitGroup.Add(1)
	go func() {
		for {
			clientConn, err := listener.Accept()
			if err != nil {
				break
			}
			go handler.Handle(clientConn)
		}

		waitGroup.Done()
	}()

	// 当退出信号到达
	<-signalChan

	// 关闭listener
	// tcp 服务线程也会因为accept 返回error 而退出
	tcpListener.Close()

	//检查并等待 所有的 goroutine 都结束, 而不是强制退出
	waitGroup.Wait()
}