Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup(userspace/libsinsp): call sinsp_observer methods after an event has been processed by all parsers #2222

Merged
merged 6 commits into from
Jan 13, 2025
147 changes: 95 additions & 52 deletions userspace/libsinsp/parsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1276,9 +1276,14 @@
return;
}

/* If there's a listener, invoke it */
//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_clone(evt, new_child.get(), tid_collision);
m_inspector->m_post_process_cbs.emplace(
[new_child, tid_collision](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_clone(evt, new_child.get(), tid_collision);
});
}

/* If we had to erase a previous entry for this tid and rebalance the table,
Expand All @@ -1292,8 +1297,6 @@
new_child->m_comm.c_str());
}
/*=============================== ADD THREAD TO THE TABLE ===========================*/

return;
}

void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt) {
Expand Down Expand Up @@ -1761,17 +1764,19 @@
evt->set_tinfo(new_child.get());

//
// If there's a listener, invoke it
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_clone(evt, new_child.get(), tid_collision);
m_inspector->m_post_process_cbs.emplace(
[new_child, tid_collision](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_clone(evt, new_child.get(), tid_collision);

Check warning on line 1772 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L1772

Added line #L1772 was not covered by tests
});
}

/* If we had to erase a previous entry for this tid and rebalance the table,
* make sure we reinitialize the child_tinfo pointer for this event, as the thread
* generating it might have gone away.
*/

if(tid_collision != -1) {
reset(evt);
/* Right now we have collisions only on the clone() caller */
Expand All @@ -1781,7 +1786,6 @@
}

/*=============================== CREATE NEW THREAD-INFO ===========================*/
return;
}

void sinsp_parser::parse_clone_exit(sinsp_evt *evt) {
Expand Down Expand Up @@ -2230,15 +2234,11 @@
evt->get_tinfo()->m_flags |= PPM_CL_NAME_CHANGED;

//
// Recompute the program hash
//
evt->get_tinfo()->compute_program_hash();

//
// If there's a listener, invoke it
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_execve(evt);
m_inspector->m_post_process_cbs.emplace(
[](sinsp_observer *observer, sinsp_evt *evt) { observer->on_execve(evt); });
}

