Skip to content

Commit

Permalink
Add fill value-related read/write options
Browse files Browse the repository at this point in the history
Previously, for chunked array formats (zarr v2/v3, n5,
neuroglancer_precomputed), TensorStore always used the following behavior:

- When reading, missing chunks are replaced by the fill value.
- When writing, chunks equal to the fill value are represented as
  missing chunks.

This behavior is still the default, but there are now two options that
can be set in the spec for these drivers, `fill_missing_data_reads` and
`store_data_equal_to_fill_value`, to override the default behavior.

PiperOrigin-RevId: 696228231
Change-Id: I2bc247435bbb9a916c4af7d0d1851ea6176504e0
  • Loading branch information
jbms authored and copybara-github committed Nov 13, 2024
1 parent c51db9a commit f462942
Show file tree
Hide file tree
Showing 24 changed files with 517 additions and 69 deletions.
9 changes: 7 additions & 2 deletions tensorstore/driver/chunk_cache_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ class ChunkCacheReadWriteDriverMixin : public Parent {
/// Forwards the read request to the `Read` method of the derived driver's
/// cache.
///
/// The request passed to the cache bundles the original `request`, the
/// derived driver's `component_index()`, the `time` field of its
/// `data_staleness_bound()`, and its `fill_missing_data_reads()` setting.
///
/// NOTE(review): this is a rendered diff hunk; the first
/// `data_staleness_bound().time}` line below appears to be the removed
/// (pre-commit) line and the two lines after it its replacement -- confirm
/// against the actual file before compiling.
void Read(Driver::ReadRequest request, ReadChunkReceiver receiver) override {
static_cast<Derived*>(this)->cache()->Read(
{std::move(request), static_cast<Derived*>(this)->component_index(),
static_cast<Derived*>(this)->data_staleness_bound().time},
static_cast<Derived*>(this)->data_staleness_bound().time,
static_cast<Derived*>(this)->fill_missing_data_reads()},
std::move(receiver));
}

/// Simply forwards to `ChunkCache::Write`.
///
/// The request passed to the cache bundles the original `request`, the
/// derived driver's `component_index()`, and its
/// `store_data_equal_to_fill_value()` setting.
///
/// NOTE(review): this is a rendered diff hunk; the line ending in
/// `component_index()},` appears to be the removed (pre-commit) line and the
/// two lines after it its replacement -- confirm against the actual file
/// before compiling.
void Write(Driver::WriteRequest request,
WriteChunkReceiver receiver) override {
static_cast<Derived*>(this)->cache()->Write(
{std::move(request), static_cast<Derived*>(this)->component_index()},
{std::move(request), static_cast<Derived*>(this)->component_index(),
static_cast<Derived*>(this)->store_data_equal_to_fill_value()},
std::move(receiver));
}
};
Expand Down Expand Up @@ -144,6 +146,9 @@ class ChunkCacheDriver

public:
using Base::Base;

