Skip to content

Commit

Permalink
Merge pull request #15 from kerthcet/feat/agent
Browse files Browse the repository at this point in the history
Delete chunks when reclaimPolicy is Deleted
  • Loading branch information
InftyAI-Agent authored Oct 28, 2024
2 parents 912fe41 + 878fda9 commit 76fcad8
Show file tree
Hide file tree
Showing 34 changed files with 902 additions and 262 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,19 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified
$(KUSTOMIZE) build config/crd | $(KUBECTL) delete --ignore-not-found=$(ignore-not-found) -f -

.PHONY: deploy
deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config.
deploy: manifests kustomize deploy-agent ## Deploy controller to the K8s cluster specified in ~/.kube/config.
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | $(KUBECTL) apply --server-side --force-conflicts -f -

# This is only used in local development with kind.
.PHONY: quick-deploy
quick-deploy: manifests kustomize kind-image-build ## Deploy controller to the K8s cluster specified in ~/.kube/config.
quick-deploy: manifests kustomize kind-image-build deploy-agent ## Deploy controller to the K8s cluster specified in ~/.kube/config.
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | $(KUBECTL) apply --server-side --force-conflicts -f -
kind load docker-image ${IMG}

.PHONY: undeploy
undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion.
undeploy: undeploy-agent ## Undeploy controller from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion.
$(KUSTOMIZE) build config/default | $(KUBECTL) delete --ignore-not-found=$(ignore-not-found) -f -

##@ Build Dependencies
Expand Down Expand Up @@ -277,6 +277,9 @@ artifacts: kustomize
$(KUSTOMIZE) build config/default -o artifacts/manifests.yaml
@$(call clean-manifests)

# Merge all the YAMLs, including the agent.
./hack/merge-yaml.sh

HELMIFY ?= $(LOCALBIN)/helmify