/* If any of the threads in a thread group performs an
Expand Down Expand Up @@ -2787,10 +2787,11 @@
evt->get_fd_info()->m_name = evt->get_param_as_str(1, &parstr, sinsp_evt::PF_SIMPLE);

//
// If there's a listener callback, invoke it
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_bind(evt);
m_inspector->m_post_process_cbs.emplace(
[](sinsp_observer *observer, sinsp_evt *evt) { observer->on_bind(evt); });

Check warning on line 2794 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L2793-L2794

Added lines #L2793 - L2794 were not covered by tests
}
}

Expand Down Expand Up @@ -2982,7 +2983,6 @@

void sinsp_parser::parse_connect_exit(sinsp_evt *evt) {
const sinsp_evt_param *parinfo;
uint8_t *packed_data;
int64_t retval;
int64_t fd;
bool force_overwrite_stale_data = false;
Expand Down Expand Up @@ -3048,15 +3048,18 @@
return;
}

packed_data = (uint8_t *)parinfo->m_val;
uint8_t *packed_data = (uint8_t *)parinfo->m_val;

fill_client_socket_info(evt, packed_data, force_overwrite_stale_data);

//
// If there's a listener callback, invoke it
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_connect(evt, packed_data);
m_inspector->m_post_process_cbs.emplace(
[packed_data](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_connect(evt, packed_data);

Check warning on line 3061 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3059-L3061

Added lines #L3059 - L3061 were not covered by tests
});
}
}

Expand Down Expand Up @@ -3144,8 +3147,14 @@
fdi->m_name = evt->get_param_as_str(1, &parstr, sinsp_evt::PF_SIMPLE);
fdi->m_flags = 0;

//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_accept(evt, fd, packed_data, fdi.get());
m_inspector->m_post_process_cbs.emplace(
[fd, packed_data](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_accept(evt, fd, packed_data, evt->get_fd_info());

Check warning on line 3156 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3155-L3156

Added lines #L3155 - L3156 were not covered by tests
});
}

//
Expand Down Expand Up @@ -3205,8 +3214,21 @@
m_inspector->get_fds_to_remove().push_back(params->m_fd);
}

//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_erase_fd(params);
auto ts = params->m_ts;
auto remove_from_table = params->m_remove_from_table;
auto fd = params->m_fd;
auto tinfo = params->m_tinfo;
auto fdinfo = params->m_fdinfo;
m_inspector->m_post_process_cbs.emplace(
[ts, remove_from_table, fd, tinfo, fdinfo](sinsp_observer *observer,
sinsp_evt *evt) {
erase_fd_params p = {remove_from_table, fd, tinfo, fdinfo, ts};
observer->on_erase_fd(&p);
});
}
}

Expand Down Expand Up @@ -3669,8 +3691,6 @@
}

if(eflags & EF_READS_FROM_FD) {
const char *data;
uint32_t datalen;
int32_t tupleparam = -1;

if(etype == PPME_SOCKET_RECVFROM_X) {
Expand Down Expand Up @@ -3729,20 +3749,23 @@
parinfo = evt->get_param(1);
}

datalen = parinfo->m_len;
data = parinfo->m_val;
uint32_t datalen = parinfo->m_len;
const char *data = parinfo->m_val;

//
// If there's an fd listener, call it now
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_read(evt,
tid,
evt->get_tinfo()->m_lastevent_fd,
evt->get_fd_info(),
data,
(uint32_t)retval,
datalen);
m_inspector->m_post_process_cbs.emplace(
[tid, data, retval, datalen](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_read(evt,
tid,
evt->get_tinfo()->m_lastevent_fd,
evt->get_fd_info(),
data,
(uint32_t)retval,
datalen);
});
}

//
Expand Down Expand Up @@ -3791,7 +3814,6 @@
#endif

} else {
const char *data;
uint32_t datalen;
int32_t tupleparam = -1;

Expand Down Expand Up @@ -3849,19 +3871,22 @@
//
parinfo = evt->get_param(1);
datalen = parinfo->m_len;
data = parinfo->m_val;
const char *data = parinfo->m_val;

//
// If there's an fd listener, call it now
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_write(evt,
tid,
evt->get_tinfo()->m_lastevent_fd,
evt->get_fd_info(),
data,
(uint32_t)retval,
datalen);
m_inspector->m_post_process_cbs.emplace(
[tid, data, retval, datalen](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_write(evt,

Check warning on line 3882 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3880-L3882

Added lines #L3880 - L3882 were not covered by tests
tid,
evt->get_tinfo()->m_lastevent_fd,

Check warning on line 3884 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3884

Added line #L3884 was not covered by tests
evt->get_fd_info(),
data,
(uint32_t)retval,
datalen);
});
}

// perform syslog decoding if applicable
Expand All @@ -3873,8 +3898,14 @@
if(evt->get_fd_info()->m_type == SCAP_FD_IPV4_SOCK ||
evt->get_fd_info()->m_type == SCAP_FD_IPV6_SOCK) {
evt->get_fd_info()->set_socket_failed();
//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_socket_status_changed(evt);
m_inspector->m_post_process_cbs.emplace(
[](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_socket_status_changed(evt);

Check warning on line 3907 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3905-L3907

Added lines #L3905 - L3907 were not covered by tests
});
}
}
}
Expand All @@ -3897,7 +3928,6 @@
//
if(retval >= 0) {
sinsp_evt *enter_evt = &m_tmp_evt;
int64_t fdin;

if(!retrieve_enter_event(enter_evt, evt)) {
return;
Expand All @@ -3906,13 +3936,16 @@
//
// Extract the in FD
//
fdin = enter_evt->get_param(1)->as<int64_t>();
int64_t fdin = enter_evt->get_param(1)->as<int64_t>();

//
// If there's an fd listener, call it now
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_sendfile(evt, fdin, (uint32_t)retval);
m_inspector->m_post_process_cbs.emplace(
[fdin, retval](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_sendfile(evt, fdin, (uint32_t)retval);

Check warning on line 3947 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3945-L3947

Added lines #L3945 - L3947 were not covered by tests
});
}
}
}
Expand Down Expand Up @@ -4068,8 +4101,13 @@
return;
}

//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_socket_shutdown(evt);
m_inspector->m_post_process_cbs.emplace([](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_socket_shutdown(evt);

Check warning on line 4109 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L4108-L4109

Added lines #L4108 - L4109 were not covered by tests
});
}
}
}
Expand Down Expand Up @@ -5048,8 +5086,13 @@
} else {
evt->get_fd_info()->set_socket_connected();
}
//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_socket_status_changed(evt);
m_inspector->m_post_process_cbs.emplace([](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_socket_status_changed(evt);

Check warning on line 5094 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L5093-L5094

Added lines #L5093 - L5094 were not covered by tests
});
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions userspace/libsinsp/sinsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,20 @@ int32_t sinsp::next(sinsp_evt** puevt) {
// todo(jason): should we log parsing errors here?
pp.process_event(evt, m_event_sources);
}

// Once we processed all events, make sure to call all
// requested post-process callbacks.
// At this point, any plugin could have modified state tables,
// thus we can guarantee that any post-process callback
// will see the full post-event-processed state.
// NOTE: we don't use a RAII object because
// we cannot guarantee that no exception will be thrown by the callbacks.
if(m_observer != nullptr) {
for(; !m_post_process_cbs.empty(); m_post_process_cbs.pop()) {
auto cb = m_post_process_cbs.front();
cb(m_observer, evt);
}
}
}

// Finally set output evt;
Expand Down
3 changes: 3 additions & 0 deletions userspace/libsinsp/sinsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ limitations under the License.
#include <string>
#include <unordered_set>
#include <vector>
#include <queue>

#define ONE_SECOND_IN_NS 1000000000LL

Expand Down Expand Up @@ -1242,6 +1243,8 @@ class SINSP_PUBLIC sinsp : public capture_stats_source {

sinsp_observer* m_observer{nullptr};

std::queue<std::function<void(sinsp_observer* observer, sinsp_evt* evt)>> m_post_process_cbs{};

bool m_inited;
static std::atomic<int> instance_count;
};
Expand Down
Loading
Loading