diff --git a/.gitignore b/.gitignore index da14214..5c093cc 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ _build *.exe perf.* *.fxt +*.tmp diff --git a/bench/dune b/bench/dune index ade51c9..837ef79 100644 --- a/bench/dune +++ b/bench/dune @@ -10,6 +10,12 @@ (preprocess (pps ppx_trace)) (libraries trace.core trace-fuchsia)) +(executable + (name trace_tldrs) + (modules trace_tldrs) + (preprocess (pps ppx_trace)) + (libraries trace.core trace-tef.tldrs)) + (executable (name bench_fuchsia_write) (modules bench_fuchsia_write) diff --git a/bench/trace_tldrs.ml b/bench/trace_tldrs.ml new file mode 100644 index 0000000..ff8cc29 --- /dev/null +++ b/bench/trace_tldrs.ml @@ -0,0 +1,61 @@ +module Trace = Trace_core + +let ( let@ ) = ( @@ ) + +let work ~n () : unit = + for _i = 1 to n do + let@ _sp = + Trace.with_span ~__FILE__ ~__LINE__ "outer" ~data:(fun () -> + [ "i", `Int _i ]) + in + for _k = 1 to 10 do + let@ _sp = + Trace.with_span ~__FILE__ ~__LINE__ "inner" ~data:(fun () -> + (* add some big data sometimes *) + if _i mod 100 = 0 && _k = 9 then + [ "s", `String (String.make 5000 '-') ] + else + []) + in + () + done; + + if _i mod 1000 = 0 then Thread.yield () + (* Thread.delay 1e-6 *) + done + +let main ~n ~j ~child () : unit = + if child then + work ~n () + else + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "parent" in + let cmd = + Printf.sprintf "%s --child -n=%d" (Filename.quote Sys.argv.(0)) n + in + let procs = Array.init j (fun _ -> Unix.open_process_in cmd) in + Array.iteri + (fun idx _ic -> + let@ _sp = + Trace.with_span ~__FILE__ ~__LINE__ "wait.child" ~data:(fun () -> + [ "i", `Int idx ]) + in + ignore @@ Unix.close_process_in _ic) + procs + +let () = + let@ () = Trace_tef_tldrs.with_setup () in + + let n = ref 10_000 in + let j = ref 4 in + let child = ref false in + + let args = + [ + "-n", Arg.Set_int n, " number of iterations"; + "-j", Arg.Set_int j, " set number of workers"; + "--child", Arg.Set child, " act as child process"; + ] + |> Arg.align + in + Arg.parse args ignore "bench1"; + main ~n:!n ~j:!j ~child:!child () diff --git a/bench_tldrs.sh b/bench_tldrs.sh new file mode 100755 index 0000000..3c5eabf --- /dev/null +++ b/bench_tldrs.sh @@ -0,0 +1,3 @@ +#!/bin/sh +DUNE_OPTS="--profile=release --display=quiet" +exec dune exec $DUNE_OPTS bench/trace_tldrs.exe -- $@ diff --git a/src/tef-tldrs/dune b/src/tef-tldrs/dune new file mode 100644 index 0000000..9f99684 --- /dev/null +++ b/src/tef-tldrs/dune @@ -0,0 +1,6 @@ + +(library + (name trace_tef_tldrs) + (public_name trace-tef.tldrs) + (synopsis "Multiprocess tracing using the `tldrs` daemon") + (libraries trace.core trace.private.util trace-tef unix threads)) diff --git a/src/tef-tldrs/trace_tef_tldrs.ml b/src/tef-tldrs/trace_tef_tldrs.ml new file mode 100644 index 0000000..ae09334 --- /dev/null +++ b/src/tef-tldrs/trace_tef_tldrs.ml @@ -0,0 +1,128 @@ +open Trace_core + +let spf = Printf.sprintf +let fpf = Printf.fprintf + +type output = [ `File of string ] + +(** Env variable used to communicate to subprocesses, which trace ID to use *) +let env_var_trace_id = "TRACE_TEF_TLDR_TRACE_ID" + +(** Env variable used to communicate to subprocesses, which trace ID to use *) +let env_var_unix_socket = "TRACE_TEF_TLDR_SOCKET" + +let get_unix_socket () = + match Sys.getenv_opt env_var_unix_socket with + | Some s -> s + | None -> + let s = "/tmp/tldrs.socket" in + (* children must agree on the socket file *) + Unix.putenv env_var_unix_socket s; + s + +type as_client = { + trace_id: string; + socket: string; + emit_tef_at_exit: string option; + (** For parent, ask daemon to emit traces here *) +} + +type role = as_client option + +let to_hex (s : string) : string = + let open String in + let i_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') + in + + let res = Bytes.create (2 * length s) in + for i = 0 to length s - 1 do + let n = Char.code (get s i) in + Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4)); + Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f)) + done; + Bytes.unsafe_to_string res + +let create_trace_id () : string = + let now = Unix.gettimeofday () in + let rand = Random.State.make_self_init () in + + let rand_bytes = Bytes.create 16 in + for i = 0 to Bytes.length rand_bytes - 1 do + Bytes.set rand_bytes i (Random.State.int rand 256 |> Char.chr) + done; + (* convert to hex *) + spf "tr-%d-%s" (int_of_float now) (to_hex @@ Bytes.unsafe_to_string rand_bytes) + +(** Find what this particular process has to do wrt tracing *) +let find_role ~out () : role = + match Sys.getenv_opt env_var_trace_id with + | Some trace_id -> + Some { trace_id; emit_tef_at_exit = None; socket = get_unix_socket () } + | None -> + let write_to_file path = + (* normalize path so the daemon knows what we're talking about *) + let path = + if Filename.is_relative path then + Filename.concat (Unix.getcwd ()) path + else + path + in + let trace_id = create_trace_id () in + Unix.putenv env_var_trace_id trace_id; + { trace_id; emit_tef_at_exit = Some path; socket = get_unix_socket () } + in + + (match out with + | `File path -> Some (write_to_file path) + | `Env -> + (match Sys.getenv_opt "TRACE" with + | Some ("1" | "true") -> Some (write_to_file "trace.json") + | Some path -> Some (write_to_file path) + | None -> None)) + +let collector_ (client : as_client) : collector = + (* connect to unix socket *) + let sock = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in + (try Unix.connect sock (Unix.ADDR_UNIX client.socket) + with exn -> + failwith + @@ spf "Could not open socket to `tldrs` demon at %S: %s" client.socket + (Printexc.to_string exn)); + let out = Unix.out_channel_of_descr sock in + + (* what to do when the collector shuts down *) + let finally () = + (* ask the collector to emit the trace in a user-chosen file, perhaps *) + Option.iter + (fun file -> fpf out "EMIT_TEF %s\n" file) + client.emit_tef_at_exit; + (try flush out with _ -> ()); + try Unix.close sock with _ -> () + in + + fpf out "OPEN %s\n%!" client.trace_id; + Trace_tef.Internal_.collector_jsonl ~finally ~out:(`Output out) () + +let collector ~out () : collector = + let role = find_role ~out () in + match role with + | None -> assert false + | Some c -> collector_ c + +let setup ?(out = `Env) () = + let role = find_role ~out () in + match role with + | None -> () + | Some c -> Trace_core.setup_collector @@ collector_ c + +let with_setup ?out () f = + setup ?out (); + Fun.protect ~finally:Trace_core.shutdown f + +module Internal_ = struct + include Trace_tef.Internal_ +end diff --git a/src/tef-tldrs/trace_tef_tldrs.mli b/src/tef-tldrs/trace_tef_tldrs.mli new file mode 100644 index 0000000..dc67648 --- /dev/null +++ b/src/tef-tldrs/trace_tef_tldrs.mli @@ -0,0 +1,40 @@ +val collector : out:[ `File of string ] -> unit -> Trace_core.collector +(** Make a collector that writes into the given output. + See {!setup} for more details. *) + +type output = [ `File of string ] +(** Output for tracing. + - [`File "foo"] will enable tracing and print events into file + named "foo". The file is only written at exit. +*) + +val setup : ?out:[ output | `Env ] -> unit -> unit +(** [setup ()] installs the collector depending on [out]. + + @param out can take different values: + - regular {!output} value to specify where events go + - [`Env] will enable tracing if the environment + variable "TRACE" is set. + + - If it's set to "1", then the file is "trace.json". + - If it's set to "stdout", then logging happens on stdout (since 0.2) + - If it's set to "stderr", then logging happens on stdout (since 0.2) + - Otherwise, if it's set to a non empty string, the value is taken + to be the file path into which to write. +*) + +val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a +(** [with_setup () f] (optionally) sets a collector up, calls [f()], + and makes sure to shutdown before exiting. +*) + +(**/**) + +module Internal_ : sig + val mock_all_ : unit -> unit + (** use fake, deterministic timestamps, TID, PID *) + + val on_tracing_error : (string -> unit) ref +end + +(**/**) diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index 03824ba..7fea2ae 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -15,15 +15,13 @@ module Mock_ = struct float_of_int x end -let counter = Mtime_clock.counter () - (** Now, in microseconds *) let[@inline] now_us () : float = if !Mock_.enabled then Mock_.now_us () else ( - let t = Mtime_clock.count counter in - Mtime.Span.to_float_ns t /. 1e3 + let t = Mtime_clock.now () in + Int64.to_float (Mtime.to_uint64_ns t) /. 1e3 ) let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s) @@ -72,6 +70,7 @@ module Writer = struct | `File path -> open_out path, true | `File_append path -> open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true + | `Output oc -> oc, false in let pid = if !Mock_.enabled then diff --git a/src/tef/trace_tef.mli b/src/tef/trace_tef.mli index 03866f0..6c73f13 100644 --- a/src/tef/trace_tef.mli +++ b/src/tef/trace_tef.mli @@ -47,7 +47,7 @@ module Internal_ : sig val collector_jsonl : finally:(unit -> unit) -> - out:[ `File_append of string ] -> + out:[ `File_append of string | `Output of out_channel ] -> unit -> Trace_core.collector