type StateMachine struct {
planner Planner
eventsIn chan Event
name interface{}
st *statestore.StoredState
stateType reflect.Type
stageDone chan struct{}
closing chan struct{}
closed chan struct{}
busy int32
planner 是计划(Plan)处理函数,它返回状态变化函数、已经处理的事件数量以及可能出现的错误,具体见下面 Planner 的注释。
eventsIn 是一个事件管道,由其他地方把事件传进来;在「状态机什么时候启动?」一节提到的 `go res.run()` 中,每次循环都会把从该管道读出的事件追加到一个切片里。
stageDone 和 closing 是两个用作信号的通道(chan struct{}),状态机运行时会在循环中持续监听它们。
busy 是一个以原子方式读写的 int32 值,可以把它理解为一把锁:状态处理函数成功执行完成后,会通过 atomic.StoreInt32 将其重置为 0。
其他数据就是状态机的元数据
# planner
// Planner processes in queue events
// It returns:
// 1. a handler of type -- func(ctx Context, st <T>) error, where <T> is the typeOf(User) param
// 2. the number of events processed
// 3. an error if one occurred
# 状态机变化图
// Now decide what to do next
UndefinedSectorState (start)
v |
*<- WaitDeals <-> AddPiece |
| | /--------------------/
| v v
*<- Packing <- incoming committed capacity
| |
| v
| GetTicket
| | ^
| v |
*<- PreCommit1 <--> SealPreCommit1Failed
| | ^ ^^
| | *----------++----\
| v v || |
*<- PreCommit2 --------++--> SealPreCommit2Failed
| | ||
| v /-------/|
* PreCommitting <-----+---> PreCommitFailed
| | | ^
| v | |
*<- WaitSeed -----------+-----/
| ||| ^ |
| ||| \--------*-----/
| ||| |
| vvv v----+----> ComputeProofFailed
*<- Committing |
| | ^--> CommitFailed
| v ^
| SubmitCommit |
| | |
| v |
*<- CommitWait ---/
| |
| v
| FinalizeSector <--> FinalizeFailed
| |
| v
*<- Proving
FailedUnrecoverable
实际上,执行状态改变的代码位于 github.com/filecoin-project/go-statemachine 模块的 machine.go:103,处理函数是通过反射调用的。只有当状态处理函数成功执行完成后,才会把 fsm.busy 重置为 0 并向 fsm.stageDone 发送信号;如果处理函数返回错误,goroutine 会提前 return,busy 不会被重置。
// Runs the step chosen by the planner in a separate goroutine.
// NOTE(review): excerpt from go-statemachine machine.go; the closing braces
// of the ifs and of the goroutine literal were lost in this copy.
go func() {
// Deferred-call arguments are evaluated immediately, so this logs the
// length of pendingEvents as it was when the goroutine started. The log
// fires on every exit path, including the early error return below where
// busy is NOT actually reset.
defer log.Debugw("leaving critical zone and resetting atomic var to zero", "len(pending)", len(pendingEvents))
// nextStep is the handler returned by the planner; nil means no step to run.
if nextStep != nil {
// Invoke nextStep(ctx, *ustate) via reflection; Elem() dereferences ustate
// (presumably a pointer to the user state — confirm).
res := reflect.ValueOf(nextStep).Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(ustate).Elem()})
// The handler's first return value is treated as an error; the unchecked
// .(error) assertion panics if it is non-nil but not an error.
if res[0].Interface() != nil {
log.Errorf("executing step: %+v", res[0].Interface().(error)) // TODO: propagate top level
// Early return: busy stays non-zero and stageDone is not signalled on
// this path.
return
// Success path: mark the machine idle and notify the run loop that the
// stage is done.
atomic.StoreInt32(&fsm.busy, 0)
fsm.stageDone <- struct{}{}
上述代码主要是通过反射调用 nextStep。那么这个对象是从哪里来的呢?它就是上述 planner 方法的第一个返回值。下面看看 planner 到底是怎么运行的。
# lotus/extern/storage-sealing/fsm.go:19
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
next, processed, err := m.plan(events, user.(*SectorInfo))
if err != nil || next == nil {
return nil, processed, err
return func(ctx statemachine.Context, si SectorInfo) error {
err := next(ctx, si)
if err != nil {
log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
return nil
return nil
}, processed, nil // TODO: This processed event count is not very correct
# lotus/extern/storage-sealing/fsm.go:274
// plan selects the handler to run for the sector's current state.
//
// NOTE(review): this is an excerpt. The upstream prelude that consumes the
// events and declares `processed` is omitted here, and the closing braces of
// the switch and of the function were lost in this copy.
//
// Returns the handler for state.State (nil when there is nothing to run),
// the processed-event count, and a nil error for every state shown here.
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, uint64, error) {
switch state.State {
// Happy path
// Empty is handled exactly like WaitDeals.
case Empty:
fallthrough
case WaitDeals:
return m.handleWaitDeals, processed, nil
case AddPiece:
return m.handleAddPiece, processed, nil
case Packing:
return m.handlePacking, processed, nil
case GetTicket:
return m.handleGetTicket, processed, nil
case PreCommit1:
return m.handlePreCommit1, processed, nil
case PreCommit2:
return m.handlePreCommit2, processed, nil
case PreCommitting:
return m.handlePreCommitting, processed, nil
case SubmitPreCommitBatch:
return m.handleSubmitPreCommitBatch, processed, nil
// The batched and single pre-commit wait states share one handler.
case PreCommitBatchWait:
fallthrough
case PreCommitWait:
return m.handlePreCommitWait, processed, nil
case WaitSeed:
return m.handleWaitSeed, processed, nil
case Committing:
return m.handleCommitting, processed, nil
case SubmitCommit:
return m.handleSubmitCommit, processed, nil
case SubmitCommitAggregate:
return m.handleSubmitCommitAggregate, processed, nil
// The aggregate and single commit wait states share one handler.
case CommitAggregateWait:
fallthrough
case CommitWait:
return m.handleCommitWait, processed, nil
// CommitFinalize reuses the FinalizeSector handler.
case CommitFinalize:
fallthrough
case FinalizeSector:
return m.handleFinalizeSector, processed, nil
// Handled failure modes
case AddPieceFailed:
return m.handleAddPieceFailed, processed, nil
case SealPreCommit1Failed:
return m.handleSealPrecommit1Failed, processed, nil
case SealPreCommit2Failed:
return m.handleSealPrecommit2Failed, processed, nil
case PreCommitFailed:
return m.handlePreCommitFailed, processed, nil
case ComputeProofFailed:
return m.handleComputeProofFailed, processed, nil
case CommitFailed:
return m.handleCommitFailed, processed, nil
// CommitFinalizeFailed reuses the FinalizeFailed handler.
case CommitFinalizeFailed:
fallthrough
case FinalizeFailed:
return m.handleFinalizeFailed, processed, nil
// Legacy state: rewrites the caller's SectorInfo in place to DealsExpired,
// then is handled as DealsExpired.
case PackingFailed: // DEPRECATED: remove this for the next reset
state.State = DealsExpired
fallthrough
case DealsExpired:
return m.handleDealsExpired, processed, nil
// wrapCtx presumably adapts HandleRecoverDealIDs to the handler signature
// (defined elsewhere — confirm).
case RecoverDealIDs:
return wrapCtx(m.HandleRecoverDealIDs), processed, nil
// Post-seal
case Proving:
return m.handleProvingSector, processed, nil
case Terminating:
return m.handleTerminating, processed, nil
case TerminateWait:
return m.handleTerminateWait, processed, nil
case TerminateFinality:
return m.handleTerminateFinality, processed, nil
case TerminateFailed:
return m.handleTerminateFailed, processed, nil
case Removing:
return m.handleRemoving, processed, nil
// Removed is terminal: no handler is returned.
case Removed:
return nil, processed, nil
case RemoveFailed:
return m.handleRemoveFailed, processed, nil
// Faults
case Faulty:
return m.handleFaulty, processed, nil
case FaultReported:
return m.handleFaultReported, processed, nil
// Fatal errors
// These cases only log and return no handler, so the sector does not advance.
case UndefinedSectorState:
log.Error("sector update with undefined state!")
case FailedUnrecoverable:
log.Errorf("sector %d failed unrecoverably", state.SectorNumber)
default:
log.Errorf("unexpected sector update state: %s", state.State)
// Reached by the fatal-error cases and default. It presumably sits after the
// switch's lost closing brace, since those cases have no return of their own.
return nil, processed, nil
planner 会根据处理完事件后扇区所处的状态(state.State)返回对应的处理函数,再由状态机执行该函数,完成状态改变。
内部的实现比较复杂,更进一步的逻辑就没有再去看了。状态机相关代码中定义了 `var ErrTerminated = xerrors.New("normal shutdown of state machine")`,它只会在收到 `<-fsm.closed` 或者状态机进入最终状态时返回,表示这是一次正常的状态机关闭。