From 8cb2c753dfb48bb970af562ec8b76ab909850621 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 3 May 2024 16:15:13 +0000 Subject: [PATCH 01/14] initial commit --- jetstream/core/orchestrator.py | 11 ++++++++++- jetstream/core/server_lib.py | 9 ++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index 9d290af6..00d62237 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -93,8 +93,9 @@ from jetstream.core.proto import jetstream_pb2_grpc from jetstream.core.utils import async_multifuture from jetstream.engine import engine_api -import numpy as np +import numpy as np +import prometheus_client root = logging.getLogger() root.setLevel(logging.DEBUG) @@ -203,6 +204,9 @@ class Driver: # todo: remove jax_padding after all then engine migrate to np padding _jax_padding = True + # Record metrics for prefill_backlog size + _prefill_backlog_size_metric: prometheus_client.Gauge + def __init__( self, prefill_engines: Optional[list[engine_api.Engine]] = None, @@ -233,6 +237,8 @@ def __init__( # Stage 1 # At first, a request is placed here in order to get prefilled. self._prefill_backlog = queue.Queue() + self._prefill_backlog_size_metric = prometheus_client.Gauge(self._prefill_backlog.qsize()) + # _ready_to_prefill event will block the prefill thread until there is # available decode slot to insert the prefill result. self._ready_to_prefill = threading.Event() @@ -382,6 +388,7 @@ def place_request_on_prefill_queue(self, request: ActiveRequest): """Used to place new requests for prefilling and generation.""" # Don't block so we can fail and shed load when the queue is full. self._prefill_backlog.put(request, block=False) + self._prefill_backlog_size_metric.set(self._prefill_backlog.qsize()) def _load_cache_history(self, path: str) -> Union[None, Any]: """Loads previous kv cache for a longer conversation.""" @@ -414,6 +421,8 @@ def _prefill_thread(self, idx: int): ) # The prefill thread can just sleep until it has work to do. request = self._prefill_backlog.get(block=True) + self._prefill_backlog_size_metric.set(self._prefill_backlog.qsize()) + if request is None: break # TODO: Implement hot/cold cache for history. diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index 8911b9f6..08aa02ed 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -29,9 +29,10 @@ from jetstream.core import orchestrator from jetstream.core.proto import jetstream_pb2_grpc +from prometheus_client import start_http_server _HOST = "[::]" - +PROMETHEUS_PORT = 9090 class JetStreamServer: """JetStream grpc server.""" @@ -126,6 +127,12 @@ def run( logging.info("Starting server on port %d with %d threads", port, threads) jetstream_server.start() + + logging.info("Starting Prometheus server on port %d", port) + + # Setup Prometheus server + start_http_server(PROMETHEUS_PORT) + return jetstream_server From 54bbf2cbc51eddabae3bec3a50dae827c8bc4f36 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 3 May 2024 21:05:54 +0000 Subject: [PATCH 02/14] updated requirements.txt --- requirements.in | 3 ++- requirements.txt | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/requirements.in b/requirements.in index bc5ba8fc..6f4ebb60 100644 --- a/requirements.in +++ b/requirements.in @@ -6,7 +6,8 @@ jax jaxlib numpy portpicker +prometheus-client pytest seqio tiktoken -blobfile \ No newline at end of file +blobfile diff --git a/requirements.txt b/requirements.txt index eb88cba9..10f73252 100644 --- a/requirements.txt +++ b/requirements.txt @@ -177,6 +177,8 @@ pluggy==1.4.0 # via pytest portpicker==1.6.0 # via -r requirements.in +prometheus-client==0.20.0 + # via -r requirements.in promise==2.3 # via tfds-nightly protobuf==3.20.3 From 2db32dffec61ce791be0626e2bf5ae9a4db74480 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 3 May 2024 21:39:28 +0000 Subject: [PATCH 03/14] correct parameters for Gauge --- jetstream/core/orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index cdfc068a..f0e21b36 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -246,7 +246,7 @@ def __init__( # Stage 1 # At first, a request is placed here in order to get prefilled. self._prefill_backlog = queue.Queue() - self._prefill_backlog_size_metric = prometheus_client.Gauge(self._prefill_backlog.qsize()) + self._prefill_backlog_size_metric = prometheus_client.Gauge("jetstream_prefil_backlog_size", "Size of prefill queue") # Stage 2 # After prefilling, it is placed here in order to get transferred to From 9f0bddf09d0e2076872baa384dee986cd204d909 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 3 May 2024 15:29:53 -0700 Subject: [PATCH 04/14] Update orchestrator.py --- jetstream/core/orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index f0e21b36..efb93993 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -246,7 +246,7 @@ def __init__( # Stage 1 # At first, a request is placed here in order to get prefilled. self._prefill_backlog = queue.Queue() - self._prefill_backlog_size_metric = prometheus_client.Gauge("jetstream_prefil_backlog_size", "Size of prefill queue") + self._prefill_backlog_size_metric = prometheus_client.Gauge("jetstream_prefill_backlog_size", "Size of prefill queue") # Stage 2 # After prefilling, it is placed here in order to get transferred to From 28afc73f8b75a1232ed02a04dd9ff0a263b1a6fe Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Mon, 6 May 2024 21:22:23 -0700 Subject: [PATCH 05/14] Update orchestrator.py --- jetstream/core/orchestrator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index efb93993..3ea989f0 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -246,7 +246,8 @@ def __init__( # Stage 1 # At first, a request is placed here in order to get prefilled. self._prefill_backlog = queue.Queue() - self._prefill_backlog_size_metric = prometheus_client.Gauge("jetstream_prefill_backlog_size", "Size of prefill queue") + self._prefill_backlog_size_metric = prometheus_client.Gauge( + "jetstream_prefill_backlog_size", "Size of prefill queue") # Stage 2 # After prefilling, it is placed here in order to get transferred to From b3a0b79f0454a25ddeb316b92c76fe9f324e4a1f Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Mon, 6 May 2024 21:22:37 -0700 Subject: [PATCH 06/14] Update server_lib.py --- jetstream/core/server_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index 3cc242ae..8664cf63 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -136,7 +136,7 @@ def run( # Setup Prometheus server start_http_server(PROMETHEUS_PORT) - + return jetstream_server From 12b706d894f7fd3cd9742619ed5788dcd7a7444c Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Tue, 7 May 2024 15:45:25 -0700 Subject: [PATCH 07/14] Fix prometheus client port collision --- jetstream/core/server_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index 8664cf63..cd32e081 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -32,7 +32,7 @@ from prometheus_client import start_http_server _HOST = "[::]" -PROMETHEUS_PORT = 9090 +PROMETHEUS_PORT = 9100 class JetStreamServer: """JetStream grpc server.""" From 16eb68925a5da1a8dc8d6ee084fde856aaaec2ea Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 9 May 2024 16:23:53 +0000 Subject: [PATCH 08/14] prometheus port 9100 -> 9090 --- .gitignore | 3 ++- jetstream/core/server_lib.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 6de8849a..a13d13c3 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ google_jetstream.egg-info/ data/ logs/ tmp/ -venv/ \ No newline at end of file +venv/ +.vscode/ diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index cd32e081..8664cf63 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -32,7 +32,7 @@ from prometheus_client import start_http_server _HOST = "[::]" -PROMETHEUS_PORT = 9100 +PROMETHEUS_PORT = 9090 class JetStreamServer: """JetStream grpc server.""" From a3687b7867948a3f304998b528b6e2987fd293d4 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 9 May 2024 21:01:27 +0000 Subject: [PATCH 09/14] Pyink formatting --- jetstream/core/orchestrator.py | 3 ++- jetstream/core/server_lib.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index 3ea989f0..34b45d8e 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -247,7 +247,8 @@ def __init__( # At first, a request is placed here in order to get prefilled. self._prefill_backlog = queue.Queue() self._prefill_backlog_size_metric = prometheus_client.Gauge( - "jetstream_prefill_backlog_size", "Size of prefill queue") + "jetstream_prefill_backlog_size", "Size of prefill queue" + ) # Stage 2 # After prefilling, it is placed here in order to get transferred to diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index 8664cf63..8935f36f 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -34,6 +34,7 @@ _HOST = "[::]" PROMETHEUS_PORT = 9090 + class JetStreamServer: """JetStream grpc server.""" From 1b1a534d97fa5d5c2a295a67643e3118d1a5fea9 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 9 May 2024 22:03:35 +0000 Subject: [PATCH 10/14] Addressed above comment --- jetstream/core/server_lib.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index 8935f36f..8a87afa4 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -20,6 +20,7 @@ import asyncio from concurrent import futures import logging +import os import threading from typing import Any, Type @@ -32,7 +33,7 @@ from prometheus_client import start_http_server _HOST = "[::]" -PROMETHEUS_PORT = 9090 +PROMETHEUS_ENABLED_ON_PORT = 9090 class JetStreamServer: @@ -136,7 +137,8 @@ def run( logging.info("Starting Prometheus server on port %d", port) # Setup Prometheus server - start_http_server(PROMETHEUS_PORT) + if "PROMETHEUS_ENABLED_ON_PORT" in os.environ: + start_http_server(PROMETHEUS_ENABLED_ON_PORT) return jetstream_server From 9c2bb15861c5140f0f13a94bced8b7e23375c4f7 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 9 May 2024 23:17:08 +0000 Subject: [PATCH 11/14] logging in wrong place --- jetstream/core/server_lib.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index 8a87afa4..f9d366d9 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -134,10 +134,11 @@ def run( jetstream_server.start() - logging.info("Starting Prometheus server on port %d", port) - # Setup Prometheus server if "PROMETHEUS_ENABLED_ON_PORT" in os.environ: + logging.info( + "Starting Prometheus server on port %d", PROMETHEUS_ENABLED_ON_PORT + ) start_http_server(PROMETHEUS_ENABLED_ON_PORT) return jetstream_server From 54552d78de0fc856a27f138880f4555d59323786 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 10 May 2024 15:02:08 +0000 Subject: [PATCH 12/14] Conditional fix --- jetstream/core/server_lib.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index f9d366d9..9ec53ae0 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -33,7 +33,7 @@ from prometheus_client import start_http_server _HOST = "[::]" -PROMETHEUS_ENABLED_ON_PORT = 9090 +PROMETHEUS_ENABLED_ON_PORT = os.getenv("PROMETHEUS_ENABLED_ON_PORT") class JetStreamServer: @@ -135,12 +135,15 @@ def run( jetstream_server.start() # Setup Prometheus server - if "PROMETHEUS_ENABLED_ON_PORT" in os.environ: + if PROMETHEUS_ENABLED_ON_PORT is not None: logging.info( "Starting Prometheus server on port %d", PROMETHEUS_ENABLED_ON_PORT ) start_http_server(PROMETHEUS_ENABLED_ON_PORT) - + else: + logging.info( + "Not starting Prometheus server: PROMETHEUS_ENABLED_ON_PORT not set" + ) return jetstream_server From fca13e86ec4132e523d29ee21a59aeb04bd0f56e Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 10 May 2024 11:32:35 -0700 Subject: [PATCH 13/14] Update server_lib.py --- jetstream/core/server_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index 9ec53ae0..4e0cbfbf 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -33,7 +33,7 @@ from prometheus_client import start_http_server _HOST = "[::]" -PROMETHEUS_ENABLED_ON_PORT = os.getenv("PROMETHEUS_ENABLED_ON_PORT") +PROMETHEUS_ENABLED_ON_PORT = int(os.getenv("PROMETHEUS_ENABLED_ON_PORT")) if os.getenv("PROMETHEUS_ENABLED_ON_PORT") else None class JetStreamServer: From 019daa3d693733e77c05f812df3fbb20d0052c4b Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 10 May 2024 18:33:18 +0000 Subject: [PATCH 14/14] reformat --- jetstream/core/server_lib.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/jetstream/core/server_lib.py b/jetstream/core/server_lib.py index 4e0cbfbf..cc983535 100644 --- a/jetstream/core/server_lib.py +++ b/jetstream/core/server_lib.py @@ -33,7 +33,11 @@ from prometheus_client import start_http_server _HOST = "[::]" -PROMETHEUS_ENABLED_ON_PORT = int(os.getenv("PROMETHEUS_ENABLED_ON_PORT")) if os.getenv("PROMETHEUS_ENABLED_ON_PORT") else None +PROMETHEUS_ENABLED_ON_PORT = ( + int(os.getenv("PROMETHEUS_ENABLED_ON_PORT")) + if os.getenv("PROMETHEUS_ENABLED_ON_PORT") + else None +) class JetStreamServer: