feat: sqlite-based transcript store (#3402)

Change swing-store's `streamStore` implementation from a collection of flat files to a SQLite-backed database. Hopefully a workaround for #3405.

* feat: sqlite3 stream works in 1 case (WIP)

* feat: SQL based streamStore passes 1 complete test (cont.)

* feat: SQL based streamStore: factor implementation out of tests

* feat: let caller override ambient sqlite3

* chore: harden() sqlStreamStore

 - refactor: hoist concise methods

* chore: integrate sqlStreamStore into lmdbSwingStore

* feat: sqlStreamStore exclusive use checking

 - fix start / end position assertion
 - lint in verbose comment
 - never mind checking type of item in writeStreamItem;
   it causes a test to fail
 - resolve auto-commit issue

* chore: prune redundant test-stream-sql

* chore: lock packages

* rename DB file, enforce exclusive access, use path.join

 - explain auto-commit

* capitalize SQL statements

* remove dependency on n-readlines

Co-authored-by: Brian Warner <warner@lothar.com>
dckc and warner authored Jun 24, 2021
1 parent c4b2d8e commit 960b013
Showing 4 changed files with 289 additions and 217 deletions.
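
The change is internal to `swing-store-lmdb`: the `StreamStore` interface that callers consume (still typed via `@agoric/swing-store-simple` in the diff below) keeps the same `writeStreamItem` / `readStream` / `closeStream` / `STREAM_START` surface. A minimal usage sketch of that surface, assuming an exported initializer named `initSwingStore` — the export name, directory, and stream name are illustrative, not part of this diff:

```js
// Hedged usage sketch. `initSwingStore` is an assumed export name for
// reaching makeLMDBSwingStore(dirPath, forceReset, options); the stream
// name 'transcript-v1' is illustrative.
import { initSwingStore } from '@agoric/swing-store-lmdb';

const { streamStore, commit, close } = initSwingStore('/tmp/swingstore-demo');

// Each write returns the position to hand to the next write.
let pos = streamStore.STREAM_START;
pos = streamStore.writeStreamItem('transcript-v1', 'item zero', pos);
pos = streamStore.writeStreamItem('transcript-v1', 'item one', pos);
commit(); // commit the swing-store's pending changes

// Read back everything from the start up to the last position written.
for (const item of streamStore.readStream(
  'transcript-v1',
  streamStore.STREAM_START,
  pos,
)) {
  console.log(item);
}
streamStore.closeStream('transcript-v1');
close();
```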
2 changes: 1 addition & 1 deletion packages/swing-store-lmdb/package.json
@@ -21,7 +21,7 @@
},
"dependencies": {
"@agoric/assert": "^0.3.2",
"n-readlines": "^1.0.0",
"better-sqlite3": "^7.4.1",
"node-lmdb": "^0.9.4"
},
"devDependencies": {
214 changes: 5 additions & 209 deletions packages/swing-store-lmdb/src/lmdbSwingStore.js
@@ -2,18 +2,17 @@
import fs from 'fs';
import os from 'os';
import path from 'path';
import util from 'util';
import Readlines from 'n-readlines';

import lmdb from 'node-lmdb';
import sqlite3 from 'better-sqlite3';

import { assert, details as X, q } from '@agoric/assert';
import { assert } from '@agoric/assert';

const encoder = new util.TextEncoder();
import { sqlStreamStore } from './sqlStreamStore.js';

/**
* @typedef { import('@agoric/swing-store-simple').KVStore } KVStore
* @typedef { import('@agoric/swing-store-simple').StreamPosition } StreamPosition
* @typedef { import('./sqlStreamStore.js').StreamPosition } StreamPosition
* @typedef { import('@agoric/swing-store-simple').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store-simple').SwingStore } SwingStore
*/
Expand Down Expand Up @@ -167,200 +166,7 @@ function makeLMDBSwingStore(dirPath, forceReset, options) {
delete: del,
};

/** @type {Set<number>} */
const activeStreamFds = new Set();
/** @type {Map<string, number>} */
const streamFds = new Map();
/** @type {Map<string, string>} */
const streamStatus = new Map();
let statusCounter = 0;

const STREAM_START = harden({ offset: 0, itemCount: 0 });

function insistStreamName(streamName) {
assert.typeof(streamName, 'string');
assert(
streamName.match(/^[-\w]+$/),
X`invalid stream name ${q(streamName)}`,
);
}

function insistStreamPosition(position) {
assert.typeof(position.itemCount, 'number');
assert(position.itemCount >= 0);
assert.typeof(position.offset, 'number');
assert(position.offset >= 0);
}

function closefd(fd) {
try {
fs.closeSync(fd);
} catch (e) {
// closing an already closed fd is OK, but any other errors are probably bad
if (e.code !== 'EBADF') {
throw e;
}
}
}

/**
* Close a stream that's open for read or write.
*
* @param {string} streamName The stream to close
*/
function closeStream(streamName) {
insistStreamName(streamName);
const fd = streamFds.get(streamName);
if (fd) {
closefd(fd);
streamFds.delete(streamName);
activeStreamFds.delete(fd);
streamStatus.delete(streamName);
}
}

/**
* Generator function that returns an iterator over the items in a stream.
*
* @param {string} streamName The stream to read
* @param {Object} startPosition The position to start reading from
* @param {Object} endPosition The position of the end of the stream
*
* @returns {Iterable<string>} an iterator for the items in the named stream
*/
function readStream(streamName, startPosition, endPosition) {
insistStreamName(streamName);
assert(
!streamStatus.get(streamName),
X`can't read stream ${q(streamName)} because it's already in use`,
);
insistStreamPosition(startPosition);
insistStreamPosition(endPosition);
assert(startPosition.itemCount <= endPosition.itemCount);

let itemCount = endPosition.itemCount;
if (endPosition.offset === 0) {
assert(itemCount === 0);
return [];
} else {
const filePath = `${dirPath}/streams/${streamName}.sss`;
fs.truncateSync(filePath, endPosition.offset);
const fd = fs.openSync(filePath, 'r');
streamFds.set(streamName, fd);
activeStreamFds.add(fd);

const readStatus = `read-${statusCounter}`;
statusCounter += 1;
streamStatus.set(streamName, readStatus);
// let startOffset = 0;
let skipCount = startPosition.itemCount;

// itemCount -= startPosition.itemCount;
// startOffset = startPosition.offset;

// We would like to be able to seek Readlines to a particular position
// in the file before it starts reading. Unfortunately, it is hardcoded
// to reset to 0 at the start and then manually walk itself through the
// file, ignoring whatever current position the fd is set to.
// Investigation has revealed that giving the Readlines constructor a
// 'position' option for where to start reading is a trivial (~4 lines
// of code) change, but that would cause us to diverge from the official
// npm version. There are even a couple of forks on NPM that do this,
// but they have like 2 downloads per week so I don't trust them. Until
// this is resolved, the only way to realize a different starting point
// than 0 is to simply ignore records that are read until we catch up to
// where we really want to start, which the following code does. It's
// not ideal, but it works.
//
// const innerReader = new Readlines(fd, { position: startOffset });
const innerReader = new Readlines(fd);
function* reader() {
try {
while (true) {
assert(
streamStatus.get(streamName) === readStatus,
X`can't read stream ${q(streamName)}, it's been closed`,
);
const line = /** @type {string|false} */ (innerReader.next());
// N.b.: since uncommitted writes may leave an overhang of data in
// the stream file, the itemCount is the true indicator of the end
// of the stream, not the point at which the line reader reaches the
// end-of-file.
if (line && itemCount > 0) {
itemCount -= 1;
const result = line.toString();
if (skipCount > 0) {
skipCount -= 1;
} else {
yield result;
}
} else {
closefd(fd);
break;
}
}
} catch (e) {
console.log(e);
} finally {
assert(
streamStatus.get(streamName) === readStatus,
X`can't read stream ${q(streamName)}, it's been closed`,
);
closeStream(streamName);
assert(itemCount === 0, X`leftover item count ${q(itemCount)}`);
}
}
return reader();
}
}

/**
* Write to a stream.
*
* @param {string} streamName The stream to be written
* @param {string} item The item to write
* @param {Object} position The position to write the item
*
* @returns {Object} the new position after writing
*/
function writeStreamItem(streamName, item, position) {
insistStreamName(streamName);
insistStreamPosition(position);

let fd = streamFds.get(streamName);
if (!fd) {
const filePath = `${dirPath}/streams/${streamName}.sss`;
const mode = fs.existsSync(filePath) ? 'r+' : 'w';
fd = fs.openSync(filePath, mode);
streamFds.set(streamName, fd);
streamStatus.set(streamName, 'write');
} else {
const status = streamStatus.get(streamName);
if (!status) {
streamStatus.set(streamName, 'write');
} else {
assert(
status === 'write',
X`can't write stream ${q(streamName)} because it's already in use`,
);
}
}
activeStreamFds.add(fd);

const buf = encoder.encode(`${item}\n`);
fs.writeSync(fd, buf, 0, buf.length, position.offset);
return harden({
offset: position.offset + buf.length,
itemCount: position.itemCount + 1,
});
}

const streamStore = harden({
readStream,
writeStreamItem,
closeStream,
STREAM_START,
});
const streamStore = sqlStreamStore(dirPath, { sqlite3 });

/**
* Commit unsaved changes.
@@ -370,10 +176,6 @@ function makeLMDBSwingStore(dirPath, forceReset, options) {
txn.commit();
txn = null;
}
for (const fd of activeStreamFds) {
fs.fsyncSync(fd);
}
activeStreamFds.clear();
}

/**
@@ -389,12 +191,6 @@ function makeLMDBSwingStore(dirPath, forceReset, options) {
dbi = null;
lmdbEnv.close();
lmdbEnv = null;

for (const fd of streamFds.values()) {
closefd(fd);
}
streamFds.clear();
activeStreamFds.clear();
}

return harden({ kvStore, streamStore, commit, close, diskUsage });
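
`sqlStreamStore.js` itself is among the four changed files but is not rendered in this excerpt. Below is a minimal sketch of what a better-sqlite3-backed stream store with the signature used above — `sqlStreamStore(dirPath, { sqlite3 })` — could look like; the table layout, `{ itemCount }` position shape, and the real module's exclusive-use checks are not shown in this diff, so treat every detail as an assumption:

```js
// Not the actual sqlStreamStore.js from this commit -- a minimal sketch of
// the shape such a module could take. Table name, column layout, and the
// { itemCount } position shape are assumptions.
import fs from 'fs';
import path from 'path';
import sqlite3ambient from 'better-sqlite3';

export function sqlStreamStore(dbDir, { sqlite3 = sqlite3ambient } = {}) {
  fs.mkdirSync(dbDir, { recursive: true });
  const db = new sqlite3(path.join(dbDir, 'streams.sqlite'));
  db.exec(`
    CREATE TABLE IF NOT EXISTS streamItem (
      streamName TEXT,
      position INTEGER,
      item TEXT,
      PRIMARY KEY (streamName, position)
    )
  `);

  const STREAM_START = { itemCount: 0 };

  // Store one item at the given position; the caller threads positions.
  function writeStreamItem(streamName, item, position) {
    db.prepare(
      `INSERT INTO streamItem (streamName, position, item)
       VALUES (?, ?, ?)
       ON CONFLICT(streamName, position) DO UPDATE SET item = excluded.item`,
    ).run(streamName, position.itemCount, item);
    return { itemCount: position.itemCount + 1 };
  }

  // Yield the items in [startPosition, endPosition), in position order.
  function* readStream(streamName, startPosition, endPosition) {
    const rows = db
      .prepare(
        `SELECT item FROM streamItem
         WHERE streamName = ? AND position >= ? AND position < ?
         ORDER BY position`,
      )
      .iterate(streamName, startPosition.itemCount, endPosition.itemCount);
    for (const { item } of rows) {
      yield item;
    }
  }

  function closeStream(_streamName) {
    // Unlike the old per-stream file descriptors, there is nothing to
    // release per stream; the database connection stays open.
  }

  return { readStream, writeStreamItem, closeStream, STREAM_START };
}
```

The `{ sqlite3 }` option mirrors the "let caller override ambient sqlite3" commit above: the call site in the diff passes the ambient binding, while tests or other callers could inject a different one.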