Skip to content

Commit

Permalink
Vine: Fix Crash After Idle Disconnect (#4065)
Browse files Browse the repository at this point in the history
* - Commentary on poll_active_workers to clarify purpose.
- Modify idle-disconnect to send "exit" message but not disconnect immediately.  (let the worker do it)

* format
  • Loading branch information
dthain authored Feb 14, 2025
1 parent a5a5af9 commit f38ba02
Showing 1 changed file with 35 additions and 10 deletions.
45 changes: 35 additions & 10 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,30 +269,41 @@ static vine_msg_code_t handle_name(struct vine_manager *q, struct vine_worker_in
}

/*
Handle a timeout request from a worker. Check if the worker has any important data before letting it go.
Handle a request from a worker to shut down because it has been idle for a while.
However, do not accept the request if the manager has some further information
indicating that the worker should be kept around.
*/

static void handle_worker_timeout(struct vine_manager *q, struct vine_worker_info *w)
static void handle_idle_disconnect_request(struct vine_manager *q, struct vine_worker_info *w)
{
// Look at the files and check if any are endangered temps.
char *cachename;
struct vine_file_replica *replica;
// debug(D_VINE, "Handling timeout request");

/* First check to see if this worker has any unique files that should not be lost. */

HASH_TABLE_ITERATE(w->current_files, cachename, replica)
{
if (replica->type == VINE_TEMP) {
int c = vine_file_replica_table_count_replicas(q, cachename, VINE_FILE_REPLICA_STATE_READY);
if (c == 1) {
debug(D_VINE, "Rejecting timeout request from worker %s (%s). Has unique file %s", w->hostname, w->addrport, cachename);
debug(D_VINE, "Rejecting disconnect request from worker %s (%s). Has unique file %s", w->hostname, w->addrport, cachename);
return;
}
}
}

/*
Then, if it is not running any tasks, tell it to exit and shut down cleanly.
We do not disconnect just yet because there may be more messages pending,
or other information the worker wants to send as it shuts down.
The worker will disconnect on its own.
Also, we don't want to invalidate the worker object w in an unexpected location.
*/

if (itable_size(w->current_tasks) == 0) {
debug(D_VINE, "Accepting timeout request from worker %s (%s).", w->hostname, w->addrport);
debug(D_VINE, "Accepting disconnect request from worker %s (%s).", w->hostname, w->addrport);
q->stats->workers_idled_out++;
vine_manager_shut_down_worker(q, w);
vine_manager_send(q, w, "exit\n");
}

return;
Expand All @@ -313,7 +324,7 @@ static vine_msg_code_t handle_info(struct vine_manager *q, struct vine_worker_in
if (string_prefix_is(field, "tasks_running")) {
w->dynamic_tasks_running = atoi(value);
} else if (string_prefix_is(field, "idle-disconnect-request")) {
handle_worker_timeout(q, w);
handle_idle_disconnect_request(q, w);
} else if (string_prefix_is(field, "worker-id")) {
free(w->workerid);
w->workerid = xxstrdup(value);
Expand Down Expand Up @@ -2507,6 +2518,10 @@ static vine_result_code_t handle_worker(struct vine_manager *q, struct link *l)
w = hash_table_lookup(q->worker_table, key);
free(key);

/* This should not happen, but just in case: */
if (!w)
return VINE_WORKER_FAILURE;

vine_msg_code_t mcode;
mcode = vine_manager_recv_no_retry(q, w, line, sizeof(line));

Expand Down Expand Up @@ -4961,7 +4976,12 @@ struct vine_task *vine_wait_for_task_id(struct vine_manager *q, int task_id, int
return vine_wait_internal(q, timeout, NULL, task_id);
}

/* return number of workers that failed */
/*
Consider all of the connected workers, and act upon each connection
that has pending input data, until the input buffer is empty.
Return the number of workers that *failed* and disconnected.
*/

static int poll_active_workers(struct vine_manager *q, int stoptime)
{
BEGIN_ACCUM_TIME(q, time_polling);
Expand Down Expand Up @@ -4993,9 +5013,14 @@ static int poll_active_workers(struct vine_manager *q, int stoptime)

int i;
int workers_failed = 0;
// Then consider all existing active workers

/* Consider all active connections of any kind. */
for (i = 1; i < n; i++) {

/* If there is pending input data on that connection. */
if (q->poll_table[i].revents) {

/* Act on the next input message, until there is no more buffered data. */
do {
if (handle_worker(q, q->poll_table[i].link) == VINE_WORKER_FAILURE) {
workers_failed++;
Expand Down

0 comments on commit f38ba02

Please sign in to comment.