Skip to content

Commit

Permalink
Changed the logger to run all commands in a single shell.
Browse files Browse the repository at this point in the history
Previously, a new shell was spawned for every command run in the logger.
Now it is the case that the logger will run all commands in one shell.
So running something like logger.log("Switch groups", "newgrp my-group")
followed by logger.log("Print group", "id -gn") would have the same effect
as running "newgrp my-group" and "id -gn" in your terminal: the output
should be "my-group".
  • Loading branch information
dc-snl committed Feb 15, 2021
1 parent 25ca040 commit 69496f2
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 111 deletions.
154 changes: 144 additions & 10 deletions logger/classes.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
#!/usr/bin/env python3
from .util import run_teed_command, program_exists_in_path
from abc import abstractmethod
from collections import namedtuple
import fcntl
from io import StringIO
import inspect
from multiprocessing import Process, Manager
import os
from pathlib import Path
import subprocess
import sys
from threading import Thread
import time
from types import SimpleNamespace

def trace_collector(command, **kwargs):
def trace_collector(**kwargs):
trace_name = kwargs["trace"]
Collectors = [c for c in Trace.subclasses if c.trace_name == trace_name]
if len(Collectors) == 1:
Collector = Collectors[0]
return Collector(command, **kwargs)
return Collector(**kwargs)
elif len(Collectors) == 0:
raise RuntimeError(f"Unsupported trace type: {trace_name}")
else:
Expand All @@ -29,6 +35,138 @@ def stats_collectors(**kwargs):
collectors.append(Collector(interval, manager))
return collectors

class Shell:
def __init__(self, pwd=Path.cwd()):
self.aux_stdin_rfd, self.aux_stdin_wfd = os.pipe()
self.aux_stdout_rfd, self.aux_stdout_wfd = os.pipe()
self.aux_stderr_rfd, self.aux_stderr_wfd = os.pipe()

aux_stdout_write_flags = fcntl.fcntl(self.aux_stdout_wfd, fcntl.F_GETFL)
fcntl.fcntl(self.aux_stdout_wfd,
fcntl.F_SETFL,
aux_stdout_write_flags | os.O_NONBLOCK)
aux_stderr_write_flags = fcntl.fcntl(self.aux_stderr_wfd, fcntl.F_GETFL)
fcntl.fcntl(self.aux_stderr_wfd,
fcntl.F_SETFL,
aux_stderr_write_flags | os.O_NONBLOCK)

os.set_inheritable(self.aux_stdout_wfd, True)
os.set_inheritable(self.aux_stderr_wfd, True)
self.shell = subprocess.Popen(os.environ["SHELL"],
stdin=self.aux_stdin_rfd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=False)
os.set_inheritable(self.aux_stdout_wfd, False)
os.set_inheritable(self.aux_stderr_wfd, False)
self.cd(pwd)

def __def__(self):
os.close(self.aux_stdin_rfd)
os.close(self.aux_stdin_wfd)
os.close(self.aux_stdout_rfd)
os.close(self.aux_stdout_wfd)
os.close(self.aux_stderr_rfd)
os.close(self.aux_stderr_wfd)

def __eq__(self, other):
return type(self) == type(other) and self.pwd() == other.pwd()

def pwd(self):
directory, _ = self.auxiliary_command(posix="pwd", nt="cd", strip=True)
return directory

def cd(self, dir):
os.chdir(dir)
self.auxiliary_command(posix=f"cd {dir}", nt=f"cd {dir}")

def run(self, command, **kwargs):
start = round(time.time() * 1000)
os.write(self.aux_stdin_wfd, f"{command}\n".encode())
os.write(self.aux_stdin_wfd, f"printf '\\4'\n".encode())
os.write(self.aux_stdin_wfd, f"printf '\\4' 1>&2\n".encode())
output = self.tee(self.shell.stdout, self.shell.stderr, **kwargs)
aux_out, _ = self.auxiliary_command(posix="echo $?", nt="echo %ERRORLEVEL%")
returncode = int(aux_out)
finish = round(time.time() * 1000)
return SimpleNamespace(
returncode = returncode,
args = command,
stdout = output.stdout_str,
stderr = output.stderr_str,
start = start,
finish = finish,
wall = finish - start
)

