Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several fixes & improvements of C++ MonitorStage #2170

Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,22 @@ class ProgressBarContextManager
{
std::lock_guard<std::mutex> lock(m_mutex);

// To avoid display_all() being executed after calling mark_pbar_as_completed() in some race conditions
// If the progress bars needs to be updated after completion, move the cursor up to the beginning
if (m_is_completed)
{
return;
move_cursor_up(m_progress_bars.size());
}

// A bit of hack here to make the font settings work. Indicators enables the font options only if the bars are
// output to standard streams (see is_colorized() in <indicators/termcolor.hpp>), but since we are still using
// the ostream (m_stdout_os) that is connected to the console terminal, the font options should be enabled.
// The internal function here is used to manually enable the font display.
m_stdout_os.iword(termcolor::_internal::colorize_index()) = 1;
display_all_impl();

for (auto& pbar : m_progress_bars)
// If all the progress bars are completed, keep the cursor position as it is
if (m_is_completed)
{
pbar->print_progress(true);
m_stdout_os << termcolor::reset; // The font option only works for the current bar
m_stdout_os << std::endl;
return;
}

// After each round of display, move cursor up ("\033[A") to the beginning of the first bar
m_stdout_os << "\033[" << m_progress_bars.size() << "A" << std::flush;
// Otherwise, move cursor up to the beginning after each round of display
move_cursor_up(m_progress_bars.size());
}

void mark_pbar_as_completed(size_t bar_id)
Expand All @@ -145,17 +140,9 @@ class ProgressBarContextManager
}
if (all_pbars_completed)
{
// Move the cursor down to the bottom of the last progress bar
// Doing this here instead of the destructor to avoid a race condition with the pipeline's
// "====Pipeline Complete====" log message.
// Using a string stream to ensure other logs are not interleaved.
std::ostringstream new_lines;
for (std::size_t i = 0; i < m_progress_bars.size(); ++i)
{
new_lines << "\n";
}
// Display again when completed to avoid progress bars being covered by other logs
display_all_impl();

m_stdout_os << new_lines.str() << std::flush;
m_is_completed = true;
}
}
Expand Down Expand Up @@ -199,6 +186,34 @@ class ProgressBarContextManager
return std::move(progress_bar);
}

void display_all_impl()
{
// A bit of hack here to make the font settings work. Indicators enables the font options only if the bars are
// output to standard streams (see is_colorized() in <indicators/termcolor.hpp>), but since we are still using
// the ostream (m_stdout_os) that is connected to the console terminal, the font options should be enabled.
// The internal function here is used to manually enable the font display.
m_stdout_os.iword(termcolor::_internal::colorize_index()) = 1;

for (auto& pbar : m_progress_bars)
{
pbar->print_progress(true);
m_stdout_os << termcolor::reset; // The font option only works for the current bar
m_stdout_os << std::endl;
}
}

void move_cursor_up(size_t lines)
{
// "\033[<n>A" means moving the cursor up for n lines
m_stdout_os << "\033[" << lines << "A" << std::flush;
}

void move_cursor_down(size_t lines)
{
// "\033[<n>B" means moving the cursor down for n lines
m_stdout_os << "\033[" << lines << "B" << std::flush;
}

indicators::DynamicProgress<indicators::IndeterminateProgressBar> m_dynamic_progress_bars;
std::vector<std::unique_ptr<indicators::IndeterminateProgressBar>> m_progress_bars;
std::mutex m_mutex;
Expand Down Expand Up @@ -227,8 +242,8 @@ class MonitorController
* @param unit : the unit of message count
* @param determine_count_fn : A function that computes the count for each incoming message
*/
MonitorController(const std::string& description,
std::string unit = "messages",
MonitorController(const std::string& description = "Progress",
const std::string& unit = "messages",
indicators::Color text_color = indicators::Color::cyan,
indicators::FontStyle font_style = indicators::FontStyle::bold,
std::optional<std::function<size_t(MessageT)>> determine_count_fn = std::nullopt);
Expand All @@ -239,23 +254,26 @@ class MonitorController
void sink_on_completed();

private:
static std::string format_duration(std::chrono::seconds duration);
static std::string format_throughput(std::chrono::seconds duration, size_t count, const std::string& unit);
static std::string format_duration(std::chrono::microseconds duration);
static std::string format_throughput(std::chrono::microseconds duration, size_t count, const std::string& unit);

size_t m_bar_id;
const std::string m_description;
const std::string m_unit;
std::optional<std::function<size_t(MessageT)>> m_determine_count_fn;
size_t m_count{0};
time_point_t m_start_time;
bool m_is_started{false}; // Set to true after the first call to progress_sink()
bool m_is_completed{false};
};

