添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
  • 启动 miner,创建全局唯一Sealing对象( lotus/extern/storage-sealing/sealing.go:145 )
  • Sealing 对象run,会定期的检查是否开启刷单,如果刷单会调用一个 PledgeSector( lotus/extern/storage-sealing/garbage.go:13 ),承诺一个新的扇区
  • 在 PledgeSector 的结束会调用 Sealing.sectors.Send(id, evt)( github.com/filecoin-project/[email protected]/group.go:85 ),这个方法非常重要,几乎所有的状态变更都依赖这个。(tips:Sealing内部的sectors,类型是statemachine.StateGroup,这个 StateGroup 就负责所有sector的状态机管理)
  • 第一次调用 Send 时,会通过id先在 StateGroup.sms( map[datastore.Key]*StateMachine ),理论上这时候是没有这个状态机的,只有走到StateGroup.loadOrCreate(id, userState)( github.com/filecoin-project/[email protected]/group.go:102 )中加载或者创建一个状态机,然后一个 go res.run() ,状态机就运行起来了
  • 状态机如何变化状态?

    先看一下状态机的结构:

    type StateMachine struct {
    	planner  Planner
    	eventsIn chan Event
    	name      interface{}
    	st        *statestore.StoredState
    	stateType reflect.Type
    	stageDone chan struct{}
    	closing   chan struct{}
    	closed    chan struct{}
    	busy int32
      
  • planner 是计划处理函数,它返回状态变化函数和已经处理的事件数量,可以看下面的tips
  • eventsIn 是一个管道,在其他地方传递数据过来,在 ## 状态机什么时候启动? 提到的 go res.run() 里每次循环出输出给一个切片
  • stageDone 和 closing 是两个标志位,在状态机运行中会循环检测
  • busy 是一个会被原子操作的值,可以理解为一个锁。
  • 其他数据就是状态机的元数据
  • # planner
    // Planner processes in queue events
    // It returns:
    // 1. a handler of type -- func(ctx Context, st <T>) (func(*<T>), error), where <T> is the typeOf(User) param
    // 2. the number of events processed
    // 3. an error if occured
    
    # 状态机变化图
    	// Now decide what to do next
    				      UndefinedSectorState (start)
    				       v                     |
    				*<- WaitDeals <-> AddPiece   |
    				|   |   /--------------------/
    				|   v   v
    				*<- Packing <- incoming committed capacity
    				|   |
    				|   v
    				|   GetTicket
    				|   |   ^
    				|   v   |
    				*<- PreCommit1 <--> SealPreCommit1Failed
    				|   |       ^          ^^
    				|   |       *----------++----\
    				|   v       v          ||    |
    				*<- PreCommit2 --------++--> SealPreCommit2Failed
    				|   |                  ||
    				|   v          /-------/|
    				*   PreCommitting <-----+---> PreCommitFailed
    				|   |                   |     ^
    				|   v                   |     |
    				*<- WaitSeed -----------+-----/
    				|   |||  ^              |
    				|   |||  \--------*-----/
    				|   |||           |
    				|   vvv      v----+----> ComputeProofFailed
    				*<- Committing    |
    				|   |        ^--> CommitFailed
    				|   v             ^
    			        |   SubmitCommit  |
    		        	|   |             |
    		        	|   v             |
    				*<- CommitWait ---/
    				|   |
    				|   v
    				|   FinalizeSector <--> FinalizeFailed
    				|   |
    				|   v
    				*<- Proving
    				FailedUnrecoverable
    

    其实状态改变的函数是在 (github.com/filecoin-project/[email protected]/machine.go:103)实现的,通过反射实现的。只有当状态改变完成后才会修改fsm.busy。

    go func() {
    				defer log.Debugw("leaving critical zone and resetting atomic var to zero", "len(pending)", len(pendingEvents))
    				if nextStep != nil {
    					res := reflect.ValueOf(nextStep).Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(ustate).Elem()})
    					if res[0].Interface() != nil {
    						log.Errorf("executing step: %+v", res[0].Interface().(error)) // TODO: propagate top level
    						return
    				atomic.StoreInt32(&fsm.busy, 0)
    				fsm.stageDone <- struct{}{}
    

    上述代码主要是对 nextStep 对象进行反射操作,那么该对象是怎么出现的呢?它就是上述 planner 方法返回的第一个参数。下面看看 planner 到底是怎么运行的?

    # lotus/extern/storage-sealing/fsm.go:19
    func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
    	next, processed, err := m.plan(events, user.(*SectorInfo))
    	if err != nil || next == nil {
    		return nil, processed, err
    	return func(ctx statemachine.Context, si SectorInfo) error {
    		err := next(ctx, si)
    		if err != nil {
    			log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
    			return nil
    		return nil
    	}, processed, nil // TODO: This processed event count is not very correct
    # lotus/extern/storage-sealing/fsm.go:274
    func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, uint64, error) {
    	switch state.State {
    	// Happy path
    	case Empty:
    		fallthrough
    	case WaitDeals:
    		return m.handleWaitDeals, processed, nil
    	case AddPiece:
    		return m.handleAddPiece, processed, nil
    	case Packing:
    		return m.handlePacking, processed, nil
    	case GetTicket:
    		return m.handleGetTicket, processed, nil
    	case PreCommit1:
    		return m.handlePreCommit1, processed, nil
    	case PreCommit2:
    		return m.handlePreCommit2, processed, nil
    	case PreCommitting:
    		return m.handlePreCommitting, processed, nil
    	case SubmitPreCommitBatch:
    		return m.handleSubmitPreCommitBatch, processed, nil
    	case PreCommitBatchWait:
    		fallthrough
    	case PreCommitWait:
    		return m.handlePreCommitWait, processed, nil
    	case WaitSeed:
    		return m.handleWaitSeed, processed, nil
    	case Committing:
    		return m.handleCommitting, processed, nil
    	case SubmitCommit:
    		return m.handleSubmitCommit, processed, nil
    	case SubmitCommitAggregate:
    		return m.handleSubmitCommitAggregate, processed, nil
    	case CommitAggregateWait:
    		fallthrough
    	case CommitWait:
    		return m.handleCommitWait, processed, nil
    	case CommitFinalize:
    		fallthrough
    	case FinalizeSector:
    		return m.handleFinalizeSector, processed, nil
    	// Handled failure modes
    	case AddPieceFailed:
    		return m.handleAddPieceFailed, processed, nil
    	case SealPreCommit1Failed:
    		return m.handleSealPrecommit1Failed, processed, nil
    	case SealPreCommit2Failed:
    		return m.handleSealPrecommit2Failed, processed, nil
    	case PreCommitFailed:
    		return m.handlePreCommitFailed, processed, nil
    	case ComputeProofFailed:
    		return m.handleComputeProofFailed, processed, nil
    	case CommitFailed:
    		return m.handleCommitFailed, processed, nil
    	case CommitFinalizeFailed:
    		fallthrough
    	case FinalizeFailed:
    		return m.handleFinalizeFailed, processed, nil
    	case PackingFailed: // DEPRECATED: remove this for the next reset
    		state.State = DealsExpired
    		fallthrough
    	case DealsExpired:
    		return m.handleDealsExpired, processed, nil
    	case RecoverDealIDs:
    		return wrapCtx(m.HandleRecoverDealIDs), processed, nil
    	// Post-seal
    	case Proving:
    		return m.handleProvingSector, processed, nil
    	case Terminating:
    		return m.handleTerminating, processed, nil
    	case TerminateWait:
    		return m.handleTerminateWait, processed, nil
    	case TerminateFinality:
    		return m.handleTerminateFinality, processed, nil
    	case TerminateFailed:
    		return m.handleTerminateFailed, processed, nil
    	case Removing:
    		return m.handleRemoving, processed, nil
    	case Removed:
    		return nil, processed, nil
    	case RemoveFailed:
    		return m.handleRemoveFailed, processed, nil
    		// Faults
    	case Faulty:
    		return m.handleFaulty, processed, nil
    	case FaultReported:
    		return m.handleFaultReported, processed, nil
    	// Fatal errors
    	case UndefinedSectorState:
    		log.Error("sector update with undefined state!")
    	case FailedUnrecoverable:
    		log.Errorf("sector %d failed unrecoverably", state.SectorNumber)
    	default:
    		log.Errorf("unexpected sector update state: %s", state.State)
    	return nil, processed, nil
    

    planner 就会根据传递过来的事件返回对应的处理函数,然后进行状态改变。

    内部的实现比较复杂,再进一步的逻辑就没再去看了。在状态机相关代码中定义了 var ErrTerminated = xerrors.New("normal shutdown of state machine")。只会在 <- fsm.closed 或者进入到了最终状态才会返回,它意味着这是正常的状态机关闭。