Skip to content

Commit

Permalink
feat: add fsm for persistent cache peer (#3563)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Oct 9, 2024
1 parent 8b6e40f commit 1198c98
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 3 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.1.58
image-tag: v2.1.60
chart-name: manager
- module: scheduler
image: scheduler
image-tag: v2.1.58
image-tag: v2.1.60
chart-name: scheduler
- module: client
image: client
Expand Down
134 changes: 134 additions & 0 deletions scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package persistentcache

import (
"context"
"time"

"github.com/bits-and-blooms/bitset"
"github.com/looplab/fsm"

logger "d7y.io/dragonfly/v2/internal/dflog"
)

const (
// Peer has been created but did not start running.
PeerStatePending = "Pending"

// Peer successfully registered and perpared to download.
PeerStateReceived = "Received"

// Peer is downloading resources from peer.
PeerStateRunning = "Running"

// Peer has been downloaded successfully.
PeerStateSucceeded = "Succeeded"

// Peer has been downloaded failed.
PeerStateFailed = "Failed"
)

const (
// Peer is registered and perpared to download.
PeerEventRegister = "Register"

// Peer is downloading.
PeerEventDownload = "Download"

// Peer downloaded successfully.
PeerEventDownloadSucceeded = "DownloadSucceeded"

// Peer downloaded failed.
PeerEventDownloadFailed = "DownloadFailed"
)

// Peer contains content for persistent cache peer.
type Peer struct {
// ID is persistent cache peer id.
ID string

// Pieces is finished pieces bitset.
FinishedPieces *bitset.BitSet

// Persistent cache peer state machine.
FSM *fsm.FSM

// Task is persistent cache task.
Task *Task

// Host is the peer host.
Host *Host

// BlockParents is bad parents ids.
BlockParents []string

// Cost is the cost of downloading.
Cost time.Duration

// CreatedAt is persistent cache peer create time.
CreatedAt time.Time

// UpdatedAt is persistent cache peer update time.
UpdatedAt time.Time

// Persistent cache peer log.
Log *logger.SugaredLoggerOnWith
}

// New persistent cache peer instance.
func NewPeer(id, state string, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
p := &Peer{
ID: id,
FinishedPieces: finishedPieces,
Task: task,
Host: host,
BlockParents: blockParents,
Cost: cost,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithPeer(host.ID, task.ID, id),
}

// Initialize state machine.
p.FSM = fsm.NewFSM(
PeerStatePending,
fsm.Events{
{Name: PeerEventRegister, Src: []string{PeerStatePending}, Dst: PeerStateReceived},
{Name: PeerEventDownload, Src: []string{PeerStateReceived}, Dst: PeerStateRunning},
{Name: PeerEventDownloadSucceeded, Src: []string{PeerStateRunning}, Dst: PeerStateSucceeded},
{Name: PeerEventDownloadFailed, Src: []string{PeerStateRunning}, Dst: PeerStateFailed},
},
fsm.Callbacks{
PeerEventRegister: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownload: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownloadSucceeded: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownloadFailed: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
},
)

return p
}
2 changes: 1 addition & 1 deletion scheduler/resource/standard/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ type Peer struct {
// Used only in v2 version of the grpc.
AnnouncePeerStream *atomic.Value

// Task state machine.
// Peer state machine.
FSM *fsm.FSM

// Task is peer task.
Expand Down

0 comments on commit 1198c98

Please sign in to comment.