/// Fixed setting for this driver: always reports `true`, i.e. the option is
/// not configurable here.
///
/// NOTE(review): presumably consumed by `ChunkCacheReadWriteDriverMixin::Read`
/// when this class is the mixin's `Derived` type -- confirm.
bool fill_missing_data_reads() const {
  return true;
}
/// Fixed setting for this driver: always reports `false`, i.e. the option is
/// not configurable here.
///
/// NOTE(review): presumably consumed by `ChunkCacheReadWriteDriverMixin::Write`
/// when this class is the mixin's `Derived` type -- confirm.
bool store_data_equal_to_fill_value() const {
  return false;
}
};

} // namespace internal
Expand Down
10 changes: 10 additions & 0 deletions tensorstore/driver/kvs_backed_chunk_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ Result<IndexTransform<>> KvsMetadataDriverBase::GetBoundSpecData(
spec.store.path = cache->GetBaseKvstorePath();
spec.data_copy_concurrency = metadata_cache->data_copy_concurrency_;
spec.cache_pool = cache->cache_pool_;
spec.fill_value_mode = fill_value_mode_;
if (spec.cache_pool != metadata_cache->metadata_cache_pool_) {
spec.metadata_cache_pool = metadata_cache->metadata_cache_pool_;
}
Expand Down Expand Up @@ -906,6 +907,7 @@ Result<internal::Driver::Handle> CreateTensorStoreFromMetadata(
state->AllocateDriver(std::move(initializer)), read_write_mode);
driver->metadata_staleness_bound_ =
base.spec_->staleness.metadata.BoundAtOpen(base.request_time_);
driver->fill_value_mode_ = base.spec_->fill_value_mode;
if (base.spec_->assume_metadata || base.spec_->assume_cached_metadata) {
driver->assumed_metadata_ = metadata;
driver->assumed_metadata_time_ = base.spec_->assume_cached_metadata
Expand Down Expand Up @@ -1486,6 +1488,14 @@ TENSORSTORE_DEFINE_JSON_BINDER(
jb::Member("recheck_cached_data",
jb::Projection(&StalenessBounds::data,
jb::DefaultInitializedValue())))),
jb::Projection<&KvsDriverSpec::fill_value_mode>(jb::Sequence(
jb::Member("fill_missing_data_reads",
jb::Projection<&FillValueMode::fill_missing_data_reads>(
jb::DefaultValue([](auto* obj) { *obj = true; }))),
jb::Member(
"store_data_equal_to_fill_value",
jb::Projection<&FillValueMode::store_data_equal_to_fill_value>(
jb::DefaultInitializedValue())))),
internal::OpenModeSpecJsonBinder));

} // namespace internal_kvs_backed_chunk_driver
Expand Down
22 changes: 21 additions & 1 deletion tensorstore/driver/kvs_backed_chunk_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@
namespace tensorstore {
namespace internal_kvs_backed_chunk_driver {

/// Options controlling how the fill value is used when reading and writing
/// chunks.
///
/// The members carry default member initializers so that a
/// default-constructed `FillValueMode` never holds indeterminate values; the
/// defaults match the documented defaults of the corresponding spec options.
struct FillValueMode {
  /// If `true`, reading a missing chunk yields the fill value; if `false`,
  /// reading a missing chunk is an error.
  bool fill_missing_data_reads = true;

  /// If `true`, explicitly written data is stored even when it is equal to the
  /// fill value; if `false`, such chunks may be represented as missing.
  bool store_data_equal_to_fill_value = false;

