Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tldrs backend to TEF #31

Merged
merged 6 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ _build
*.exe
perf.*
*.fxt
*.tmp
6 changes: 6 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
(preprocess (pps ppx_trace))
(libraries trace.core trace-fuchsia))

(executable
(name trace_tldrs)
(modules trace_tldrs)
(preprocess (pps ppx_trace))
(libraries trace.core trace-tef.tldrs))

(executable
(name bench_fuchsia_write)
(modules bench_fuchsia_write)
Expand Down
61 changes: 61 additions & 0 deletions bench/trace_tldrs.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
module Trace = Trace_core

let ( let@ ) = ( @@ )

let work ~n () : unit =
for _i = 1 to n do
let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "outer" ~data:(fun () ->
[ "i", `Int _i ])
in
for _k = 1 to 10 do
let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "inner" ~data:(fun () ->
(* add some big data sometimes *)
if _i mod 100 = 0 && _k = 9 then
[ "s", `String (String.make 5000 '-') ]
else
[])
in
()
done;

if _i mod 1000 = 0 then Thread.yield ()
(* Thread.delay 1e-6 *)
done

let main ~n ~j ~child () : unit =
if child then
work ~n ()
else
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "parent" in
let cmd =
Printf.sprintf "%s --child -n=%d" (Filename.quote Sys.argv.(0)) n
in
let procs = Array.init j (fun _ -> Unix.open_process_in cmd) in
Array.iteri
(fun idx _ic ->
let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "wait.child" ~data:(fun () ->
[ "i", `Int idx ])
in
ignore @@ Unix.close_process_in _ic)
procs

let () =
let@ () = Trace_tef_tldrs.with_setup () in

let n = ref 10_000 in
let j = ref 4 in
let child = ref false in

let args =
[
"-n", Arg.Set_int n, " number of iterations";
"-j", Arg.Set_int j, " set number of workers";
"--child", Arg.Set child, " act as child process";
]
|> Arg.align
in
Arg.parse args ignore "bench1";
main ~n:!n ~j:!j ~child:!child ()
3 changes: 3 additions & 0 deletions bench_tldrs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh
DUNE_OPTS="--profile=release --display=quiet"
exec dune exec $DUNE_OPTS bench/trace_tldrs.exe -- $@
6 changes: 6 additions & 0 deletions src/tef-tldrs/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

(library
(name trace_tef_tldrs)
(public_name trace-tef.tldrs)
(synopsis "Multiprocess tracing using the `tldrs` daemon")
(libraries trace.core trace.private.util trace-tef unix threads))
128 changes: 128 additions & 0 deletions src/tef-tldrs/trace_tef_tldrs.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
open Trace_core

let spf = Printf.sprintf
let fpf = Printf.fprintf

type output = [ `File of string ]

(** Env variable used to communicate to subprocesses, which trace ID to use *)
let env_var_trace_id = "TRACE_TEF_TLDR_TRACE_ID"

(** Env variable used to communicate to subprocesses, which trace ID to use *)
let env_var_unix_socket = "TRACE_TEF_TLDR_SOCKET"

let get_unix_socket () =
match Sys.getenv_opt env_var_unix_socket with
| Some s -> s
| None ->
let s = "/tmp/tldrs.socket" in
(* children must agree on the socket file *)
Unix.putenv env_var_unix_socket s;
s

type as_client = {
trace_id: string;
socket: string;
emit_tef_at_exit: string option;
(** For parent, ask daemon to emit traces here *)
}

type role = as_client option

let to_hex (s : string) : string =
let open String in
let i_to_hex (i : int) =
if i < 10 then
Char.chr (i + Char.code '0')
else
Char.chr (i - 10 + Char.code 'a')
in

let res = Bytes.create (2 * length s) in
for i = 0 to length s - 1 do
let n = Char.code (get s i) in
Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4));
Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f))
done;
Bytes.unsafe_to_string res

let create_trace_id () : string =
let now = Unix.gettimeofday () in
let rand = Random.State.make_self_init () in

let rand_bytes = Bytes.create 16 in
for i = 0 to Bytes.length rand_bytes - 1 do
Bytes.set rand_bytes i (Random.State.int rand 256 |> Char.chr)
done;
(* convert to hex *)
spf "tr-%d-%s" (int_of_float now) (to_hex @@ Bytes.unsafe_to_string rand_bytes)