def tee(self, stdout, stderr, **kwargs):
sys_stdout = None if kwargs.get("quiet_stdout") else sys.stdout
sys_stderr = None if kwargs.get("quiet_stderr") else sys.stderr
stdout_io = StringIO() if kwargs.get("stdout_str") else None
stderr_io = StringIO() if kwargs.get("stderr_str") else None
stdout_path = open(kwargs.get("stdout_path", os.devnull), "a")
stderr_path = open(kwargs.get("stderr_path", os.devnull), "a")
stdout_tee = [sys_stdout, stdout_io, stdout_path]
stderr_tee = [sys_stderr, stderr_io, stderr_path]
def write(input, outputs):
chunk = os.read(input.fileno(), 4096)
while chunk[-1] != 4:
for output in outputs:
if output is not None:
output.write(chunk.decode())
chunk = os.read(input.fileno(), 4096)
chunk = chunk[:-1]
for output in outputs:
if output is not None:
output.write(chunk.decode())
threads = [
Thread(target=write, args=(stdout, stdout_tee)),
Thread(target=write, args=(stderr, stderr_tee)),
]
for thread in threads:
thread.daemon = True
thread.start()
for thread in threads:
thread.join()
stdout_str = stdout_io.getvalue() if stdout_io is not None else None
stderr_str = stderr_io.getvalue() if stderr_io is not None else None
for file in (stdout_tee + stderr_tee):
if file not in [None, sys.stdout, sys.stderr, sys.stdin]:
if not file.closed:
file.close()
return SimpleNamespace(
stdout_str = stdout_str,
stderr_str = stderr_str
)

def auxiliary_command(self, **kwargs):
stdout, stderr = None, None
cmd = kwargs[os.name]
out = self.aux_stdout_wfd
err = self.aux_stderr_wfd
if os.name in kwargs:
os.write(self.aux_stdin_wfd, f"{cmd} 1>&{out} 2>&{err}\n".encode())
os.write(self.aux_stdin_wfd, f"printf '\\4' 1>&{out}\n".encode())
os.write(self.aux_stdin_wfd, f"printf '\\4' 1>&{err}\n".encode())
stdout = ""
stderr = ""
aux = os.read(self.aux_stdout_rfd, 65536)
while aux[-1] != 4:
stdout += aux.decode()
aux = os.read(self.aux_stdout_rfd, 65536)
aux = aux[:-1]
stdout += aux.decode()
aux = os.read(self.aux_stderr_rfd, 65536)
while aux[-1] != 4:
stderr += aux.decode()
aux = os.read(self.aux_stderr_rfd, 65536)
aux = aux[:-1]
stderr += aux.decode()
if stdout and kwargs.get("strip"):
stdout = stdout.strip()
stderr = stderr.strip()
return stdout, stderr

Stat = namedtuple("Stat", ["data", "svg"])

class Trace:
Expand All @@ -38,21 +176,17 @@ def subclass(TraceSubclass):
if (issubclass(TraceSubclass, Trace)):
Trace.subclasses.append(TraceSubclass)
return TraceSubclass
def __init__(self, command, **kwargs):
program_exists_in_path(self.trace_name)
def __init__(self, **kwargs):
if kwargs.get("trace_path"):
self.output_path = Path(kwargs["trace_path"])
else:
self.output_path = Path(f"{self.trace_name}.log")
self.command = command
@property
@abstractmethod
def trace_args(self):
raise AbstractMethod()
def __call__(self, **kwargs):
command = f"{self.trace_args} -- {self.command}"
completed_process = run_teed_command(command, **kwargs)
return completed_process
def command(self, command, **kwargs):
return f"{self.trace_args} -- {command}"

class StatsCollector:
stat_name = "undefined"
Expand Down
74 changes: 40 additions & 34 deletions logger/logger.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3

from .classes import Trace, StatsCollector, Stat, trace_collector, stats_collectors
from .util import make_svg_line_chart, run_teed_command, nested_SimpleNamespace_to_dict
from .classes import Shell, Trace, StatsCollector, Stat, trace_collector, stats_collectors
from .util import make_svg_line_chart, nested_SimpleNamespace_to_dict
from collections.abc import Iterable, Mapping
import datetime
import distutils.dir_util as dir_util
Expand All @@ -11,10 +11,9 @@
import psutil
import random
import re
import select
import shutil
import string
import subprocess
import sys
import tempfile
from textwrap import indent
import time
Expand Down Expand Up @@ -68,6 +67,12 @@ def default(self, obj):
return path
elif obj is None:
return None
elif isinstance(obj, Shell):
shell = {
'__type__': 'Shell',
'pwd': obj.pwd()
}
return shell
else:
# Call JSONEncoder's implementation
return json.JSONEncoder.default(self, obj)
Expand Down Expand Up @@ -108,6 +113,8 @@ def dict_to_object(self, obj):
return Path(obj['value'])
elif obj['__type__'] == 'tuple':
return tuple(obj['items'])
elif obj['__type__'] == 'Shell':
return Shell(Path(obj['pwd']))