  /// Invokes `f` with all members of `x`, in declaration order.
  static constexpr auto ApplyMembers = [](auto& x, auto f) {
    return f(x.fill_missing_data_reads, x.store_data_equal_to_fill_value);
  };
};

/// Base class for specification representations used by drivers, for use with
/// the driver registry.
///
Expand All @@ -71,12 +79,13 @@ struct KvsDriverSpec : public internal::DriverSpec,
std::optional<Context::Resource<internal::CachePoolResource>>
metadata_cache_pool;
StalenessBounds staleness;
FillValueMode fill_value_mode;

static constexpr auto ApplyMembers = [](auto& x, auto f) {
return f(internal::BaseCast<internal::DriverSpec>(x),
internal::BaseCast<internal::OpenModeSpec>(x), x.store,
x.data_copy_concurrency, x.cache_pool, x.metadata_cache_pool,
x.staleness);
x.staleness, x.fill_value_mode);
};

kvstore::Spec GetKvstore() const override;
Expand Down Expand Up @@ -567,6 +576,15 @@ class KvsMetadataDriverBase : public internal::Driver {

virtual const StalenessBound& data_staleness_bound() const = 0;

// Accessors for use by ChunkCacheReadWriteDriverMixin, if the derived class
// happens to use it.
bool fill_missing_data_reads() const {
return fill_value_mode_.fill_missing_data_reads;
}
bool store_data_equal_to_fill_value() const {
return fill_value_mode_.store_data_equal_to_fill_value;
}

// Treat as private:

StalenessBound metadata_staleness_bound_;
Expand All @@ -579,6 +597,8 @@ class KvsMetadataDriverBase : public internal::Driver {
/// the open request. If `OpenMode::assume_metadata` was specified, set to
/// `absl::InfiniteFuture()`. Otherwise, set to `absl::InfinitePast()`.
absl::Time assumed_metadata_time_ = absl::InfinitePast();

FillValueMode fill_value_mode_;
};

/// Abstract driver base class for use with `ChunkedDataCacheBase`.
Expand Down
17 changes: 17 additions & 0 deletions tensorstore/driver/kvs_backed_chunk_driver_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,23 @@ allOf:
a `~Context.cache_pool` with a non-zero
`~Context.cache_pool.total_bytes_limit` and also specify ``false``,
``"open"``, or an explicit time bound for `.recheck_cached_data`.
fill_missing_data_reads:
default: true
title: Replace missing chunks with the fill value when reading.
description: |
If disabled, reading a missing chunk will result in an error. Note
that the fill value may still be used when writing a partial chunk.
Typically this should only be set to ``false`` in the case that
`.store_data_equal_to_fill_value` was enabled when writing.
store_data_equal_to_fill_value:
default: false
title: |
Store all explicitly written data, even if it is equal to the fill
value.
description: |
This ensures that explicitly written data, even if it is equal to the
fill value, can be distinguished from missing data. If disabled,
chunks equal to the fill value may be represented as missing chunks.
required:
- kvstore
definitions:
Expand Down
69 changes: 69 additions & 0 deletions tensorstore/driver/n5/driver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,8 @@ TENSORSTORE_GLOBAL_INITIALIZER {
tensorstore::internal::TestTensorStoreDriverSpecRoundtripOptions options;
options.test_name = "n5";
options.create_spec = GetJsonSpec();
options.create_spec["fill_missing_data_reads"] = false;
options.create_spec["store_data_equal_to_fill_value"] = true;
options.full_spec = {
{"dtype", "int16"},
{"driver", "n5"},
Expand All @@ -887,6 +889,8 @@ TENSORSTORE_GLOBAL_INITIALIZER {
{"driver", "file"},
{"path", "${TEMPDIR}/prefix/"},
}},
{"fill_missing_data_reads", false},
{"store_data_equal_to_fill_value", true},
{"transform",
{{"input_labels", {"x", "y"}},
{"input_exclusive_max", {{10}, {11}}},
Expand All @@ -901,6 +905,8 @@ TENSORSTORE_GLOBAL_INITIALIZER {
{"driver", "file"},
{"path", "${TEMPDIR}/prefix/"},
}},
{"fill_missing_data_reads", false},
{"store_data_equal_to_fill_value", true},
{"transform",
{{"input_labels", {"x", "y"}},
{"input_exclusive_max", {{10}, {11}}},
Expand Down Expand Up @@ -1814,4 +1820,67 @@ TEST(DriverTest, ResolutionOnlyMetadataMismatch) {
MatchesStatus(absl::StatusCode::kFailedPrecondition, ".*\"units\".*"));
}

// Checks that `fill_missing_data_reads` decides whether reading a chunk that
// has not been written yields the fill value (0) or a `kNotFound` error, and
// that reads succeed either way once the chunk has been written.
TEST(DriverTest, FillMissingDataReads) {
  for (bool fill_missing : {false, true}) {
    SCOPED_TRACE(
        tensorstore::StrCat("fill_missing_data_reads=", fill_missing));
    TENSORSTORE_ASSERT_OK_AND_ASSIGN(
        auto store,
        tensorstore::Open(
            {
                {"driver", "n5"},
                {"kvstore", "memory://"},
                {"fill_missing_data_reads", fill_missing},
            },
            dtype_v<int16_t>, Schema::Shape({1}), tensorstore::OpenMode::create)
            .result());
    {
      // Nothing has been written yet, so this read hits a missing chunk.
      auto initial_read = tensorstore::Read(store).result();
      if (!fill_missing) {
        EXPECT_THAT(initial_read,
                    MatchesStatus(absl::StatusCode::kNotFound,
                                  "chunk \\{0\\} stored at \"0\" is missing"));
      } else {
        EXPECT_THAT(initial_read,
                    ::testing::Optional(tensorstore::MakeArray<int16_t>({0})));
      }
    }
    // After an explicit write the chunk exists, so the option no longer
    // matters for the subsequent read.
    auto write_result =
        tensorstore::Write(tensorstore::MakeArray<int16_t>({1}), store)
            .result();
    TENSORSTORE_ASSERT_OK(write_result);
    EXPECT_THAT(tensorstore::Read(store).result(),
                ::testing::Optional(tensorstore::MakeArray<int16_t>({1})));
  }
}

// Verifies that writing an all-zero chunk creates the chunk key "0" only when
// `store_data_equal_to_fill_value=true`; otherwise only the metadata key is
// present in the kvstore.
TEST(DriverTest, StoreDataEqualToFillValue) {
  for (bool store_fill_chunks : {false, true}) {
    SCOPED_TRACE(tensorstore::StrCat("store_data_equal_to_fill_value=",
                                     store_fill_chunks));
    TENSORSTORE_ASSERT_OK_AND_ASSIGN(
        auto store,
        tensorstore::Open(
            {{"driver", "n5"},
             {"kvstore", "memory://"},
             {"store_data_equal_to_fill_value", store_fill_chunks}},
            tensorstore::dtype_v<uint8_t>, tensorstore::RankConstraint{0},
            tensorstore::OpenMode::create)
            .result());
    TENSORSTORE_ASSERT_OK(
        tensorstore::Write(tensorstore::MakeScalarArray<uint8_t>(0), store));
    if (!store_fill_chunks) {
      // Default behavior: no chunk key is created for the all-zero write.
      EXPECT_THAT(GetMap(store.kvstore()),
                  ::testing::Optional(::testing::UnorderedElementsAre(
                      Pair("attributes.json", ::testing::_))));
    } else {
      // The all-zero chunk must have been stored explicitly under key "0".
      EXPECT_THAT(GetMap(store.kvstore()),
                  ::testing::Optional(::testing::UnorderedElementsAre(
                      Pair("attributes.json", ::testing::_),
                      Pair("0", ::testing::_))));
    }
  }
}

} // namespace
67 changes: 67 additions & 0 deletions tensorstore/driver/neuroglancer_precomputed/driver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3026,4 +3026,71 @@ TEST(DriverTest, SeparateMetadataCache) {
}
}

// Checks that `fill_missing_data_reads` decides whether reading a chunk that
// has not been written yields the fill value (0) or a `kNotFound` error, and
// that reads succeed either way once the chunk has been written.
TEST(DriverTest, FillMissingDataReads) {
  for (bool fill_missing : {false, true}) {
    SCOPED_TRACE(
        tensorstore::StrCat("fill_missing_data_reads=", fill_missing));
    TENSORSTORE_ASSERT_OK_AND_ASSIGN(
        auto store,
        tensorstore::Open(
            {
                {"driver", "neuroglancer_precomputed"},
                {"kvstore", "memory://"},
                {"fill_missing_data_reads", fill_missing},
            },
            dtype_v<int16_t>, Schema::Shape({1, 1, 1, 1}),
            tensorstore::OpenMode::create)
            .result());
    {
      // Nothing has been written yet, so this read hits a missing chunk.
      auto initial_read = tensorstore::Read(store).result();
      if (!fill_missing) {
        EXPECT_THAT(initial_read,
                    MatchesStatus(absl::StatusCode::kNotFound,
                                  "chunk \\{0, 0, 0\\} stored at "
                                  "\"1_1_1/0-1_0-1_0-1\" is missing"));
      } else {
        EXPECT_THAT(
            initial_read,
            ::testing::Optional(tensorstore::MakeArray<int16_t>({{{{0}}}})));
      }
    }
    // After an explicit write the chunk exists, so the option no longer
    // matters for the subsequent read.
    auto write_result =
        tensorstore::Write(tensorstore::MakeArray<int16_t>({1}), store)
            .result();
    TENSORSTORE_ASSERT_OK(write_result);
    EXPECT_THAT(
        tensorstore::Read(store).result(),
        ::testing::Optional(tensorstore::MakeArray<int16_t>({{{{1}}}})));
  }
}

// Verifies that writing an all-zero chunk creates the chunk key
// "1_1_1/0-1_0-1_0-1" only when `store_data_equal_to_fill_value=true`;
// otherwise only the "info" metadata key is present in the kvstore.
TEST(DriverTest, StoreDataEqualToFillValue) {
  for (bool store_fill_chunks : {false, true}) {
    SCOPED_TRACE(tensorstore::StrCat("store_data_equal_to_fill_value=",
                                     store_fill_chunks));
    TENSORSTORE_ASSERT_OK_AND_ASSIGN(
        auto store,
        tensorstore::Open(
            {{"driver", "neuroglancer_precomputed"},
             {"kvstore", "memory://"},
             {"store_data_equal_to_fill_value", store_fill_chunks}},
            tensorstore::dtype_v<uint8_t>,
            tensorstore::Schema::Shape({1, 1, 1, 1}),
            tensorstore::OpenMode::create)
            .result());
    TENSORSTORE_ASSERT_OK(
        tensorstore::Write(tensorstore::MakeScalarArray<uint8_t>(0), store));
    if (!store_fill_chunks) {
      // Default behavior: no chunk key is created for the all-zero write.
      EXPECT_THAT(GetMap(store.kvstore()),
                  ::testing::Optional(::testing::UnorderedElementsAre(
                      Pair("info", ::testing::_))));
    } else {
      // The all-zero chunk must have been stored explicitly.
      EXPECT_THAT(GetMap(store.kvstore()),
                  ::testing::Optional(::testing::UnorderedElementsAre(
                      Pair("info", ::testing::_),
                      Pair("1_1_1/0-1_0-1_0-1", ::testing::_))));
    }
  }
}

} // namespace
5 changes: 5 additions & 0 deletions tensorstore/driver/virtual_chunked/virtual_chunked.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ class VirtualChunkedDriver : public VirtualChunkedDriverBase {
return internal::GetChunkLayoutFromGrid(cache()->grid().components[0]) |
transform;
}

// Not applicable to this driver; a fixed `true` is reported.
bool fill_missing_data_reads() const {
  return true;
}

// Fixed setting for this driver: always reports `true`.
bool store_data_equal_to_fill_value() const {
  return true;
}
};

Result<internal::TransformedDriverSpec> VirtualChunkedDriver::GetBoundSpec(
Expand Down
3 changes: 0 additions & 3 deletions tensorstore/driver/zarr/driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ internal::ChunkGridSpecification DataCache::GetChunkGridSpecification(
const auto& field = metadata.dtype.fields[field_i];
const auto& field_layout = metadata.chunk_layout.fields[field_i];
auto fill_value = metadata.fill_value[field_i];
const bool fill_value_specified = fill_value.valid();
if (!fill_value.valid()) {
// Use value-initialized rank-0 fill value.
fill_value = AllocateArray(span<const Index, 0>{}, c_order, value_init,
Expand All @@ -327,8 +326,6 @@ internal::ChunkGridSpecification DataCache::GetChunkGridSpecification(
ContiguousLayoutPermutation<>(span(
layout_order_buffer, cell_rank))},
std::move(cell_chunk_shape), chunked_to_cell_dimensions);
components.back().array_spec.store_if_equal_to_fill_value =
!fill_value_specified;
}
return internal::ChunkGridSpecification{std::move(components)};
}
Expand Down
9 changes: 9 additions & 0 deletions tensorstore/driver/zarr/driver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ class ZarrDriver : public ZarrDriverBase {

Future<ArrayStorageStatistics> GetStorageStatistics(
GetStorageStatisticsRequest request) override;

// Used by `ChunkCacheReadWriteDriverMixin`.
//
// Returns `true` if the `store_data_equal_to_fill_value` flag in
// `fill_value_mode_` is set, or if the metadata's fill value is not valid.
bool store_data_equal_to_fill_value() const {
  // An explicitly enabled flag always wins.
  if (this->fill_value_mode_.store_data_equal_to_fill_value) {
    return true;
  }
  // If fill value was specified as `null`, always store explicitly-written
  // chunks entirely equal to 0, since zarr-python fills with unspecified data
  // in that case.
  //
  // NOTE(review): only the fill value of field 0 is consulted -- presumably
  // all fields are valid/invalid together; confirm.
  return !metadata().fill_value[0].valid();
}
};

} // namespace internal_zarr
Expand Down
Loading

0 comments on commit f462942

Please sign in to comment.