Skip to content

Commit

Permalink
Merge pull request #14 from kerthcet/feat/agent
Browse files Browse the repository at this point in the history
Add e2e tests
  • Loading branch information
InftyAI-Agent authored Oct 24, 2024
2 parents d5655be + a91b8ea commit 912fe41
Show file tree
Hide file tree
Showing 20 changed files with 609 additions and 67 deletions.
12 changes: 12 additions & 0 deletions agent/deploy/clusterrole-binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: manta-agent-binding
subjects:
- kind: ServiceAccount
name: manta-agent
namespace: manta-system
roleRef:
kind: ClusterRole
name: manta-agent-role
apiGroup: rbac.authorization.k8s.io
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: manta-agent
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
Expand Down Expand Up @@ -44,16 +39,3 @@ rules:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: manta-agent-binding
subjects:
- kind: ServiceAccount
name: manta-agent
namespace: manta-system
roleRef:
kind: ClusterRole
name: manta-agent-role
apiGroup: rbac.authorization.k8s.io
13 changes: 7 additions & 6 deletions agent/deploy/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apiVersion: apps/v1
kind: DaemonSet
metadata:
name: manta-agent
namespace: manta-system
labels:
app: manta-agent
spec:
Expand All @@ -16,14 +17,14 @@ spec:
serviceAccountName: manta-agent
initContainers:
- name: init-permissions
image: busybox
image: busybox:1.28
command: ['sh', '-c', 'chown -R 1000:3000 /workspace/models && chmod -R 777 /workspace/models']
volumeMounts:
- name: model-volume
mountPath: /workspace/models
containers:
- name: agent
image: inftyai/manta-agent:1022-01
image: inftyai/test:manta-agent-102201
ports:
- containerPort: 8080
resources:
Expand All @@ -37,11 +38,11 @@ spec:
fieldRef:
fieldPath: spec.nodeName
# Set the nodeTracker.spec.sizeLimit.
- name: SIZE_LIMIT
value: "990Mi"
# - name: SIZE_LIMIT
# value: "990Mi"
# If you have GFW problem in china.
- name: HF_ENDPOINT
value: https://hf-mirror.com
# - name: HF_ENDPOINT
# value: https://hf-mirror.com
volumeMounts:
- name: model-volume
mountPath: /workspace/models
Expand Down
4 changes: 4 additions & 0 deletions agent/deploy/serviceaccount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: manta-agent
5 changes: 3 additions & 2 deletions api/v1alpha1/torrent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
)

const (
TorrentNameLabelKey = "manta.io/torrent-name"
DefaultWorkspace = "/workspace/models/"
TorrentNameLabelKey = "manta.io/torrent-name"
TorrentProtectionFinalizer = "manta.io/torrent-protect"

HUGGINGFACE_MODEL_HUB = "Huggingface"
)

Expand Down
4 changes: 2 additions & 2 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: inftyai/manta
newTag: 1022-04
newName: inftyai/llmaz
newTag: main
1 change: 1 addition & 0 deletions hack/kind-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
27 changes: 25 additions & 2 deletions pkg/controller/torrent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -73,6 +74,21 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

logger.Info("reconcile Torrent", "Torrent", klog.KObj(torrent))