class Logger:
Expand Down Expand Up @@ -201,6 +208,7 @@ def __init__(self, name, log_dir=Path.cwd(), strm_dir=None, html_file=None,
self.duration = duration
self.indent = indent
self.is_parent = True if self.indent == 0 else False
self.shell = Shell(Path.cwd())

# log_dir
# -------
Expand Down Expand Up @@ -726,16 +734,15 @@ def run(self, command, **kwargs):
kwargs[key] = True
old_pwd = os.getcwd()
if kwargs.get("pwd"):
os.chdir(kwargs.get("pwd"))
aux_info = auxiliary_information()
self.shell.cd(kwargs.get("pwd"))
aux_info = self.auxiliary_information()
for collector in collectors:
collector.start()
if "trace" in kwargs:
trace = trace_collector(command, **kwargs)
completed_process = trace(**kwargs)
trace = trace_collector(**kwargs)
command = trace.command(command, **kwargs)
trace_output = trace.output_path
else:
completed_process = run_teed_command(command, **kwargs)
completed_process = self.shell.run(command, **kwargs)
for collector in collectors:
stats[collector.stat_name] = collector.finish()
setattr(completed_process, "trace_path", trace_output)
Expand All @@ -746,34 +753,33 @@ def run(self, command, **kwargs):
else:
setattr(completed_process, "trace", None)
if kwargs.get("pwd"):
os.chdir(old_pwd)
self.shell.cd(old_pwd)
return SimpleNamespace(**completed_process.__dict__, **aux_info.__dict__)

def auxiliary_information():
return SimpleNamespace(
pwd = auxiliary_command_output(posix="pwd", nt="cd", strip=True),
environment = auxiliary_command_output(posix="env", nt="set"),
umask = auxiliary_command_output(posix="umask", strip=True),
user = auxiliary_command_output(posix="whoami", nt="whoami", strip=True),
group = auxiliary_command_output(posix="id -gn", strip=True),
shell = auxiliary_command_output(posix="printenv SHELL", strip=True),
ulimit = auxiliary_command_output(posix="ulimit -a")
)

def auxiliary_command_output(**kwargs):
stdout = None
if os.name in kwargs:
c = subprocess.run(kwargs[os.name], capture_output=True, shell=True, check=True)
stdout = c.stdout.decode()
if stdout and kwargs.get("strip"):
stdout = stdout.strip()
return stdout
def auxiliary_information(self):
pwd, _ = self.shell.auxiliary_command(posix="pwd", nt="cd", strip=True)
environment, _ = self.shell.auxiliary_command(posix="env", nt="set")
umask, _ = self.shell.auxiliary_command(posix="umask", strip=True)
user, _ = self.shell.auxiliary_command(posix="whoami", nt="whoami", strip=True)
group, _ = self.shell.auxiliary_command(posix="id -gn", strip=True)
shell, _ = self.shell.auxiliary_command(posix="printenv SHELL", strip=True)
ulimit, _ = self.shell.auxiliary_command(posix="ulimit -a")
x = SimpleNamespace(
pwd=pwd,
environment=environment,
umask=umask,
user=user,
group=group,
shell=shell,
ulimit=ulimit
)
return x

@Trace.subclass
class Strace(Trace):
trace_name = "strace"
def __init__(self, command, **kwargs):
super().__init__(command, **kwargs)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.summary = True if kwargs.get("summary") else False
self.expression = kwargs.get("expression")
@property
Expand All @@ -788,8 +794,8 @@ def trace_args(self):
@Trace.subclass
class Ltrace(Trace):
trace_name = "ltrace"
def __init__(self, command, **kwargs):
super().__init__(command, **kwargs)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.summary = True if kwargs.get("summary") else False
self.expression = kwargs.get("expression")
@property
Expand Down
Loading

0 comments on commit 69496f2

Please sign in to comment.