-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[fs] Expose an experimental `fs` module with an `open` function and a `File` abstraction (1/3) (#3165) Co-authored-by: Ivan <2103732+codebien@users.noreply.github.com> Co-authored-by: Mihail Stoykov <312246+mstoykov@users.noreply.github.com>
- Loading branch information
1 parent
348aadd
commit 24ae4d9
Showing
11 changed files
with
1,849 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Bonjour, tout le monde! |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import { open, SeekMode } from "k6/experimental/fs"; | ||
|
||
// Test configuration: 100 virtual users and 1000 iterations.
export const options = {
  vus: 100,
  iterations: 1000,
};
|
||
// k6 doesn't support async in the init context, so the file is opened from an
// immediately-invoked async function, which allows the use of `await`.
//
// Each Virtual User gets its own `file` copy.
// So, operations like `seek` or `read` won't impact other VUs.
let file;
(async function () {
  file = await open("bonjour.txt");
})();
|
||
/**
 * Reads the file in 4-byte chunks and verifies that the total number of bytes
 * read matches the size reported by `stat()`.
 *
 * The file offset is rewound to the start when the iteration ends, whether it
 * succeeded or threw, so the next iteration on the same `file` starts from
 * the beginning.
 *
 * @throws {Error} If the file name or the number of bytes read is unexpected.
 */
export default async function () {
  try {
    // Retrieve information about the file
    const fileinfo = await file.stat();
    if (fileinfo.name !== "bonjour.txt") {
      throw new Error("Unexpected file name");
    }

    const buffer = new Uint8Array(4);

    let totalBytesRead = 0;
    while (true) {
      // Read into the buffer
      const bytesRead = await file.read(buffer);
      if (bytesRead == null) {
        // EOF
        break;
      }

      // Do something useful with the content of the buffer
      totalBytesRead += bytesRead;

      // If bytesRead is less than the buffer size, we've read the whole file
      if (bytesRead < buffer.byteLength) {
        break;
      }
    }

    // Check that we read the expected number of bytes
    if (totalBytesRead !== fileinfo.size) {
      throw new Error("Unexpected number of bytes read");
    }
  } finally {
    // Seek back to the beginning of the file, even when a check above failed,
    // so that a failed iteration does not leave the offset at EOF.
    await file.seek(0, SeekMode.Start);
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package fs | ||
|
||
import ( | ||
"fmt" | ||
"io" | ||
"path/filepath" | ||
"sync" | ||
|
||
"go.k6.io/k6/lib/fsext" | ||
) | ||
|
||
// cache is a cache of opened files, designed to minimize redundant file reads, and
// avoid replicating the content of the files in memory as much as possible.
//
// This cache could be seen as redundant, as the underlying [fsext.Fs] implementation
// already caches the content of the files it opens. However, the current implementation of
// [fsext.Fs] relies on `afero` under the hood, which in turn relies on a [sync.RWMutex] to
// protect access to the cached file content. This means that every time a file is opened,
// the `fsext.Fs` cache is accessed, and the [sync.RWMutex] is locked and unlocked.
//
// This cache is designed to avoid this synchronization overhead, by caching the content of
// the files in a map that is safe for concurrent use (openedFiles), and thus avoid the need
// for a lock.
//
// This leads to a performance improvement, at the cost of holding the content of the files
// in memory twice, once in the cache's `openedFiles` map, and once in the `fsext.Fs` cache.
//
// Note that the current implementation of the cache diverges from the guarantees expressed in the
// [design document] defining the `fs` module, as we effectively hold the file's content in memory
// twice as opposed to once.
//
// Future updates (see [#1079]) may phase out reliance on `afero`.
// Depending on our new choice for [fsext] implementation, this cache might become obsolete, allowing us
// to solely depend on [fsext.Fs.Open].
//
// [#1079]: https://github.com/grafana/k6/issues/1079
type cache struct {
	// openedFiles holds a safe for concurrent use map, holding the content
	// of the files that were opened by the user.
	//
	// Keys are expected to be strings holding the opened files' path.
	// Values are expected to be byte slices holding the content of the opened file.
	//
	// That way, we can cache the file's content and avoid opening too many
	// file descriptors, and re-reading its content every time the file is opened.
	//
	// Importantly, this also means that if the
	// file is modified from outside of k6, the changes will not be reflected in the file's data.
	openedFiles sync.Map
}
|
||
// open retrieves the content of a given file from the specified filesystem (fromFs) and | ||
// stores it in the cache's internal `openedFiles` map. | ||
// | ||
// The function cleans the provided filename using filepath.Clean before using it. | ||
// | ||
// If the file was previously "opened" (and thus cached), it | ||
// returns the cached content. Otherwise, it reads the file from the | ||
// filesystem, caches its content, and then returns it. | ||
// | ||
// The function is designed to minimize redundant file reads by leveraging an internal cache (openedFiles). | ||
// In case the cached value is not a byte slice (which should never occur in regular use), it | ||
// panics with a descriptive error. | ||
// | ||
// Parameters: | ||
// - filename: The name of the file to be retrieved. This should be a relative or absolute path. | ||
// - fromFs: The filesystem (from the fsext package) from which the file should be read if not already cached. | ||
// | ||
// Returns: | ||
// - A byte slice containing the content of the specified file. | ||
// - An error if there's any issue opening or reading the file. If the file content is | ||
// successfully cached and returned once, subsequent calls will not produce | ||
// file-related errors for the same file, as the cached value will be used. | ||
func (fr *cache) open(filename string, fromFs fsext.Fs) (data []byte, err error) { | ||
filename = filepath.Clean(filename) | ||
|
||
if f, ok := fr.openedFiles.Load(filename); ok { | ||
data, ok = f.([]byte) | ||
if !ok { | ||
panic(fmt.Errorf("cache's file %s is not stored as a byte slice", filename)) | ||
} | ||
|
||
return data, nil | ||
} | ||
|
||
// TODO: re-evaluate opening from the FS this once #1079 is resolved. | ||
f, err := fromFs.Open(filename) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer func() { | ||
cerr := f.Close() | ||
if cerr != nil { | ||
err = fmt.Errorf("failed to close file %s: %w", filename, cerr) | ||
} | ||
}() | ||
|
||
data, err = io.ReadAll(f) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to read the content of file %s: %w", filename, err) | ||
} | ||
|
||
fr.openedFiles.Store(filename, data) | ||
|
||
return data, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package fs | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"go.k6.io/k6/lib/fsext" | ||
) | ||
|
||
func TestFileCacheOpen(t *testing.T) { | ||
t.Parallel() | ||
|
||
t.Run("open succeeds", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
cache := &cache{} | ||
fs := newTestFs(t, func(fs fsext.Fs) error { | ||
return fsext.WriteFile(fs, "bonjour.txt", []byte("Bonjour, le monde"), 0o644) | ||
}) | ||
|
||
_, gotBeforeOk := cache.openedFiles.Load("bonjour.txt") | ||
gotData, gotErr := cache.open("bonjour.txt", fs) | ||
_, gotAfterOk := cache.openedFiles.Load("bonjour.txt") | ||
|
||
assert.False(t, gotBeforeOk) | ||
assert.NoError(t, gotErr) | ||
assert.Equal(t, []byte("Bonjour, le monde"), gotData) | ||
assert.True(t, gotAfterOk) | ||
}) | ||
|
||
t.Run("double open succeeds", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
cache := &cache{} | ||
fs := newTestFs(t, func(fs fsext.Fs) error { | ||
return fsext.WriteFile(fs, "bonjour.txt", []byte("Bonjour, le monde"), 0o644) | ||
}) | ||
|
||
firstData, firstErr := cache.open("bonjour.txt", fs) | ||
_, gotFirstOk := cache.openedFiles.Load("bonjour.txt") | ||
secondData, secondErr := cache.open("bonjour.txt", fs) | ||
_, gotSecondOk := cache.openedFiles.Load("bonjour.txt") | ||
|
||
assert.True(t, gotFirstOk) | ||
assert.NoError(t, firstErr) | ||
assert.Equal(t, []byte("Bonjour, le monde"), firstData) | ||
assert.True(t, gotSecondOk) | ||
assert.NoError(t, secondErr) | ||
assert.True(t, sameUnderlyingArray(firstData, secondData)) | ||
assert.Equal(t, []byte("Bonjour, le monde"), secondData) | ||
}) | ||
} | ||
|
||
// sameUnderlyingArray returns true if the underlying array of lhs and rhs are the same.
//
// This is done by checking that the two slices have a capacity greater than 0 and that
// the last element of the underlying array is the same for both slices.
//
// Once a slice is created, its starting address can move forward, but can never move
// beyond its starting address + its capacity, which is a fixed value for any Go slice.
//
// Hence, if the last element of the underlying array is the same for both slices, it
// means that the underlying array is the same.
//
// See [explanation] for more details.
//
// [explanation]: https://groups.google.com/g/golang-nuts/c/ks1jvoyMYuc?pli=1
func sameUnderlyingArray(lhs, rhs []byte) bool {
	if cap(lhs) == 0 || cap(rhs) == 0 {
		return false
	}

	// Re-slice up to the capacity to reach the last addressable element of each slice.
	lhsLast := &lhs[:cap(lhs)][cap(lhs)-1]
	rhsLast := &rhs[:cap(rhs)][cap(rhs)-1]

	return lhsLast == rhsLast
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package fs | ||
|
||
// newFsError creates a new Error object of the provided kind and with the | ||
// provided message. | ||
func newFsError(k errorKind, message string) *fsError { | ||
return &fsError{ | ||
Name: k.String(), | ||
Message: message, | ||
kind: k, | ||
} | ||
} | ||
|
||
// errorKind indicates the kind of file system error that has occurred.
//
// Its string representation is generated by the `enumer` tool. The
// `enumer` tool is run by the `go generate` command. See the `go generate`
// command documentation.
// The tool itself is not tracked as part of the k6 go.mod file, and
// therefore must be installed manually using `go install github.com/dmarkham/enumer`.
//
//go:generate enumer -type=errorKind -output errors_gen.go
type errorKind uint8

const (
	// NotFoundError is emitted when a file is not found.
	//
	// Values start at 1, leaving the zero value of errorKind unassigned.
	NotFoundError errorKind = iota + 1

	// InvalidResourceError is emitted when a resource is invalid: for
	// instance when attempting to open a directory, which is not supported.
	InvalidResourceError

	// ForbiddenError is emitted when an operation is forbidden.
	ForbiddenError

	// TypeError is emitted when an incorrect type has been used.
	TypeError

	// EOFError is emitted when the end of a file has been reached.
	EOFError
)
|
||
// fsError represents a custom error object emitted by the fs module. | ||
// | ||
// It is used to provide a more detailed error message to the user, and | ||
// provide a concrete error type that can be used to differentiate between | ||
// different types of errors. | ||
// | ||
// Exposing error types to the user in a way that's compatible with some | ||
// JavaScript error handling constructs such as `instanceof` is still non-trivial | ||
// in Go. See the [dedicated goja issue] with have opened for more details. | ||
// | ||
// [dedicated goja issue]: /~https://github.com/dop251/goja/issues/529 | ||
type fsError struct { | ||
// Name contains the name of the error as formalized by the [ErrorKind] | ||
// type. | ||
Name string `json:"name"` | ||
|
||
// Message contains the error message as presented to the user. | ||
Message string `json:"message"` | ||
|
||
// kind contains the kind of error that has occurred. | ||
kind errorKind | ||
} | ||
|
||
// Ensure that the Error type implements the Go `error` interface. | ||
var _ error = (*fsError)(nil) | ||
|
||
// Error implements the Go `error` interface. | ||
func (e *fsError) Error() string { | ||
return e.Name + ": " + e.Message | ||
} |
Oops, something went wrong.