Prometheus Metrics #71

Merged (16 commits, May 10, 2024)
jetstream/core/orchestrator.py (10 additions, 1 deletion)

@@ -93,8 +93,9 @@
from jetstream.core.proto import jetstream_pb2_grpc
from jetstream.core.utils import async_multifuture
from jetstream.engine import engine_api
import numpy as np

import numpy as np
import prometheus_client

root = logging.getLogger()
root.setLevel(logging.DEBUG)
@@ -209,6 +210,9 @@ class Driver:
# TODO: remove jax_padding after all the engines migrate to np padding
_jax_padding = True

# Record metrics for prefill_backlog size
_prefill_backlog_size_metric: prometheus_client.Gauge

def __init__(
self,
prefill_engines: Optional[list[engine_api.Engine]] = None,
Expand Down Expand Up @@ -242,6 +246,8 @@ def __init__(
# Stage 1
# At first, a request is placed here in order to get prefilled.
self._prefill_backlog = queue.Queue()
self._prefill_backlog_size_metric = prometheus_client.Gauge(
    "jetstream_prefill_backlog_size", "Size of prefill queue"
)

# Stage 2
# After prefilling, it is placed here in order to get transferred to
# one of the generate backlogs.
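For readers new to prometheus_client: a Gauge is a metric that can rise and fall, which is why it suits queue depth. Below is a minimal, self-contained sketch of the pattern this hunk introduces; the metric and queue names are illustrative, not JetStream's.

import queue

import prometheus_client

# Constructing a Gauge registers it with the client's default registry,
# so it is exported automatically once a metrics endpoint is serving.
backlog_size = prometheus_client.Gauge(
    "example_backlog_size", "Current number of queued requests"
)

q: queue.Queue = queue.Queue()
q.put("request")
backlog_size.set(q.qsize())  # exported value is now 1
q.get()
backlog_size.set(q.qsize())  # back to 0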
Expand Down Expand Up @@ -421,6 +427,7 @@ def place_request_on_prefill_queue(self, request: ActiveRequest):
"""Used to place new requests for prefilling and generation."""

Review thread:

is the function "place_request_on_prefill_queue" used by the orchestrator to add requests to the prefill queue? @JoeZijunZhou

asking bc I didn't see PetStream or other places invoking this API

Collaborator: This is adding requests from JetStream's client to the JetStream Orchestrator's prefill backlog. It doesn't relate to engines.

Reply: I see, but where is the API used? I didn't find the usage... so I'm not sure whether we should record metrics here

Collaborator: Which API? This is part of the workflow of the JetStream Decode API.

# Don't block so we can fail and shed load when the queue is full.
self._prefill_backlog.put(request, block=False)
self._prefill_backlog_size_metric.set(self._prefill_backlog.qsize())

def _load_cache_history(self, path: str) -> Union[None, Any]:
"""Loads previous kv cache for a longer conversation."""
Expand All @@ -442,6 +449,8 @@ def _prefill_thread(self, idx: int):
my_transfer_backlog = self._transfer_backlogs[idx]
# The prefill thread can just sleep until it has work to do.
request = self._prefill_backlog.get(block=True)
self._prefill_backlog_size_metric.set(self._prefill_backlog.qsize())
Review thread:

@liurupeng (May 6, 2024): qq, should we record the metrics in two places?

I would assume this is the real prefill queue size during runtime @FanhaiLu1 right?

@JoeZijunZhou (Collaborator, May 6, 2024): L430 logs the prefill backlog queue size after a request is added to the queue; L452 logs it after a request is removed from the queue (to start the prefill operation for that request).

Reply: then I would assume we only need to add it in one place right?

Collaborator: IIUC, @Bslabe123 is trying to collect the prefill queue size metric as the first step?


if request is None:
break
# Tokenize, and introduce a leading dimension
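On the thread's question about recording the metric in two places: an explicit set() only moves the gauge when called, so both the producer path (put) and the consumer path (get) must refresh it, or the exported value goes stale on one side. prometheus_client also offers Gauge.set_function, which samples a callable at scrape time and would remove both call sites; the sketch below shows that alternative under illustrative names, and is not what this PR does.

import queue

import prometheus_client

prefill_backlog: queue.Queue = queue.Queue()

backlog_gauge = prometheus_client.Gauge(
    "example_prefill_backlog_size", "Size of prefill queue"
)
# qsize() is evaluated lazily on every scrape, so neither the producer
# nor the consumer has to remember to call set().
backlog_gauge.set_function(prefill_backlog.qsize)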
jetstream/core/server_lib.py (8 additions, 1 deletion)

@@ -29,9 +29,10 @@
from jetstream.core import orchestrator
from jetstream.core.proto import jetstream_pb2_grpc

from prometheus_client import start_http_server

_HOST = "[::]"

PROMETHEUS_PORT = 9090

class JetStreamServer:
"""JetStream grpc server."""
@@ -130,6 +131,12 @@ def run(
logging.info("Starting server on port %d with %d threads", port, threads)

jetstream_server.start()

logging.info("Starting Prometheus server on port %d", port)

# Setup Prometheus server
start_http_server(PROMETHEUS_PORT)

return jetstream_server


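start_http_server exposes every metric in the default registry over HTTP at /metrics, served from a daemon thread. Below is a standalone sketch for sanity-checking such an endpoint locally; the port simply mirrors the PROMETHEUS_PORT constant above.

import time

import prometheus_client

prometheus_client.start_http_server(9090)  # serves /metrics in the background

demo = prometheus_client.Gauge("example_up", "Demo gauge")
demo.set(1)
time.sleep(60)  # keep the process alive long enough to scrape it

While it runs, curl localhost:9090/metrics should list example_up alongside the client's default process and platform metrics.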
requirements.in (2 additions, 1 deletion)

@@ -6,7 +6,8 @@ jax
jaxlib
numpy
portpicker
prometheus-client
pytest
seqio
tiktoken
blobfile
requirements.txt (2 additions)

@@ -177,6 +177,8 @@ pluggy==1.4.0
# via pytest
portpicker==1.6.0
# via -r requirements.in
prometheus-client==0.20.0
# via -r requirements.in
promise==2.3
# via tfds-nightly
protobuf==3.20.3