fabric_code_consensus_event

fabric consensus event源码解析 代码位置为 fabric/consensus/…

想写pbft的代码解析来着。看到里面的事件流,设计得很赞,学习一下。

Manager 事件管理

主要实现的功能是事件通道的处理,通过相关接口,写入数据到通道中,数据处理方提供处理方法即可,不需关注通道的实现。

1 //首先,接口事件要做的事主要是,内部提供一个事件通道,暴露出往通道写数据的接口,以及数据处理接收方法,接收方法由Receiver提供,Magager的实现者需要实现数据从通道读出来,回调给接收者的过程。
2 type Manager interface {
3   Inject(Event)         //一个暂时的接口,跳过了通道,直接给到接收者,只限于Manager本线程使用,因为没有了通道做数据保护
4   Queue() chan<- Event  //提供一个只写通道,用于往通道内写数据
5   SetReceiver(Receiver) //设置接收者,接收者需要实现接收方法,在Receiver接口中有具体方法定义
6   Start()               // 启动Manager线程
7   Halt()                // 停止manager线程
8 }

Receiver接口

1 type Receiver interface {
2   //事件处理的地方会调用这个方法传递事件给到Receiver,注意到这个设计的是返回值也是Event事件,这个在之后的地方进行详细解释
3   ProcessEvent(e Event) Event
4 }

Manager实现

1 type managerImpl struct {
2   threaded  //提供一个exit的通道,作为从外面结束线程的信号,包括Halt方法的实现
3   receiver Receiver  //接收者
4   events   chan Event //数据通道
5 }

之前说manager是一个事件通道的处理,那么,业务逻辑一定是不断循环,从通道中读出数据,然后调用接收者的方法传递给接收者。所以代码是..

1func (em *managerImpl) Start() {
2    //开启子线程
3   go em.eventLoop()
4 }
5func (em *managerImpl) eventLoop() {
6   for {
7       select {
8       case next := <-em.events:
9           em.Inject(next)
10      case <-em.exit:
11          logger.Debug("eventLoop told to exit")
12          return
13      }
14  }
15 }
16//传递Event给receiver
17func (em *managerImpl) Inject(event Event) {
18  if em.receiver != nil {
19      SendEvent(em.receiver, event)
20  }
21 }
22//调用receiver.PeoceesEvent方法将事件传递给接收者
23func SendEvent(receiver Receiver, event Event) {
24  next := event
25  for {
26      // 如果ProcessEvent方法返回值不为空,则作为新的事件,继续处理
27      next = receiver.ProcessEvent(next)
28      if next == nil {
29          break
30      }
31  }
32 }

ProcessEvent设计为有返回值,且为Event,当返回值不为空,则作为新的事件进行处理,直到返回值为空。当然,因为是同一个receiver,如果设计为返回值为空,然后在ProcessEvent里直接进行递归调用,感觉也是一样的。但这样写的话,在ProcessEvent就会干净很多。

Timer

go源码包中是有time包的,为什么还要封装一个timer呢,原因是封装的timer一旦被reset或stop,就算倒计时触发事件了,事件也不会传递到事件队列中。

既然提到go中原time包,多嘴一句,time包中的timer也是具体stop和reset方法,但可以看到官方函数解释

1> Stop does not close the channel, to prevent a read from the channel succeeding incorrectly
2>

即stop方法不会关闭通道,一般使用timer的定时器或ticker,都是监听通道是否有值,就意味着即使stop掉定时器,但通道还是在的,监听通道的程序不会退出。

好了,下面说明event中封装的Timer吧。

直接看实现吧,接口就略过了

1// newTimer creates a new instance of timerImpl
2func newTimerImpl(manager Manager) Timer {
3   et := &timerImpl{
4       startChan: make(chan *timerStart),
5       stopChan:  make(chan struct{}),
6       threaded:  threaded{make(chan struct{})},
7       manager:   manager,
8   }
9   go et.loop()
10  return et
11 }

注意到结构体中有manager的引用,实现的定时器为,当开启定时器时,需要传入定时时间,和event事件,当时间触发时,把event事件传递给manager.receiver,传递方式为manager中的传递方式。故主要代码为

1func (et *timerImpl) loop() {
2   var eventDestChan chan<- Event //内部缓存通道
3   var event Event //事件
4
5   for {
6       select {
7       case start := <-et.startChan:
8           //计时开始,时间为start.duration,事件为start.event
9           if et.timerChan != nil {
10              if start.hard {
11                  logger.Debug("Resetting a running timer")
12              } else {
13                  continue
14              }
15          }
16          logger.Debug("Starting timer")
17          et.timerChan = time.After(start.duration)
18          if eventDestChan != nil {
19              logger.Debug("Timer cleared pending event")
20          }
21          event = start.event
22          eventDestChan = nil
23      case <-et.stopChan:
24          //结束计时
25          if et.timerChan == nil && eventDestChan == nil {
26              logger.Debug("Attempting to stop an unfired idle timer")
27          }
28          et.timerChan = nil
29          logger.Debug("Stopping timer")
30          if eventDestChan != nil {
31              logger.Debug("Timer cleared pending event")
32          }
33          eventDestChan = nil
34          event = nil
35      case <-et.timerChan:
36          //倒计时触发,这里好绕,倒计时触发,仅仅只是将事件传递通道的引用缓存下来
37          logger.Debug("Event timer fired")
38          et.timerChan = nil
39          eventDestChan = et.manager.Queue()
40      case eventDestChan <- event:
41          //如果事件传递通道不为空,且event也不为空,则将event传递给事件通道中 eventDestChan <- event这一句话就竟然能实现这么多件事!
42          logger.Debug("Timer event delivered")
43          eventDestChan = nil
44      case <-et.exit:
45          //退出
46          logger.Debug("Halting timer")
47          return
48      }
49  }

代码看到,倒计时触发的时候,并不是立即直接将event送入通道,而是将通道缓存下来,等到下一次select,再执行将事件送入通道的事。在nil通道上发送和接受将永远被阻塞,在select中,如果其通道是nil,它将永远不会被选择。所以,上述eventDestChan如果为nil, case eventDestChan <- event的语句就不会被选择。eventDestChan不为nil时,就能被选择了,select的用法是不是感觉很赞!

全部评论(0)