if !torrent.DeletionTimestamp.IsZero() {
if *torrent.Spec.ReclaimPolicy == api.DeleteReclaimPolicy {
if err := r.dispatcher.CleanupReplications(ctx, torrent); err != nil {
return ctrl.Result{}, err
}
}
// Add a new condition once matched, remove the finalizer
if controllerutil.RemoveFinalizer(torrent, api.TorrentProtectionFinalizer) {
if err := r.Client.Update(ctx, torrent); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}

if torrentReady(torrent) {
logger.Info("start to delete replications since torrent is ready", "Torrent", klog.KObj(torrent))

Expand All @@ -92,6 +108,13 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

if controllerutil.AddFinalizer(torrent, api.TorrentProtectionFinalizer) {
if err := r.Client.Update(ctx, torrent); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

// Handle Pending status.
if torrent.Status.Repo == nil {
_ = setTorrentCondition(torrent, nil)
Expand Down Expand Up @@ -156,11 +179,11 @@ func (r *TorrentReconciler) Create(e event.CreateEvent) bool {
return true
}

func (r *TorrentReconciler) Delete(e event.DeleteEvent) bool {
func (r *TorrentReconciler) Update(e event.UpdateEvent) bool {
return true
}

func (r *TorrentReconciler) Update(e event.UpdateEvent) bool {
func (r *TorrentReconciler) Delete(e event.DeleteEvent) bool {
return true
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

const (
localHost = "localhost://"
localHost = "localhost://"
defaultWorkspace = "/workspace/models/"
)

// DefaultDownloader helps to download the chunks.
Expand Down Expand Up @@ -137,6 +138,13 @@ func (d *Dispatcher) PrepareReplications(ctx context.Context, torrent *api.Torre
return replications, torrentStatusChanged, nil
}

func (d *Dispatcher) CleanupReplications(ctx context.Context, torrent *api.Torrent) (err error) {
if torrent.Status.Repo == nil {
return nil
}
return nil
}

func (d *Dispatcher) syncChunk() (replications []*api.Replication, err error) {
return nil, fmt.Errorf("not implemented")
}
Expand Down Expand Up @@ -247,7 +255,7 @@ func buildReplication(torrent *api.Torrent, chunk framework.ChunkInfo, index int
},
},
Destination: &api.Target{
URI: ptr.To[string](localHost + api.DefaultWorkspace + repoName + "/blobs/" + chunk.Name),
URI: ptr.To[string](localHost + defaultWorkspace + repoName + "/blobs/" + chunk.Name),
},
SizeBytes: chunk.Size,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/dispatcher/plugins/diskaware/diskaware.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (ds *DiskAware) Score(ctx context.Context, chunk framework.ChunkInfo, nodeT
}

sizeLimit := sizeLimit(nodeTracker)
return (1 - float32(totalSize)/float32(sizeLimit)) * 100
return (1 - float32(totalSize+chunk.Size)/float32(sizeLimit)) * 100
}

func sizeLimit(nt api.NodeTracker) int64 {
Expand Down
169 changes: 169 additions & 0 deletions pkg/dispatcher/plugins/diskaware/diskaware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package diskaware

import (
"context"
"math"
"testing"

"github.com/google/go-cmp/cmp"
api "github.com/inftyai/manta/api/v1alpha1"
"github.com/inftyai/manta/pkg/dispatcher/cache"
"github.com/inftyai/manta/pkg/dispatcher/framework"
"github.com/inftyai/manta/test/util/wrapper"
)

func TestFilter(t *testing.T) {
testCases := []struct {
name string
chunk framework.ChunkInfo
nodeTracker api.NodeTracker
cache func() *cache.Cache
wantStatus framework.Status
}{
{
name: "small chunk size with empty cache",
chunk: framework.ChunkInfo{
Name: "chunk1",
Size: 512,
},
nodeTracker: *wrapper.MakeNodeTracker("node1").SizeLimit("10Mi").Obj(),
cache: func() *cache.Cache { return cache.NewCache().Snapshot() },
wantStatus: framework.Status{Code: framework.SuccessStatus},
},
{
name: "small chunk size with not empty cache",
chunk: framework.ChunkInfo{
Name: "chunk1",
Size: 512,
},
nodeTracker: *wrapper.MakeNodeTracker("node1").SizeLimit("10Mi").Obj(),
cache: func() *cache.Cache {
c := cache.NewCache()
c.AddChunks([]api.ChunkTracker{
{
ChunkName: "chunk1",
SizeBytes: 1 * 1024 * 1024, // 1Mi
},
}, "node1")

return c.Snapshot()
},
wantStatus: framework.Status{Code: framework.SuccessStatus},
},
{
name: "big chunk size with cache",
chunk: framework.ChunkInfo{
Name: "chunk1",
Size: 9 * 1024 * 1024,
},
nodeTracker: *wrapper.MakeNodeTracker("node1").SizeLimit("10Mi").Obj(),
cache: func() *cache.Cache {
c := cache.NewCache()
c.AddChunks([]api.ChunkTracker{
{
ChunkName: "chunk1",
SizeBytes: 1*1024*1024 + 1, // 1Mi + 1 Bytes
},
}, "node1")

return c.Snapshot()
},
wantStatus: framework.Status{Code: framework.UnschedulableStatus},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

plugin, err := New()
if err != nil {
t.Errorf("failed to construct plugin: %v", err)
}

ns := plugin.(*DiskAware)

gotStatus := ns.Filter(ctx, tc.chunk, tc.nodeTracker, tc.cache())
if diff := cmp.Diff(gotStatus, tc.wantStatus); diff != "" {
t.Errorf("unexpected status, diff: %v", diff)
}
})
}
}

func TestScore(t *testing.T) {
testCases := []struct {
name string
chunk framework.ChunkInfo
nodeTracker api.NodeTracker
cache func() *cache.Cache
wantScore float32
}{
{
name: "empty cache",
chunk: framework.ChunkInfo{
Name: "chunk1",
Size: 512,
},
nodeTracker: *wrapper.MakeNodeTracker("node1").SizeLimit("2Mi").Obj(),
cache: func() *cache.Cache { return cache.NewCache().Snapshot() },
wantScore: 99.98,
},
{
name: "non empty cache",
chunk: framework.ChunkInfo{
Name: "chunk1",
Size: 512,
},
nodeTracker: *wrapper.MakeNodeTracker("node1").SizeLimit("2Mi").Obj(),
cache: func() *cache.Cache {
c := cache.NewCache()
c.AddChunks([]api.ChunkTracker{
{
ChunkName: "chunk1",
SizeBytes: 1 * 1024 * 1024, // 1Mi
},
}, "node1")

return c.Snapshot()
},
wantScore: 49.98,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

plugin, err := New()
if err != nil {
t.Errorf("failed to construct plugin: %v", err)
}

ns := plugin.(*DiskAware)

gotScore := ns.Score(ctx, tc.chunk, tc.nodeTracker, tc.cache())
if math.Abs(float64(gotScore-tc.wantScore)) > 0.01 {
t.Errorf("unexpected score, want %v, got %v", tc.wantScore, gotScore)
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/dispatcher/plugins/nodeselector/nodeselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (ns *NodeSelector) Name() string {
return "NodeSelector"
}

func (ns *NodeSelector) Filter(ctx context.Context, chunk framework.ChunkInfo, nodeTracker api.NodeTracker, cache *cache.Cache) framework.Status {
func (ns *NodeSelector) Filter(ctx context.Context, chunk framework.ChunkInfo, nodeTracker api.NodeTracker, _ *cache.Cache) framework.Status {
// In a big cluster, this is really headache maybe we should have a preFilter extension point.
for k, v := range chunk.NodeSelector {
value, ok := nodeTracker.Labels[k]
Expand Down
Loading

0 comments on commit 912fe41

Please sign in to comment.