From e993416a8fe85cabb75154d84b50a6097b8775b1 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 3 Sep 2024 15:44:12 -0700 Subject: [PATCH 01/11] t: fix typo Problem: An accidental 'd' was added to remove, making it "removed". Fix spelling. --- t/t0028-content-backing-none.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/t0028-content-backing-none.t b/t/t0028-content-backing-none.t index 129cd5404114..57862544236e 100755 --- a/t/t0028-content-backing-none.t +++ b/t/t0028-content-backing-none.t @@ -88,7 +88,7 @@ test_expect_success 'content.backing-module input of none works' ' flux start -Scontent.backing-module=none true ' -test_expect_success 'removedcontent module' ' +test_expect_success 'remove content module' ' flux exec flux module remove content ' From 3f04c1c22c48ce7e16c62d82989b33d1b61ce7b7 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 4 Sep 2024 13:16:51 -0700 Subject: [PATCH 02/11] t: fix invalid test Problem: A test in t0028-content-backing-none.t incorrectly calls checkpoint_put when it should call checkpoint_get. Fix invalid test. --- t/t0028-content-backing-none.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/t0028-content-backing-none.t b/t/t0028-content-backing-none.t index 57862544236e..ba88fdd38b75 100755 --- a/t/t0028-content-backing-none.t +++ b/t/t0028-content-backing-none.t @@ -16,7 +16,7 @@ test_expect_success 'loaded content module' ' ' test_expect_success 'checkpoint-get fails, no checkpoints yet' ' - checkpoint_put foo bar + test_must_fail checkpoint_get foo ' test_expect_success 'checkpoint-put foo w/ rootref bar' ' From fd0cff42503c81676362e1f1092c1f463d6c8bbe Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 30 Aug 2024 11:26:05 -0700 Subject: [PATCH 03/11] content: correct message unpack style Problem: The typical message unpack style is to place key names and storage pointers on the same line, but that is not done in several locations in the content and content backing modules. Correct code style to be more consistent to the rest of flux-core. --- src/modules/content-files/content-files.c | 9 +++------ src/modules/content-s3/content-s3.c | 21 +++++++-------------- src/modules/content-sqlite/content-sqlite.c | 9 +++------ src/modules/content/checkpoint.c | 6 ++---- 4 files changed, 15 insertions(+), 30 deletions(-) diff --git a/src/modules/content-files/content-files.c b/src/modules/content-files/content-files.c index b5c3fad8db6e..e38d18dfb864 100644 --- a/src/modules/content-files/content-files.c +++ b/src/modules/content-files/content-files.c @@ -227,8 +227,7 @@ void checkpoint_get_cb (flux_t *h, if (flux_respond_pack (h, msg, "{s:O}", - "value", - o) < 0) + "value", o) < 0) flux_log_error (h, "error responding to checkpoint-get request"); free (data); json_decref (o); @@ -257,10 +256,8 @@ void checkpoint_put_cb (flux_t *h, if (flux_request_unpack (msg, NULL, "{s:s s:o}", - "key", - &key, - "value", - &o) < 0) + "key", &key, + "value", &o) < 0) goto error; if (!(value = json_dumps (o, JSON_COMPACT))) { errstr = "failed to encode checkpoint value"; diff --git a/src/modules/content-s3/content-s3.c b/src/modules/content-s3/content-s3.c index 4f39e140851f..3a7521a0935c 100644 --- a/src/modules/content-s3/content-s3.c +++ b/src/modules/content-s3/content-s3.c @@ -158,14 +158,10 @@ static struct s3_config *parse_config (const flux_conf_t *conf, &error, "{s:{s:s, s:s, s:s, s?b !} }", "content-s3", - "credential-file", - &cred_file, - "bucket", - &bucket, - "uri", - &uri, - "virtual-host-style", - &is_virtual_host) < 0) { + "credential-file", &cred_file, + "bucket", &bucket, + "uri", &uri, + "virtual-host-style", &is_virtual_host) < 0) { errprintf (errp, "%s", error.text); goto error; } @@ -367,8 +363,7 @@ void checkpoint_get_cb (flux_t *h, if (flux_respond_pack (h, msg, "{s:O}", - "value", - o) < 0) { + "value", o) < 0) { errno = EIO; flux_log_error (h, "error responding to checkpoint-get request (pack)"); } @@ -400,10 +395,8 @@ void checkpoint_put_cb (flux_t *h, if (flux_request_unpack (msg, NULL, "{s:s s:o}", - "key", - &key, - "value", - &o) < 0) + "key", &key, + "value", &o) < 0) goto error; if (!(value = json_dumps (o, JSON_COMPACT))) { errstr = "failed to encode checkpoint value"; diff --git a/src/modules/content-sqlite/content-sqlite.c b/src/modules/content-sqlite/content-sqlite.c index 312d8060e1dc..764324915fba 100644 --- a/src/modules/content-sqlite/content-sqlite.c +++ b/src/modules/content-sqlite/content-sqlite.c @@ -413,8 +413,7 @@ void checkpoint_get_cb (flux_t *h, if (flux_respond_pack (h, msg, "{s:O}", - "value", - o) < 0) + "value", o) < 0) flux_log_error (h, "flux_respond_pack"); (void )sqlite3_reset (ctx->checkpt_get_stmt); json_decref (o); @@ -440,10 +439,8 @@ void checkpoint_put_cb (flux_t *h, if (flux_request_unpack (msg, NULL, "{s:s s:o}", - "key", - &key, - "value", - &o) < 0) + "key", &key, + "value", &o) < 0) goto error; if (!(value = json_dumps (o, JSON_COMPACT))) { errstr = "failed to encode checkpoint value"; diff --git a/src/modules/content/checkpoint.c b/src/modules/content/checkpoint.c index ad181eadef23..d2cba8cebdd9 100644 --- a/src/modules/content/checkpoint.c +++ b/src/modules/content/checkpoint.c @@ -282,10 +282,8 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, if (flux_request_unpack (msg, NULL, "{s:s s:o}", - "key", - &key, - "value", - &value) < 0) + "key", &key, + "value", &value) < 0) goto error; if (checkpoint->rank == 0) { From d61504aefb821704f4092463dbe2fa6a9bc362da Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 3 Sep 2024 11:32:50 -0700 Subject: [PATCH 04/11] content: require backing store for checkpoint Problem: A backing store is required for content.flush but it is not required for content.checkpoint-put. This is inconsistent and can lead to checkpointing problems done the line. Require content.checkpoint-put to only work if there is a backing store available. As a consequence, remove code that handled "cached" checkpoints when a backing store is not available. Fixes #6251 --- src/modules/content/cache.c | 1 - src/modules/content/checkpoint.c | 180 +++---------------------------- src/modules/content/checkpoint.h | 2 - 3 files changed, 13 insertions(+), 170 deletions(-) diff --git a/src/modules/content/cache.c b/src/modules/content/cache.c index b543f397fa7a..fa540cca3b37 100644 --- a/src/modules/content/cache.c +++ b/src/modules/content/cache.c @@ -782,7 +782,6 @@ static void content_register_backing_request (flux_t *h, if (flux_respond (h, msg, NULL) < 0) flux_log_error (h, "error responding to register-backing request"); (void)cache_flush (cache); - (void)checkpoints_flush (cache->checkpoint); return; error: if (flux_respond_error (h, msg, errno, errstr) < 0) diff --git a/src/modules/content/checkpoint.c b/src/modules/content/checkpoint.c index d2cba8cebdd9..b0d0b0c878a3 100644 --- a/src/modules/content/checkpoint.c +++ b/src/modules/content/checkpoint.c @@ -28,74 +28,8 @@ struct content_checkpoint { flux_msg_handler_t **handlers; uint32_t rank; struct content_cache *cache; - zhashx_t *hash; - unsigned int hash_dirty; }; -struct checkpoint_data { - struct content_checkpoint *checkpoint; - json_t *value; - uint8_t dirty:1; - bool in_progress; - int refcount; -}; - -static struct checkpoint_data * -checkpoint_data_incref (struct checkpoint_data *data) -{ - if (data) - data->refcount++; - return data; -} - -static void checkpoint_data_decref (struct checkpoint_data *data) -{ - if (data && --data->refcount == 0) { - if (data->dirty) - data->checkpoint->hash_dirty--; - json_decref (data->value); - free (data); - } -} - -/* zhashx_destructor_fn */ -static void checkpoint_data_decref_wrapper (void **arg) -{ - if (arg) { - struct checkpoint_data *data = *arg; - checkpoint_data_decref (data); - } -} - -static struct checkpoint_data * -checkpoint_data_create (struct content_checkpoint *checkpoint, - json_t *value) -{ - struct checkpoint_data *data = NULL; - - if (!(data = calloc (1, sizeof (*data)))) - return NULL; - data->checkpoint = checkpoint; - data->value = json_incref (value); - data->refcount = 1; - return data; -} - -static int checkpoint_data_update (struct content_checkpoint *checkpoint, - const char *key, - json_t *value) -{ - struct checkpoint_data *data = NULL; - - if (!(data = checkpoint_data_create (checkpoint, value))) - return -1; - - zhashx_update (checkpoint->hash, key, data); - data->dirty = 1; - checkpoint->hash_dirty++; - return 0; -} - static void checkpoint_get_continuation (flux_future_t *f, void *arg) { struct content_checkpoint *checkpoint = arg; @@ -111,9 +45,6 @@ static void checkpoint_get_continuation (flux_future_t *f, void *arg) if (flux_rpc_get_unpack (f, "{s:o}", "value", &value) < 0) goto error; - if (checkpoint_data_update (checkpoint, key, value) < 0) - goto error; - if (flux_respond_pack (checkpoint->h, msg, "{s:O}", "value", value) < 0) flux_log_error (checkpoint->h, "error responding to checkpoint-get"); @@ -176,24 +107,16 @@ void content_checkpoint_get_request (flux_t *h, flux_msg_handler_t *mh, const char *key; const char *errstr = NULL; - if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0) - goto error; - if (checkpoint->rank == 0 && !content_cache_backing_loaded (checkpoint->cache)) { - struct checkpoint_data *data = zhashx_lookup (checkpoint->hash, key); - if (!data) { - errstr = "checkpoint key unavailable"; - errno = ENOENT; - goto error; - } - if (flux_respond_pack (h, msg, - "{s:O}", - "value", data->value) < 0) - flux_log_error (h, "error responding to checkpoint-get"); - return; + errstr = "checkpoint get unavailable, no backing store"; + errno = ENOSYS; + goto error; } + if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0) + goto error; + if (checkpoint_get_forward (checkpoint, msg, key, @@ -279,6 +202,13 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, json_t *value; const char *errstr = NULL; + if (checkpoint->rank == 0 + && !content_cache_backing_loaded (checkpoint->cache)) { + errstr = "checkpoint put unavailable, no backing store"; + errno = ENOSYS; + goto error; + } + if (flux_request_unpack (msg, NULL, "{s:s s:o}", @@ -286,17 +216,6 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, "value", &value) < 0) goto error; - if (checkpoint->rank == 0) { - if (checkpoint_data_update (checkpoint, key, value) < 0) - goto error; - - if (!content_cache_backing_loaded (checkpoint->cache)) { - if (flux_respond (h, msg, NULL) < 0) - flux_log_error (checkpoint->h, "error responding to checkpoint-put"); - return; - } - } - if (checkpoint_put_forward (checkpoint, msg, key, @@ -311,72 +230,6 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "error responding to checkpoint-put request"); } -static void checkpoint_flush_continuation (flux_future_t *f, void *arg) -{ - struct checkpoint_data *data = arg; - int rv; - - assert (data); - if ((rv = flux_rpc_get (f, NULL)) < 0) - flux_log_error (data->checkpoint->h, "checkpoint flush rpc"); - if (!rv) { - data->dirty = 0; - data->checkpoint->hash_dirty--; - } - data->in_progress = false; - checkpoint_data_decref (data); - flux_future_destroy (f); -} - -static int checkpoint_flush (struct content_checkpoint *checkpoint, - struct checkpoint_data *data) -{ - if (data->dirty && !data->in_progress) { - const char *key = zhashx_cursor (checkpoint->hash); - const char *topic = "content-backing.checkpoint-put"; - flux_future_t *f; - if (!(f = flux_rpc_pack (checkpoint->h, topic, 0, 0, - "{s:s s:O}", - "key", key, - "value", data->value)) - || flux_future_then (f, - -1, - checkpoint_flush_continuation, - (void *)checkpoint_data_incref (data)) < 0) { - flux_log_error (checkpoint->h, "%s: checkpoint flush", __FUNCTION__); - flux_future_destroy (f); - return -1; - } - data->in_progress = true; - } - return 0; -} - -int checkpoints_flush (struct content_checkpoint *checkpoint) -{ - int last_errno = 0; - int rc = 0; - - if (checkpoint->hash_dirty > 0) { - struct checkpoint_data *data = zhashx_first (checkpoint->hash); - while (data) { - if (checkpoint_flush (checkpoint, data) < 0) { - last_errno = errno; - rc = -1; - /* A few errors we will consider "unrecoverable", so - * break out */ - if (errno == ENOSYS - || errno == ENOMEM) - break; - } - data = zhashx_next (checkpoint->hash); - } - } - if (rc < 0) - errno = last_errno; - return rc; -} - static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, @@ -398,7 +251,6 @@ void content_checkpoint_destroy (struct content_checkpoint *checkpoint) if (checkpoint) { int saved_errno = errno; flux_msg_handler_delvec (checkpoint->handlers); - zhashx_destroy (&checkpoint->hash); free (checkpoint); errno = saved_errno; } @@ -417,16 +269,10 @@ struct content_checkpoint *content_checkpoint_create ( checkpoint->rank = rank; checkpoint->cache = cache; - if (!(checkpoint->hash = zhashx_new ())) - goto nomem; - zhashx_set_destructor (checkpoint->hash, checkpoint_data_decref_wrapper); - if (flux_msg_handler_addvec (h, htab, checkpoint, &checkpoint->handlers) < 0) goto error; return checkpoint; -nomem: - errno = ENOMEM; error: content_checkpoint_destroy (checkpoint); return NULL; diff --git a/src/modules/content/checkpoint.h b/src/modules/content/checkpoint.h index 7bd0c5ad6622..d0322ce6b40e 100644 --- a/src/modules/content/checkpoint.h +++ b/src/modules/content/checkpoint.h @@ -19,8 +19,6 @@ struct content_checkpoint *content_checkpoint_create ( struct content_cache *cache); void content_checkpoint_destroy (struct content_checkpoint *checkpoint); -int checkpoints_flush (struct content_checkpoint *checkpoint); - #endif /* !_CONTENT_CHECKPOINT_H */ /* From ce4d97667fe952291b8dbcb8f877903ec415d808 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 3 Sep 2024 15:43:30 -0700 Subject: [PATCH 05/11] t: remove/adjust checkpoint tests Problem: Now that the content backing store is required for checkpoints, many tests fail. Remove tests that previously assumed that checkpointing worked without a content backing store. Adjust some tests that now have an new error message. --- t/t0012-content-sqlite.t | 43 ++-------------------- t/t0018-content-files.t | 51 ++++---------------------- t/t0028-content-backing-none.t | 65 +++------------------------------- t/t2807-dump-cmd.t | 8 ++--- 4 files changed, 16 insertions(+), 151 deletions(-) diff --git a/t/t0012-content-sqlite.t b/t/t0012-content-sqlite.t index f94fba428320..4d61317b8fe4 100755 --- a/t/t0012-content-sqlite.t +++ b/t/t0012-content-sqlite.t @@ -227,47 +227,8 @@ test_expect_success 'remove content-sqlite module on rank 0' ' flux module remove content-sqlite ' -test_expect_success 'checkpoint-put foo w/ rootref spoon' ' - checkpoint_put foo spoon -' - -test_expect_success 'checkpoint-get foo returned rootref spoon' ' - echo spoon >rootref5.exp && - checkpoint_get foo | jq -r .value | jq -r .rootref >rootref5.out && - test_cmp rootref5.exp rootref5.out -' - -test_expect_success 'load content-sqlite module on rank 0' ' - flux module load content-sqlite -' - -# arg1 - expected reference -wait_checkpoint_flush() { - local expected=$1 - local i=0 - while checkpoint_backing_get foo \ - | jq -r .value \ - | jq -r .rootref > checkpointflush.out \ - && [ $i -lt 50 ] - do - checkpoint=$(cat checkpointflush.out) - if [ "${checkpoint}" = "${expected}" ] - then - return 0 - fi - sleep 0.1 - i=$((i + 1)) - done - return 1 -} - -test_expect_success 'checkpoint-backing-get foo returns spoon' ' - wait_checkpoint_flush spoon -' - -test_expect_success 'remove content-sqlite module on rank 0' ' - flux content flush && - flux module remove content-sqlite +test_expect_success 'checkpoint-put foo w/ rootref spoon fails without backing' ' + test_must_fail checkpoint_put foo spoon ' test_expect_success 'remove heartbeat module' ' diff --git a/t/t0018-content-files.t b/t/t0018-content-files.t index b0004d4cf18a..3a69affdea46 100755 --- a/t/t0018-content-files.t +++ b/t/t0018-content-files.t @@ -249,56 +249,17 @@ test_expect_success 'checkpoint-put bad request fails with EPROTO' ' grep "Protocol error" badput.err ' -test_expect_success 'remove content-files module' ' - flux module remove content-files -' - -test_expect_success 'checkpoint-put foo w/ rootref spoon' ' - checkpoint_put foo spoon -' - -test_expect_success 'checkpoint-get foo returned rootref spoon' ' - echo spoon >rootref7.exp && - checkpoint_get foo | jq -r .value | jq -r .rootref >rootref7.out && - test_cmp rootref7.exp rootref7.out -' - -test_expect_success 'load content-files module on rank 0' ' - flux module load content-files -' - -# arg1 - expected reference -wait_checkpoint_flush() { - local expected=$1 - local i=0 - while checkpoint_backing_get foo \ - | jq -r .value \ - | jq -r .rootref > checkpointflush.out \ - && [ $i -lt 50 ] - do - checkpoint=$(cat checkpointflush.out) - if [ "${checkpoint}" = "${expected}" ] - then - return 0 - fi - sleep 0.1 - i=$((i + 1)) - done - return 1 -} - -test_expect_success 'checkpoint-backing-get foo returns spoon' ' - wait_checkpoint_flush spoon -' - test_expect_success 'flux module stats content-files is open to guests' ' FLUX_HANDLE_ROLEMASK=0x2 \ flux module stats content-files >/dev/null ' -test_expect_success 'remove content-files module on rank 0' ' - flux content flush && - flux module remove content-files +test_expect_success 'remove content-files module' ' + flux module remove content-files +' + +test_expect_success 'checkpoint-put foo w/ rootref spoon fails without backing' ' + test_must_fail checkpoint_put foo spoon ' test_expect_success 'remove content module' ' diff --git a/t/t0028-content-backing-none.t b/t/t0028-content-backing-none.t index ba88fdd38b75..a30f172ff5bd 100755 --- a/t/t0028-content-backing-none.t +++ b/t/t0028-content-backing-none.t @@ -15,75 +15,18 @@ test_expect_success 'loaded content module' ' flux exec flux module load content ' -test_expect_success 'checkpoint-get fails, no checkpoints yet' ' +test_expect_success 'checkpoint-get fails, no backing store' ' test_must_fail checkpoint_get foo ' -test_expect_success 'checkpoint-put foo w/ rootref bar' ' - checkpoint_put foo bar +test_expect_success 'checkpoint-put fails, no backing store' ' + test_must_fail checkpoint_put foo bar ' -test_expect_success 'checkpoint-get foo returned rootref bar' ' - echo bar >rootref.exp && - checkpoint_get foo | jq -r .value | jq -r .rootref >rootref.out && - test_cmp rootref.exp rootref.out -' - -test_expect_success 'checkpoint-put on rank 1 forwards to rank 0' ' - o=$(checkpoint_put_msg rankone rankref) && - jq -j -c -n ${o} | flux exec -r 1 ${RPC} content.checkpoint-put -' - -test_expect_success 'checkpoint-get on rank 1 forwards to rank 0' ' - echo rankref >rankref.exp && - o=$(checkpoint_get_msg rankone) && - jq -j -c -n ${o} \ - | flux exec -r 1 ${RPC} content.checkpoint-get \ - | jq -r .value | jq -r .rootref > rankref.out && - test_cmp rankref.exp rankref.out -' - -test_expect_success 'flux-dump --checkpoint with missing checkpoint fails' ' +test_expect_success 'flux-dump --checkpoint with no backing store' ' test_must_fail flux dump --checkpoint foo.tar ' -test_expect_success 'load kvs and create some kvs data' ' - flux module load kvs && - flux kvs put a=1 && - flux kvs put b=foo -' - -test_expect_success 'reload kvs' ' - flux module reload kvs && - test $(flux kvs get a) = "1" && - test $(flux kvs get b) = "foo" -' - -test_expect_success 'unload kvs' ' - flux module remove kvs -' - -test_expect_success 'dump default=kvs-primary checkpoint works' ' - flux dump --checkpoint foo.tar -' - -test_expect_success 'restore content' ' - flux restore --checkpoint foo.tar -' - -test_expect_success 'reload kvs' ' - flux module load kvs -' - -test_expect_success 'verify KVS content restored' ' - test $(flux kvs get a) = "1" && - test $(flux kvs get b) = "foo" -' - -test_expect_success 'unload kvs' ' - flux module remove kvs -' - test_expect_success 'content.backing-module input of none works' ' flux start -Scontent.backing-module=none true ' diff --git a/t/t2807-dump-cmd.t b/t/t2807-dump-cmd.t index e8092315f139..c4abafc26e35 100755 --- a/t/t2807-dump-cmd.t +++ b/t/t2807-dump-cmd.t @@ -27,7 +27,7 @@ test_expect_success 'flux-restore with no args prints Usage message' ' ' test_expect_success 'flux-dump with no backing store fails' ' test_must_fail flux dump --checkpoint foo.tar 2>nostore.err && - grep "checkpoint key unavailable" nostore.err + grep "checkpoint get unavailable, no backing store" nostore.err ' test_expect_success 'flux-dump with bad archive file fails' ' test_must_fail flux dump /badfile.tar 2>badfile.err && @@ -181,9 +181,9 @@ test_expect_success 'restore to key fails when kvs is not loaded' ' test_expect_success 'unload content-sqlite' ' flux module remove content-sqlite ' -test_expect_success 'restore --checkpoint with no backing store cant flush' ' - flux restore --checkpoint foo.tar 2>noback.err && - grep "error flushing content cache" noback.err +test_expect_success 'restore --checkpoint with no backing store cant checkpoint' ' + test_must_fail flux restore --checkpoint foo.tar 2>noback.err && + grep "checkpoint put unavailable, no backing store" noback.err ' test_expect_success 'dump --no-cache with no backing store fails' ' test_must_fail flux dump --no-cache --checkpoint x.tar From aa681a054939b6c4518e0eb70e5aed2e3ddd9d82 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 4 Sep 2024 13:19:09 -0700 Subject: [PATCH 06/11] t: cover "none" backing store checkpoints Problem: There is no coverage to ensure that the "none" backing store works identically to when no backing store is never loaded. Add coverage in t0028-content-backing-none.t. --- t/t0028-content-backing-none.t | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/t/t0028-content-backing-none.t b/t/t0028-content-backing-none.t index a30f172ff5bd..8bd824baf93c 100755 --- a/t/t0028-content-backing-none.t +++ b/t/t0028-content-backing-none.t @@ -31,6 +31,18 @@ test_expect_success 'content.backing-module input of none works' ' flux start -Scontent.backing-module=none true ' +test_expect_success 'checkpoint-get fails, backing store is none' ' + test_must_fail checkpoint_get foo +' + +test_expect_success 'checkpoint-put fails, backing store is none' ' + test_must_fail checkpoint_put foo bar +' + +test_expect_success 'flux-dump --checkpoint with backing store is none' ' + test_must_fail flux dump --checkpoint foo.tar +' + test_expect_success 'remove content module' ' flux exec flux module remove content ' From 0f9eafffb621ae69d8ba7cad1b076ad13470fab6 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 27 Aug 2024 18:20:05 +0000 Subject: [PATCH 07/11] t: cover FLUX_KVS_SYNC on ENOSPC Problem: There is no coverage to ensure FLUX_KVS_SYNC fails when there is no longer space on disk. Add coverage to t0090-content-enospc.t. --- t/t0090-content-enospc.t | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/t/t0090-content-enospc.t b/t/t0090-content-enospc.t index fa2cfd8b8be6..34a18e754124 100755 --- a/t/t0090-content-enospc.t +++ b/t/t0090-content-enospc.t @@ -58,4 +58,16 @@ test_expect_success 'content flush returns error on ENOSPC' ' grep "content.flush: No space left on device" flush.err ' +# flux start will fail b/c rc3 will fail due to ENOSPC +test_expect_success 'kvs sync fails due to ENOSPC' ' + rm -rf /test/tmpfs-1m/* && + mkdir /test/tmpfs-1m/statedir && + test_must_fail flux start \ + -o,-Scontent.backing-module=content-sqlite \ + -o,-Sstatedir=/test/tmpfs-1m/statedir \ + "./fillstatedir.sh; flux dmesg; flux kvs put --sync foo=1" > sync.out 2> sync.err && + grep -q "No space left on device" sync.out && + grep "flux_kvs_commit: No space left on device" sync.err +' + test_done From ce4264f253cbb32b3ad5b3a496055590a33d925b Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 27 Aug 2024 18:41:34 +0000 Subject: [PATCH 08/11] t: cover FLUX_KVS_SYNC without backing store Problem: There is no coverage to ensure FLUX_KVS_SYNC does not work if there is no backing store. Add coverage in t1010-kvs-commit-sync.t. --- t/t1010-kvs-commit-sync.t | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/t/t1010-kvs-commit-sync.t b/t/t1010-kvs-commit-sync.t index db3ba60d82a9..a6092a496008 100755 --- a/t/t1010-kvs-commit-sync.t +++ b/t/t1010-kvs-commit-sync.t @@ -18,13 +18,17 @@ checkpoint_get() { jq -j -c -n "{key:\"$1\"}" | $RPC content.checkpoint-get } -test_expect_success 'load content module' ' - flux module load content +test_expect_success 'load content module and kvs' ' + flux module load content && + flux module load kvs +' + +test_expect_success 'sync does not work without a backing store' ' + test_must_fail flux kvs put --sync testsync=1 ' test_expect_success 'load content-sqlite and kvs and add some data' ' flux module load content-sqlite && - flux module load kvs && flux kvs put a=1 && flux kvs put b=2 ' From ef6a66be8010d770d49834ab700355dcf300b83c Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 27 Aug 2024 11:57:32 -0700 Subject: [PATCH 09/11] kvs: call content.flush before checkpoint Problem: When the KVS module is unloaded, a checkpoint of the root reference is attempted. However, a content.flush is not done beforehand. This could result in an invalid checkpoint reference as data is not guaranteed to be flushed to the backing store. Solution: Call content.flush before checkpointing. Fixes #6237 --- src/modules/kvs/kvs.c | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index d36229b24fe2..336295ed80c9 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -3142,15 +3142,22 @@ static int checkpoint_get (flux_t *h, char *buf, size_t len, int *seq) */ static int checkpoint_put (flux_t *h, const char *rootref, int rootseq) { - flux_future_t *f = NULL; + flux_future_t *f1 = NULL; + flux_future_t *f2 = NULL; int rv = -1; - if (!(f = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0, 0)) - || flux_rpc_get (f, NULL) < 0) + /* first must ensure all content is flushed */ + if (!(f1 = flux_rpc (h, "content.flush", NULL, 0, 0)) + || flux_rpc_get (f1, NULL) < 0) + goto error; + + if (!(f2 = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0, 0)) + || flux_rpc_get (f2, NULL) < 0) goto error; rv = 0; error: - flux_future_destroy (f); + flux_future_destroy (f1); + flux_future_destroy (f2); return rv; } From 3402b5b45d149fe954af8789e9ec86f46cf4e85b Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 29 Aug 2024 23:41:12 +0000 Subject: [PATCH 10/11] content: flush before checkpointing Problem: Before checkpointing, users need to remember to call content.flush, to ensure data has been flushed to the backing store. It is easy to forget this. Within the content module, call content.flush before checkpointing. Fixes #6242 --- src/modules/content/checkpoint.c | 85 ++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 5 deletions(-) diff --git a/src/modules/content/checkpoint.c b/src/modules/content/checkpoint.c index b0d0b0c878a3..a4e08df9a820 100644 --- a/src/modules/content/checkpoint.c +++ b/src/modules/content/checkpoint.c @@ -194,6 +194,83 @@ static int checkpoint_put_forward (struct content_checkpoint *checkpoint, return -1; } +static void checkpoint_content_flush_continuation (flux_future_t *f, void *arg) +{ + struct content_checkpoint *checkpoint = arg; + const flux_msg_t *msg = flux_future_aux_get (f, "msg"); + const char *key; + json_t *value; + int flags = 0; + const char *errstr = NULL; + + assert (msg); + + if (flux_rpc_get (f, NULL) < 0) { + errstr = "error flushing content"; + goto error; + } + + if (flux_request_unpack (msg, + NULL, + "{s:s s:o s?i}", + "key", &key, + "value", &value, + "flags", &flags) < 0) + goto error; + + if (checkpoint_put_forward (checkpoint, + msg, + key, + value, + &errstr) < 0) + goto error; + + flux_future_destroy (f); + return; + +error: + if (flux_respond_error (checkpoint->h, msg, errno, errstr) < 0) + flux_log_error (checkpoint->h, + "error responding to checkpoint-put request"); + flux_future_destroy (f); +} + +static int checkpoint_content_flush (struct content_checkpoint *checkpoint, + const flux_msg_t *msg, + const char **errstr) +{ + const char *topic = "content.flush"; + uint32_t rank; + flux_future_t *f = NULL; + + if (flux_get_rank (checkpoint->h, &rank) < 0) { + (*errstr) = "error retrieving rank"; + return -1; + } + + if (!(f = flux_rpc (checkpoint->h, topic, NULL, rank, 0)) + || flux_future_then (f, + -1, + checkpoint_content_flush_continuation, + checkpoint) < 0) + goto error; + + if (flux_future_aux_set (f, + "msg", + (void *)flux_msg_incref (msg), + (flux_free_f)flux_msg_decref) < 0) { + flux_msg_decref (msg); + goto error; + } + + return 0; + +error: + (*errstr) = "error starting content.flush RPC"; + flux_future_destroy (f); + return -1; +} + void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { @@ -216,11 +293,9 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, "value", &value) < 0) goto error; - if (checkpoint_put_forward (checkpoint, - msg, - key, - value, - &errstr) < 0) + if (checkpoint_content_flush (checkpoint, + msg, + &errstr) < 0) goto error; return; From 647340f7b61e759113565235e09132953b3875de Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 30 Aug 2024 10:26:36 -0700 Subject: [PATCH 11/11] kvs: remove calls to content.flush Problem: Calls to content.checkpoint-put (via kvs_checkpoint_commit()) will automatically call content.flush. Therefore the calls to content.flush in the kvs are duplicates and now unnecessary. Remove calls to content.flush in the kvs module. --- src/modules/kvs/kvs.c | 29 +++----------------- src/modules/kvs/kvstxn.c | 51 +++-------------------------------- src/modules/kvs/kvstxn.h | 11 ++------ src/modules/kvs/test/kvstxn.c | 3 --- 4 files changed, 10 insertions(+), 84 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 336295ed80c9..d8f8173fcccc 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -1068,20 +1068,6 @@ static void kvstxn_apply (kvstxn_t *kt) assert (wait_get_usecount (wait) > 0); goto stall; } - else if (ret == KVSTXN_PROCESS_SYNC_CONTENT_FLUSH) { - /* N.B. futre is managed by kvstxn, should not call - * flux_future_destroy() on it */ - flux_future_t *f = kvstxn_sync_content_flush (kt); - if (!f) { - errnum = errno; - goto done; - } - if (flux_future_then (f, -1., kvstxn_apply_cb, kt) < 0) { - errnum = errno; - goto done; - } - goto stall; - } else if (ret == KVSTXN_PROCESS_SYNC_CHECKPOINT) { /* N.B. futre is managed by kvstxn, should not call * flux_future_destroy() on it */ @@ -3142,22 +3128,15 @@ static int checkpoint_get (flux_t *h, char *buf, size_t len, int *seq) */ static int checkpoint_put (flux_t *h, const char *rootref, int rootseq) { - flux_future_t *f1 = NULL; - flux_future_t *f2 = NULL; + flux_future_t *f = NULL; int rv = -1; - /* first must ensure all content is flushed */ - if (!(f1 = flux_rpc (h, "content.flush", NULL, 0, 0)) - || flux_rpc_get (f1, NULL) < 0) - goto error; - - if (!(f2 = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0, 0)) - || flux_rpc_get (f2, NULL) < 0) + if (!(f = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0, 0)) + || flux_rpc_get (f, NULL) < 0) goto error; rv = 0; error: - flux_future_destroy (f1); - flux_future_destroy (f2); + flux_future_destroy (f); return rv; } diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 1a7e1eea99c4..83fd8bb4b90d 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -61,7 +61,6 @@ struct kvstxn { char newroot[BLOBREF_MAX_STRING_SIZE]; zlist_t *missing_refs_list; zlist_t *dirty_cache_entries_list; - flux_future_t *f_sync_content_flush; flux_future_t *f_sync_checkpoint; bool processing; /* kvstxn is being processed */ bool merged; /* kvstxn is a merger of transactions */ @@ -77,7 +76,6 @@ struct kvstxn { * STORE - generate dirty entries for caller to store * GENERATE_KEYS - stall until stores complete * - generate keys modified in txn - * SYNC_CONTENT_FLUSH - call content.flush (for FLUX_KVS_SYNC) * SYNC_CHECKPOINT - call kvs_checkpoint_commit (for FLUX_KVS_SYNC) * FINISHED - end state * @@ -87,8 +85,7 @@ struct kvstxn { * APPLY_OPS -> STORE * STORE -> GENERATE_KEYS * GENERATE_KEYS -> FINISHED - * GENERATE_KEYS -> SYNC_CONTENT_FLUSH - * SYNC_CONTENT_FLUSH -> SYNC_CHECKPOINT + * GENERATE_KEYS -> SYNC_CHECKPOINT * SYNC_CHECKPOINT -> FINISHED */ enum { @@ -97,9 +94,8 @@ struct kvstxn { KVSTXN_STATE_APPLY_OPS = 3, KVSTXN_STATE_STORE = 4, KVSTXN_STATE_GENERATE_KEYS = 5, - KVSTXN_STATE_SYNC_CONTENT_FLUSH = 6, - KVSTXN_STATE_SYNC_CHECKPOINT = 7, - KVSTXN_STATE_FINISHED = 8, + KVSTXN_STATE_SYNC_CHECKPOINT = 6, + KVSTXN_STATE_FINISHED = 7, } state; }; @@ -116,7 +112,6 @@ static void kvstxn_destroy (kvstxn_t *kt) zlist_destroy (&kt->missing_refs_list); if (kt->dirty_cache_entries_list) zlist_destroy (&kt->dirty_cache_entries_list); - flux_future_destroy (kt->f_sync_content_flush); flux_future_destroy (kt->f_sync_checkpoint); free (kt); } @@ -1085,38 +1080,10 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt, } if (kt->flags & FLUX_KVS_SYNC) - kt->state = KVSTXN_STATE_SYNC_CONTENT_FLUSH; + kt->state = KVSTXN_STATE_SYNC_CHECKPOINT; else kt->state = KVSTXN_STATE_FINISHED; } - else if (kt->state == KVSTXN_STATE_SYNC_CONTENT_FLUSH) { - if (!(kt->f_sync_content_flush)) { - kt->f_sync_content_flush = flux_rpc (kt->ktm->h, - "content.flush", - NULL, - 0, - 0); - if (!kt->f_sync_content_flush) { - kt->errnum = errno; - return KVSTXN_PROCESS_ERROR; - } - kt->blocked = 1; - return KVSTXN_PROCESS_SYNC_CONTENT_FLUSH; - } - - /* user did not wait for future to complex */ - if (!flux_future_is_ready (kt->f_sync_content_flush)) { - kt->blocked = 1; - return KVSTXN_PROCESS_SYNC_CONTENT_FLUSH; - } - - if (flux_rpc_get (kt->f_sync_content_flush, NULL) < 0) { - kt->errnum = errno; - return KVSTXN_PROCESS_ERROR; - } - - kt->state = KVSTXN_STATE_SYNC_CHECKPOINT; - } else if (kt->state == KVSTXN_STATE_SYNC_CHECKPOINT) { if (!(kt->f_sync_checkpoint)) { @@ -1223,16 +1190,6 @@ int kvstxn_iter_dirty_cache_entries (kvstxn_t *kt, return 0; } -flux_future_t *kvstxn_sync_content_flush (kvstxn_t *kt) -{ - if (kt->state != KVSTXN_STATE_SYNC_CONTENT_FLUSH) { - errno = EINVAL; - return NULL; - } - - return kt->f_sync_content_flush; -} - flux_future_t *kvstxn_sync_checkpoint (kvstxn_t *kt) { if (kt->state != KVSTXN_STATE_SYNC_CHECKPOINT) { diff --git a/src/modules/kvs/kvstxn.h b/src/modules/kvs/kvstxn.h index 1e715cf35b12..8de66dd172fb 100644 --- a/src/modules/kvs/kvstxn.h +++ b/src/modules/kvs/kvstxn.h @@ -22,9 +22,8 @@ typedef enum { KVSTXN_PROCESS_ERROR = 1, KVSTXN_PROCESS_LOAD_MISSING_REFS = 2, KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES = 3, - KVSTXN_PROCESS_SYNC_CONTENT_FLUSH = 4, - KVSTXN_PROCESS_SYNC_CHECKPOINT = 5, - KVSTXN_PROCESS_FINISHED = 6, + KVSTXN_PROCESS_SYNC_CHECKPOINT = 4, + KVSTXN_PROCESS_FINISHED = 5, } kvstxn_process_t; /* api flags, to be used with kvstxn_mgr_add_transaction() @@ -95,7 +94,6 @@ json_t *kvstxn_get_keys (kvstxn_t *kt); * KVSTXN_PROCESS_LOAD_MISSING_REFS stall & load, * KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES stall & process dirty cache * entries, - * KVSTXN_PROCESS_SYNC_CONTENT_FLUSH stall & wait for future to fulfill * KVSTXN_PROCESS_SYNC_CHECKPOINT stall & wait for future to fulfill * KVSTXN_PROCESS_FINISHED all done * @@ -106,8 +104,6 @@ json_t *kvstxn_get_keys (kvstxn_t *kt); * on stall & process dirty cache entries, call * kvstxn_iter_dirty_cache_entries() to process entries. * - * on stall & content-flush, call kvstxn_sync_content_flush() to get future. - * * on stall & checkpoint, call kvstxn_sync_checkpoint() to get future. * * on completion, call kvstxn_get_newroot_ref() to get reference to @@ -140,9 +136,6 @@ int kvstxn_iter_dirty_cache_entries (kvstxn_t *kt, */ void kvstxn_cleanup_dirty_cache_entry (kvstxn_t *kt, struct cache_entry *entry); -/* on stall, get confent.flush future to wait for fulfillment on */ -flux_future_t *kvstxn_sync_content_flush (kvstxn_t *kt); - /* on stall, get checkpoint future to wait for fulfillment on */ flux_future_t *kvstxn_sync_checkpoint (kvstxn_t *kt); diff --git a/src/modules/kvs/test/kvstxn.c b/src/modules/kvs/test/kvstxn.c index 301925aaa337..77f1cda17e93 100644 --- a/src/modules/kvs/test/kvstxn.c +++ b/src/modules/kvs/test/kvstxn.c @@ -700,9 +700,6 @@ void kvstxn_basic_tests (void) ok (kvstxn_iter_dirty_cache_entries (kt, cache_noop_cb, NULL) < 0, "kvstxn_iter_dirty_cache_entries returns < 0 for call on invalid state"); - ok (kvstxn_sync_content_flush (kt) == NULL, - "kvstxn_sync_content_flush returns NULL for call on invalid state"); - ok (kvstxn_sync_checkpoint (kt) == NULL, "kvstxn_sync_checkpoint returns NULL for call on invalid state");