(** Find what this particular process has to do wrt tracing *)
let find_role ~out () : role =
match Sys.getenv_opt env_var_trace_id with
| Some trace_id ->
Some { trace_id; emit_tef_at_exit = None; socket = get_unix_socket () }
| None ->
let write_to_file path =
(* normalize path so the daemon knows what we're talking about *)
let path =
if Filename.is_relative path then
Filename.concat (Unix.getcwd ()) path
else
path
in
let trace_id = create_trace_id () in
Unix.putenv env_var_trace_id trace_id;
{ trace_id; emit_tef_at_exit = Some path; socket = get_unix_socket () }
in

(match out with
| `File path -> Some (write_to_file path)
| `Env ->
(match Sys.getenv_opt "TRACE" with
| Some ("1" | "true") -> Some (write_to_file "trace.json")
| Some path -> Some (write_to_file path)
| None -> None))

let collector_ (client : as_client) : collector =
(* connect to unix socket *)
let sock = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
(try Unix.connect sock (Unix.ADDR_UNIX client.socket)
with exn ->
failwith
@@ spf "Could not open socket to `tldrs` demon at %S: %s" client.socket
(Printexc.to_string exn));
let out = Unix.out_channel_of_descr sock in

(* what to do when the collector shuts down *)
let finally () =
(* ask the collector to emit the trace in a user-chosen file, perhaps *)
Option.iter
(fun file -> fpf out "EMIT_TEF %s\n" file)
client.emit_tef_at_exit;
(try flush out with _ -> ());
try Unix.close sock with _ -> ()
in

fpf out "OPEN %s\n%!" client.trace_id;
Trace_tef.Internal_.collector_jsonl ~finally ~out:(`Output out) ()

let collector ~out () : collector =
let role = find_role ~out () in
match role with
| None -> assert false
| Some c -> collector_ c

let setup ?(out = `Env) () =
let role = find_role ~out () in
match role with
| None -> ()
| Some c -> Trace_core.setup_collector @@ collector_ c

let with_setup ?out () f =
setup ?out ();
Fun.protect ~finally:Trace_core.shutdown f

module Internal_ = struct
include Trace_tef.Internal_
end
40 changes: 40 additions & 0 deletions src/tef-tldrs/trace_tef_tldrs.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
val collector : out:[ `File of string ] -> unit -> Trace_core.collector
(** Make a collector that writes into the given output.
See {!setup} for more details. *)

type output = [ `File of string ]
(** Output for tracing.
- [`File "foo"] will enable tracing and print events into file
named "foo". The file is only written at exit.
*)

val setup : ?out:[ output | `Env ] -> unit -> unit
(** [setup ()] installs the collector depending on [out].

@param out can take different values:
- regular {!output} value to specify where events go
- [`Env] will enable tracing if the environment
variable "TRACE" is set.

- If it's set to "1", then the file is "trace.json".
- If it's set to "stdout", then logging happens on stdout (since 0.2)
- If it's set to "stderr", then logging happens on stdout (since 0.2)
- Otherwise, if it's set to a non empty string, the value is taken
to be the file path into which to write.
*)

val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
(** [with_setup () f] (optionally) sets a collector up, calls [f()],
and makes sure to shutdown before exiting.
*)

(**/**)

module Internal_ : sig
val mock_all_ : unit -> unit
(** use fake, deterministic timestamps, TID, PID *)

val on_tracing_error : (string -> unit) ref
end

(**/**)
7 changes: 3 additions & 4 deletions src/tef/trace_tef.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ module Mock_ = struct
float_of_int x
end

let counter = Mtime_clock.counter ()

(** Now, in microseconds *)
let[@inline] now_us () : float =
if !Mock_.enabled then
Mock_.now_us ()
else (
let t = Mtime_clock.count counter in
Mtime.Span.to_float_ns t /. 1e3
let t = Mtime_clock.now () in
Int64.to_float (Mtime.to_uint64_ns t) /. 1e3
)

let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
Expand Down Expand Up @@ -72,6 +70,7 @@ module Writer = struct
| `File path -> open_out path, true
| `File_append path ->
open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
| `Output oc -> oc, false
in
let pid =
if !Mock_.enabled then
Expand Down
2 changes: 1 addition & 1 deletion src/tef/trace_tef.mli
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ module Internal_ : sig

val collector_jsonl :
finally:(unit -> unit) ->
out:[ `File_append of string ] ->
out:[ `File_append of string | `Output of out_channel ] ->
unit ->
Trace_core.collector

Expand Down
Loading