Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync logic to manta agent #16

Merged
merged 4 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Dockerfile.agent
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532

# Expose the http server.
EXPOSE 9090

ENTRYPOINT ["/manager"]
34 changes: 17 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</p>

<h3 align="center">
A lightweight P2P-based cache system for model distributions.
A lightweight P2P-based cache system for model distributions on Kubernetes.
</h3>

[![stability-alpha](https://img.shields.io/badge/stability-alpha-f4d03f.svg)](/~https://github.com/mkenney/software-guides/blob/master/STABILITY-BADGES.md#alpha)
Expand All @@ -16,7 +16,7 @@ A lightweight P2P-based cache system for model distributions.
[GoReport Widget]: https://goreportcard.com/badge/github.com/inftyai/manta
[GoReport Status]: https://goreportcard.com/report/github.com/inftyai/manta

_Name Story: the inspiration of the name `Manta` is coming from Dota2, called [Manta Style](https://liquipedia.net/dota2/Manta_Style), which will create 2 images of your hero just like peers in the P2P network._
_Name Story: the inspiration of the name `Manta` is coming from Dota2, called [Manta Style](https://dota2.fandom.com/wiki/Manta_Style), which will create 2 images of your hero just like peers in the P2P network._


## Architecture
Expand All @@ -29,7 +29,7 @@ _Name Story: the inspiration of the name `Manta` is coming from Dota2, called [M

- **Preheat Models**: Models could be preloaded to the cluster, or even specified nodes to accelerate the model serving.
- **Model Caching**: Once models are downloaded, origin access is no longer necessary, but from other node peers.
- **Plug Framework**: _Filter_ and _Score_ extension points could be customized with plugins to pick the right peers.
- **Plugin Framework**: _Filter_ and _Score_ extension points could be extended with your own logic to pick up the best candidates in the form of plugin.
- **Model LCM**: Manage the model lifecycles automatically with different configurations.
- **Memory Management(WIP)**: Specify the maximum reserved memory for use, and GC with LRU algorithm.

Expand All @@ -39,9 +39,9 @@ _Name Story: the inspiration of the name `Manta` is coming from Dota2, called [M

Read the [Installation](./docs//installation.md) for guidance.

### Preload Models
### Preheat Models

A toy sample to preload the `Qwen/Qwen2-7B-Instruct` model:
A toy sample to preload the `Qwen/Qwen2.5-0.5B-Instruct` model:

```yaml
apiVersion: manta.io/v1alpha1
Expand All @@ -51,7 +51,7 @@ metadata:
spec:
replicas: 1
hub:
repoID: Qwen/Qwen2-7B-Instruct
repoID: Qwen/Qwen2.5-0.5B-Instruct
```

If you want to preload the model to specified nodes, use the `NodeSelector`:
Expand All @@ -64,11 +64,13 @@ metadata:
spec:
replicas: 1
hub:
repoID: Qwen/Qwen2-7B-Instruct
repoID: Qwen/Qwen2.5-0.5B-Instruct
nodeSelector:
zone: zone-a
```

### Delete Models

If you want to remove the model weights once `Torrent` is deleted, set the `ReclaimPolicy=Delete`, default to `Retain`:

```yaml
Expand All @@ -79,26 +81,24 @@ metadata:
spec:
replicas: 1
hub:
repoID: Qwen/Qwen2-7B-Instruct
nodeSelector:
zone: zone-a
repoID: Qwen/Qwen2.5-0.5B-Instruct
reclaimPolicy: Delete
```

More details refer to the [APIs](/~https://github.com/InftyAI/Manta/blob/main/api/v1alpha1/torrent_types.go).

## Roadmap

- GC policy with LRU algorithm
- Support GC policy with LRU algorithm
- More integrations with serving projects
- Support file chunking

## Contributions
## Community

Join us for more discussions:

🚀 All kinds of contributions are welcomed ! Please follow [CONTRIBUTING.md](./CONTRIBUTING.md).
* **Slack Channel**: [#manta](https://inftyai.slack.com/archives/C07SY8WS45U)

**🎉 Thanks to all these contributors !**
## Contributions

<a href="/~https://github.com/inftyai/manta/graphs/contributors">
<img src="https://contrib.rocks/image?repo=inftyai/manta" />
</a>
All kinds of contributions are welcomed ! Please following [CONTRIBUTING.md](./CONTRIBUTING.md).
13 changes: 13 additions & 0 deletions agent/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package main
import (
"context"
"os"
"os/signal"
"syscall"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -30,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/inftyai/manta/agent/pkg/controller"
"github.com/inftyai/manta/agent/pkg/server"
"github.com/inftyai/manta/agent/pkg/task"
api "github.com/inftyai/manta/api/v1alpha1"
)
Expand Down Expand Up @@ -81,9 +84,19 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
cancel()
}()

// Background tasks.
task.BackgroundTasks(ctx, mgr.GetClient())

// Run http server to receive sync requests.
go server.Run(ctx)

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
Expand Down
6 changes: 4 additions & 2 deletions agent/deploy/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ spec:
labels:
app: manta-agent
spec:
hostNetwork: true
serviceAccountName: manta-agent
initContainers:
- name: init-permissions
Expand All @@ -24,9 +25,10 @@ spec:
mountPath: /workspace/models
containers:
- name: agent
image: inftyai/manta-agent:v0.0.1
# image: inftyai/manta-agent:v0.0.1
image: inftyai/test:manta-agent-110811
ports:
- containerPort: 8080
- containerPort: 9090
resources:
limits:
memory: 200Mi
Expand Down
1 change: 1 addition & 0 deletions agent/deploy/serviceaccount.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ apiVersion: v1
kind: ServiceAccount
metadata:
name: manta-agent
namespace: manta-system
5 changes: 3 additions & 2 deletions agent/pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"

agenthandler "github.com/inftyai/manta/agent/pkg/handler"
"github.com/inftyai/manta/agent/pkg/handler"
api "github.com/inftyai/manta/api/v1alpha1"
)

Expand Down Expand Up @@ -82,7 +82,8 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// This may take a long time, the concurrency is controlled by the MaxConcurrentReconciles.
if err := agenthandler.HandleReplication(ctx, replication); err != nil {
if err := handler.HandleReplication(ctx, replication); err != nil {
logger.Error(err, "error to handle replication", "Replication", klog.KObj(replication))
return ctrl.Result{}, err
} else {
if err := r.updateNodeTracker(ctx, replication); err != nil {
Expand Down
104 changes: 104 additions & 0 deletions agent/pkg/handler/chunk_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"fmt"
"io"
"net/http"
"os"

"github.com/inftyai/manta/api"
)

const (
buffSize = 4 * 1024 * 1024 // 4MB buffer
)

// SendChunk will send the chunk content via http request.
func SendChunk(w http.ResponseWriter, r *http.Request) {
path := r.URL.Query().Get("path")

if path == "" {
http.Error(w, "path is required", http.StatusBadRequest)
return
}

file, err := os.Open(path)
if err != nil {
http.Error(w, "File not found", http.StatusNotFound)
return
}
defer func() {
_ = file.Close()
}()

buffer := make([]byte, buffSize)
for {
n, err := file.Read(buffer)
if err != nil {
if err == io.EOF {
break
} else {
fmt.Println("Error reading file")
http.Error(w, "Error reading file", http.StatusInternalServerError)
return
}
}

if n > 0 {
_, writeErr := w.Write(buffer[:n])
if writeErr != nil {
fmt.Println("Error writing to response:", writeErr)
http.Error(w, "Error writing to response", http.StatusInternalServerError)
return
}
}
}
}

func recvChunk(blobPath, snapshotPath, peerName string) error {
url := fmt.Sprintf("http://%s:%s/sync?path=%s", peerName, api.HttpPort, blobPath)

resp, err := http.Get(url)
if err != nil {
return err
}
defer func() {
_ = resp.Body.Close()
}()

// Use the same path for different peers.
file, err := os.Create(blobPath)
if err != nil {
return err
}
defer func() {
_ = file.Close()
}()

_, err = io.Copy(file, resp.Body)
if err != nil {
return err
}

if err := createSymlink(blobPath, snapshotPath); err != nil {
return err
}

return nil
}
69 changes: 69 additions & 0 deletions agent/pkg/handler/hf_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"fmt"
"os"

"github.com/inftyai/manta/agent/pkg/util"
)

const (
maxAttempts = 10
)

// The downloadPath is the full path, like: /workspace/models/Qwen--Qwen2-7B-Instruct/blobs/20024bfe7c83998e9aeaf98a0cd6a2ce6306c2f0--0001
func downloadFromHF(modelID, revision, path string, downloadPath string) error {
// Example: "https://huggingface.co/Qwen/Qwen2.5-72B-Instruct/resolve/main/model-00031-of-00037.safetensors"
url := fmt.Sprintf("%s/%s/resolve/%s/%s", hfEndpoint(), modelID, revision, path)
token := hfToken()

attempts := 0
for {

attempts += 1

if err := util.DownloadFileWithResume(url, downloadPath, token); err != nil {
if attempts > maxAttempts {
return fmt.Errorf("reach maximum download attempts for %s, err: %v", downloadPath, err)
}
continue
}
break
}

return nil
}

func hfEndpoint() string {
hfEndpoint := "https://huggingface.co"
if endpoint := os.Getenv("HF_ENDPOINT"); endpoint != "" {
hfEndpoint = endpoint
}
return hfEndpoint
}

func hfToken() string {
if token := os.Getenv("HF_TOKEN"); token != "" {
return token
}
if token := os.Getenv("HUGGING_FACE_HUB_TOKEN"); token != "" {
return token
}
return ""
}
File renamed without changes.
Loading
Loading