template <typename MessageT>
MonitorController<MessageT>::MonitorController(const std::string& description,
std::string unit,
const std::string& unit,
indicators::Color text_color,
indicators::FontStyle font_style,
std::optional<std::function<size_t(MessageT)>> determine_count_fn) :
m_description(std::move(description)),
m_unit(std::move(unit)),
m_determine_count_fn(determine_count_fn)
{
Expand All @@ -268,7 +286,7 @@ MonitorController<MessageT>::MonitorController(const std::string& description,
}
}

m_bar_id = ProgressBarContextManager::get_instance().add_progress_bar(description, text_color, font_style);
m_bar_id = ProgressBarContextManager::get_instance().add_progress_bar(m_description, text_color, font_style);
}

template <typename MessageT>
Expand All @@ -280,13 +298,15 @@ MessageT MonitorController<MessageT>::progress_sink(MessageT msg)
m_is_started = true;
}
m_count += (*m_determine_count_fn)(msg);
auto duration = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - m_start_time);
auto duration =
std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now() - m_start_time);

auto& manager = ProgressBarContextManager::get_instance();
auto& pbar = manager.progress_bars()[m_bar_id];

// Update the progress bar
pbar->set_option(indicators::option::PostfixText{format_throughput(duration, m_count, m_unit)});
pbar->set_option(indicators::option::PrefixText{m_description});
pbar->tick();

manager.display_all();
Expand All @@ -298,14 +318,19 @@ template <typename MessageT>
void MonitorController<MessageT>::sink_on_completed()
{
auto& manager = ProgressBarContextManager::get_instance();
auto& pbar = manager.progress_bars()[m_bar_id];

pbar->set_option(indicators::option::PrefixText{"[Completed]" + m_description});
pbar->set_option(indicators::option::ForegroundColor{indicators::Color::green});

manager.mark_pbar_as_completed(m_bar_id);
}

template <typename MessageT>
std::string MonitorController<MessageT>::format_duration(std::chrono::seconds duration)
std::string MonitorController<MessageT>::format_duration(std::chrono::microseconds duration)
{
auto minutes = std::chrono::duration_cast<std::chrono::minutes>(duration);
auto seconds = duration - minutes;
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration - minutes);

std::ostringstream oss;
oss << std::setw(2) << std::setfill('0') << minutes.count() << "m:" << std::setw(2) << std::setfill('0')
Expand All @@ -314,11 +339,12 @@ std::string MonitorController<MessageT>::format_duration(std::chrono::seconds du
}

template <typename MessageT>
std::string MonitorController<MessageT>::format_throughput(std::chrono::seconds duration,
std::string MonitorController<MessageT>::format_throughput(std::chrono::microseconds duration,
size_t count,
const std::string& unit)
{
double throughput = static_cast<double>(count) / duration.count();
double time_in_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(duration).count();
double throughput = static_cast<double>(count) / time_in_seconds;
std::ostringstream oss;
oss << count << " " << unit << " in " << format_duration(duration) << ", "
<< "Throughput: " << std::fixed << std::setprecision(2) << throughput << " " << unit << "/s";
Expand Down
2 changes: 0 additions & 2 deletions python/morpheus/morpheus/stages/general/monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) ->
self._mc._font_style,
self._mc._determine_count_fn)

node.launch_options.pe_count = self._config.num_threads

else:
# Use a component so we track progress using the upstream progress engine. This will provide more accurate
# results
Expand Down
Loading