.PHONY: helmify
Expand Down
78 changes: 78 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,81 @@ _Name Story: the inspiration of the name `Manta` is coming from Dota2, called [M
![architecture](./docs/assets/arch.png)

> Note: [llmaz](https://github.com/InftyAI/llmaz) is just one of the integrations; **Manta** can be deployed and used independently.
## Features Overview

- **Preheat Models**: Models can be preloaded to the cluster, or even to specified nodes, to accelerate model serving.
- **Model Caching**: Once a model is downloaded, it no longer has to be fetched from the origin; it can be pulled from other node peers instead.
- **Plug Framework**: _Filter_ and _Score_ extension points could be customized with plugins to pick the right peers.
- **Model LCM**: Manage the model lifecycles automatically with different configurations.
- **Memory Management(WIP)**: Specify the maximum reserved memory for use, and GC with LRU algorithm.

## Quick Start

### Installation

Read the [Installation](./docs/installation.md) guide for instructions.

### Preload Models

A toy sample to preload the `Qwen/Qwen2-7B-Instruct` model:

```yaml
apiVersion: manta.io/v1alpha1
kind: Torrent
metadata:
name: torrent-sample
spec:
replicas: 1
hub:
repoID: Qwen/Qwen2-7B-Instruct
```
If you want to preload the model to specified nodes, use the `NodeSelector`:

```yaml
apiVersion: manta.io/v1alpha1
kind: Torrent
metadata:
name: torrent-sample
spec:
replicas: 1
hub:
repoID: Qwen/Qwen2-7B-Instruct
nodeSelector:
zone: zone-a
```

If you want to remove the model weights once the `Torrent` is deleted, set `ReclaimPolicy=Delete` (the default is `Retain`):

```yaml
apiVersion: manta.io/v1alpha1
kind: Torrent
metadata:
name: torrent-sample
spec:
replicas: 1
hub:
repoID: Qwen/Qwen2-7B-Instruct
nodeSelector:
zone: zone-a
reclaimPolicy: Delete
```

For more details, refer to the [APIs](https://github.com/InftyAI/Manta/blob/main/api/v1alpha1/torrent_types.go).

## Roadmap

- GC policy with LRU algorithm
- More integrations with serving projects
- Support file chunking

## Contributions

🚀 All kinds of contributions are welcome! Please follow [CONTRIBUTING.md](./CONTRIBUTING.md).

**🎉 Thanks to all these contributors !**

<a href="/~https://github.com/inftyai/manta/graphs/contributors">
<img src="https://contrib.rocks/image?repo=inftyai/manta" />
</a>
2 changes: 1 addition & 1 deletion agent/deploy/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ spec:
mountPath: /workspace/models
containers:
- name: agent
image: inftyai/test:manta-agent-102201
image: inftyai/manta-agent:v0.0.1
ports:
- containerPort: 8080
resources:
Expand Down
49 changes: 32 additions & 17 deletions agent/pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,34 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Filter out unrelated events.
if replication.Spec.NodeName != NODE_NAME ||
replicationReady(replication) ||
// Waiting for the control plane set the Pending status.
len(replication.Status.Conditions) == 0 {
logger.V(10).Info("Skip replication", "Replication", klog.KObj(replication))
return ctrl.Result{}, nil
}

logger.Info("Reconcile replication", "Replication", klog.KObj(replication))

if conditionChanged := setReplicationCondition(replication, api.DownloadConditionType); conditionChanged {
conditionType := api.DownloadConditionType
if replication.Spec.Destination == nil {
conditionType = api.ReclaimingConditionType
}
if conditionChanged := setReplicationCondition(replication, conditionType); conditionChanged {
return ctrl.Result{}, r.Status().Update(ctx, replication)
}

// This may take a long time, the concurrency is controlled by the MaxConcurrentReconciles.
if err := agenthandler.HandleReplication(logger, replication); err != nil {
if err := agenthandler.HandleReplication(ctx, replication); err != nil {
return ctrl.Result{}, err
} else {
if err := r.updateNodeTracker(ctx, replication); err != nil {
return ctrl.Result{}, err
}
if conditionChanged := setReplicationCondition(replication, api.ReadyConditionType); conditionChanged {
if err := r.Status().Update(ctx, replication); err != nil {
return ctrl.Result{}, err
}
}
if err := r.updateNodeTracker(ctx, replication); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
Expand All @@ -100,20 +105,30 @@ func (r *ReplicationReconciler) updateNodeTracker(ctx context.Context, replicati
}

chunkName := replication.Spec.ChunkName
for _, chunk := range nodeTracker.Spec.Chunks {
if chunk.ChunkName == chunkName {
// Already included.
return nil

if replication.Spec.Destination == nil {
// Delete chunk
for i, chunk := range nodeTracker.Spec.Chunks {
if chunk.ChunkName == chunkName {
nodeTracker.Spec.Chunks = append(nodeTracker.Spec.Chunks[0:i], nodeTracker.Spec.Chunks[i+1:len(nodeTracker.Spec.Chunks)]...)
break
}
}
} else {
// Add chunk
for _, chunk := range nodeTracker.Spec.Chunks {
if chunk.ChunkName == chunkName {
// Already included.
return nil
}
}
nodeTracker.Spec.Chunks = append(nodeTracker.Spec.Chunks, api.ChunkTracker{
ChunkName: chunkName,
SizeBytes: replication.Spec.SizeBytes,
})
}
nodeTracker.Spec.Chunks = append(nodeTracker.Spec.Chunks, api.ChunkTracker{
ChunkName: chunkName,
SizeBytes: replication.Spec.SizeBytes,
})
if err := r.Client.Update(ctx, nodeTracker); err != nil {
return err
}
return nil

return r.Client.Update(ctx, nodeTracker)
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
77 changes: 61 additions & 16 deletions agent/pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,42 @@ limitations under the License.
package handler

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/go-logr/logr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/log"

api "github.com/inftyai/manta/api/v1alpha1"
)

// This only happens when replication not ready.
func HandleReplication(logger logr.Logger, replication *api.Replication) error {
func HandleReplication(ctx context.Context, replication *api.Replication) error {
// If destination is nil, the address must not be localhost.
if replication.Spec.Destination == nil {
// TODO: Delete OP
return nil
return deleteChunk(ctx, replication)
}

var localPath, revision, filename, targetPath string
if replication.Spec.Source.Hub != nil {
return downloadChunk(ctx, replication)
}
return nil
}

func downloadChunk(ctx context.Context, replication *api.Replication) error {
logger := log.FromContext(ctx)

var blobPath, revision, filename, targetPath string

// If modelHub != nil, it must be download to the localhost.
if replication.Spec.Source.ModelHub != nil {
_, localPath = parseURI(*replication.Spec.Destination.URI)
revision = *replication.Spec.Source.ModelHub.Revision
filename = *replication.Spec.Source.ModelHub.Filename
splits := strings.Split(localPath, "/blobs/")
// If hub != nil, it must be download to the localhost.
if replication.Spec.Source.Hub != nil {
_, blobPath = parseURI(*replication.Spec.Destination.URI)
revision = *replication.Spec.Source.Hub.Revision
filename = *replication.Spec.Source.Hub.Filename
splits := strings.Split(blobPath, "/blobs/")
targetPath = splits[0] + "/snapshots/" + revision + "/" + filename

// symlink exists means already downloaded.
Expand All @@ -51,9 +61,9 @@ func HandleReplication(logger logr.Logger, replication *api.Replication) error {
return nil
}

if *replication.Spec.Source.ModelHub.Name == api.HUGGINGFACE_MODEL_HUB {
if *replication.Spec.Source.Hub.Name == api.HUGGINGFACE_MODEL_HUB {
logger.Info("Start to download file from Huggingface Hub", "file", filename)
if err := downloadFromHF(replication.Spec.Source.ModelHub.ModelID, revision, filename, localPath); err != nil {
if err := downloadFromHF(replication.Spec.Source.Hub.RepoID, revision, filename, blobPath); err != nil {
return err
}
// TODO: handle modelScope
Expand All @@ -65,7 +75,7 @@ func HandleReplication(logger logr.Logger, replication *api.Replication) error {
// symlink can helps to validate the file is downloaded successfully.
// TODO: once we support split a file to several chunks, the targetPath should be
// changed here, such as targetPath-0001.
if err := createSymlink(localPath, targetPath); err != nil {
if err := createSymlink(blobPath, targetPath); err != nil {
logger.Error(err, "failed to create symlink")
return err
}
Expand All @@ -74,8 +84,19 @@ func HandleReplication(logger logr.Logger, replication *api.Replication) error {
return nil
}

// localPath looks like: /workspace/models/Qwen--Qwen2-0.5B-Instruct-GGUF/blobs/8b08b8632419bd6d7369362945b5976c7f47b1c1--0001
// symlink file looks like: /mnt/models/Qwen--Qwen2-0.5B-Instruct-GGUF/snapshots/main/qwen2-0_5b-instruct-q5_k_m.gguf
// deleteChunk removes the on-disk chunk referenced by the replication's
// source URI.
//
// The URI is expected to have the form "<scheme>://<path>"; the part after
// "://" is handed to deleteSymlinkAndTarget. A missing or malformed URI is
// returned as an error instead of panicking on a nil dereference or an
// out-of-range index.
//
// A failure while deleting the files is only logged and nil is returned, so
// the deletion itself stays best-effort.
func deleteChunk(ctx context.Context, replication *api.Replication) error {
	logger := log.FromContext(ctx)
	logger.Info("try to delete chunk", "Replication", replication.Name, "chunk", replication.Spec.ChunkName)

	if replication.Spec.Source.URI == nil {
		return fmt.Errorf("replication %s has no source URI to delete", replication.Name)
	}

	uri := *replication.Spec.Source.URI
	splits := strings.Split(uri, "://")
	if len(splits) < 2 {
		return fmt.Errorf("replication %s has malformed source URI %q, expected <scheme>://<path>", replication.Name, uri)
	}

	if err := deleteSymlinkAndTarget(splits[1]); err != nil {
		// NOTE(review): the error is swallowed on purpose here (best-effort);
		// confirm callers should still treat the chunk as reclaimed.
		logger.Error(err, "failed to delete chunk", "Replication", klog.KObj(replication), "chunk", replication.Spec.ChunkName)
	}
	return nil
}

// local(real) file looks like: /workspace/models/Qwen--Qwen2-0.5B-Instruct-GGUF/blobs/8b08b8632419bd6d7369362945b5976c7f47b1c1--0001
// target file locates at /workspace/models/Qwen--Qwen2-0.5B-Instruct-GGUF/snapshots/main/qwen2-0_5b-instruct-q5_k_m.gguf
// the symlink of target file looks like ../../blobs/8b08b8632419bd6d7369362945b5976c7f47b1c1--0001
func createSymlink(localPath, targetPath string) error {
dir := filepath.Dir(targetPath)
err := os.MkdirAll(dir, 0755)
Expand Down Expand Up @@ -103,6 +124,30 @@ func createSymlink(localPath, targetPath string) error {
return os.Symlink(sourcePath, targetPath)
}

// deleteSymlinkAndTarget removes the symlink at symlinkPath and the file it
// resolves to.
//
// If the symlink is dangling (its target is already gone), only the symlink
// is removed and nil is returned; previously the link was left behind because
// resolving it failed first. If symlinkPath itself does not exist, an error
// is returned.
//
// Returned errors wrap the underlying cause so callers can inspect it with
// errors.Is / errors.As.
func deleteSymlinkAndTarget(symlinkPath string) error {
	targetPath, err := filepath.EvalSymlinks(symlinkPath)
	if err != nil {
		if !os.IsNotExist(err) {
			return fmt.Errorf("failed to read symlink: %w", err)
		}
		// EvalSymlinks reports "not exist" both when symlinkPath is missing
		// and when it points at a missing target. Lstat does not follow the
		// link, so it tells the two cases apart.
		if _, statErr := os.Lstat(symlinkPath); statErr != nil {
			return fmt.Errorf("failed to read symlink: %w", err)
		}
		// Dangling symlink: the target is already gone, drop the link itself.
		if err := os.Remove(symlinkPath); err != nil {
			return fmt.Errorf("failed to remove symlink: %w", err)
		}
		fmt.Printf("Target of symlink %s does not exist.\n", symlinkPath)
		return nil
	}

	if err := os.Remove(symlinkPath); err != nil {
		return fmt.Errorf("failed to remove symlink: %w", err)
	}

	// The target may already be gone, e.g. when symlinkPath was a regular
	// file and therefore resolved to itself.
	if _, err := os.Stat(targetPath); err == nil {
		if err := os.Remove(targetPath); err != nil {
			return fmt.Errorf("failed to remove target file: %w", err)
		}
		fmt.Printf("Target file %s removed.\n", targetPath)
	} else if os.IsNotExist(err) {
		fmt.Printf("Target file %s does not exist.\n", targetPath)
	} else {
		return fmt.Errorf("failed to check target file: %w", err)
	}

	return nil
}

func parseURI(uri string) (host string, address string) {
splits := strings.Split(uri, "://")
return splits[0], splits[1]
Expand Down
Loading

0 comments on commit 76fcad8

Please sign in to comment.