Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.1.0 - Timeouts rework, command method #167

Merged
merged 9 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .docker/clickhouse/single_node_tls/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM clickhouse/clickhouse-server:23.2-alpine
FROM clickhouse/clickhouse-server:23.5-alpine
COPY .docker/clickhouse/single_node_tls/certificates /etc/clickhouse-server/certs
RUN chown clickhouse:clickhouse -R /etc/clickhouse-server/certs \
&& chmod 600 /etc/clickhouse-server/certs/* \
Expand Down
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,34 @@
## 0.1.0

## Breaking changes

* Use `abort_controller` instead of `abort_signal` in `query` / `exec` / `insert` methods.
* `exec` method does not return the response stream by default. However, if needed, the response stream can be requested by `returnResponseStream` parameter.
```ts
// returns { query_id }
await client.exec({
query: `SELECT 1 FORMAT CSV`,
})

// returns { query_id }
await client.exec({
query: `SELECT 1 FORMAT CSV`,
returnResponseStream: false,
})

// returns { query_id, stream } - pre-0.1.0 behavior
// Important: if the response stream is requested, the user is expected to consume the stream
// otherwise, the request will be eventually timed out
await client.exec({
query: `SELECT 1 FORMAT CSV`,
returnResponseStream: true,
slvrtrn marked this conversation as resolved.
Show resolved Hide resolved
})
```

### Bug fixes

* Fixed delays on subsequent requests after calling `insert` / `exec` that happened due to unclosed stream instance when using low number of `max_open_connections`. See [#161](/~https://github.com/ClickHouse/clickhouse-js/issues/161) for more details.

## 0.0.16
* Fix NULL parameter binding.
As HTTP interface expects `\N` instead of `'NULL'` string, it is now correctly handled for both `null`
Expand Down
21 changes: 9 additions & 12 deletions __tests__/integration/abort_request.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { AbortController } from 'node-abort-controller'
import type { Row } from '../../src'
import { type ClickHouseClient, type ResponseJSON } from '../../src'
import { createTestClient, guid, makeObjectStream } from '../utils'
Expand All @@ -23,7 +22,7 @@ describe('abort request', () => {
const selectPromise = client.query({
query: 'SELECT sleep(3)',
format: 'CSV',
abort_signal: controller.signal as AbortSignal,
abort_controller: controller,
})
controller.abort()

Expand All @@ -39,7 +38,7 @@ describe('abort request', () => {
const selectPromise = client.query({
query: 'SELECT sleep(3)',
format: 'CSV',
abort_signal: controller.signal as AbortSignal,
abort_controller: controller,
})

setTimeout(() => {
Expand All @@ -59,7 +58,7 @@ describe('abort request', () => {
.query({
query: 'SELECT * from system.numbers',
format: 'JSONCompactEachRow',
abort_signal: controller.signal as AbortSignal,
slvrtrn marked this conversation as resolved.
Show resolved Hide resolved
abort_controller: controller,
})
.then(async (rows) => {
const stream = rows.stream()
Expand Down Expand Up @@ -118,7 +117,7 @@ describe('abort request', () => {
console.log(`Long running query: ${longRunningQuery}`)
void client.query({
query: longRunningQuery,
abort_signal: controller.signal as AbortSignal,
abort_controller: controller,
format: 'JSONCompactEachRow',
})

Expand Down Expand Up @@ -147,9 +146,9 @@ describe('abort request', () => {
.query({
query: `SELECT sleep(0.5), ${i} AS foo`,
format: 'JSONEachRow',
abort_signal:
abort_controller:
// we will cancel the request that should've yielded '3'
shouldAbort ? (controller.signal as AbortSignal) : undefined,
shouldAbort ? controller : undefined,
})
.then((r) => r.json<Res>())
.then((r) => results.push(r[0].foo))
Expand Down Expand Up @@ -183,7 +182,7 @@ describe('abort request', () => {
const insertPromise = client.insert({
table: tableName,
values: stream,
abort_signal: controller.signal as AbortSignal,
abort_controller: controller,
})
controller.abort()

Expand Down Expand Up @@ -216,7 +215,7 @@ describe('abort request', () => {
const insertPromise = client.insert({
table: tableName,
values: stream,
abort_signal: controller.signal as AbortSignal,
abort_controller: controller,
})

setTimeout(() => {
Expand Down Expand Up @@ -248,9 +247,7 @@ describe('abort request', () => {
values: stream,
format: 'JSONEachRow',
table: tableName,
abort_signal: shouldAbort(i)
? (controller.signal as AbortSignal)
: undefined,
abort_controller: shouldAbort(i) ? controller : undefined,
})
if (shouldAbort(i)) {
return insertPromise.catch(() => {
Expand Down
5 changes: 5 additions & 0 deletions __tests__/integration/exec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ describe('exec', () => {
val1: 10,
val2: 20,
},
returnResponseStream: true,
})
expect(await getAsText(result.stream)).toEqual('30\n')
})
Expand All @@ -88,20 +89,23 @@ describe('exec', () => {
it('should allow commands with semi in select clause', async () => {
const result = await client.exec({
query: `SELECT ';' FORMAT CSV`,
returnResponseStream: true,
})
expect(await getAsText(result.stream)).toEqual('";"\n')
})

it('should allow commands with trailing semi', async () => {
const result = await client.exec({
query: 'EXISTS system.databases;',
returnResponseStream: true,
})
expect(await getAsText(result.stream)).toEqual('1\n')
})

it('should allow commands with multiple trailing semi', async () => {
const result = await client.exec({
query: 'EXISTS system.foobar;;;;;;',
returnResponseStream: true,
})
expect(await getAsText(result.stream)).toEqual('0\n')
})
Expand Down Expand Up @@ -182,6 +186,7 @@ describe('exec', () => {
// ClickHouse responds to a command when it's completely finished
wait_end_of_query: 1,
},
returnResponseStream: true,
})
}
})
Expand Down
50 changes: 50 additions & 0 deletions __tests__/integration/exec_without_stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { createTestClient } from '../utils'
import type { ClickHouseClient } from '../../src/client'

/**
* See /~https://github.com/ClickHouse/clickhouse-js/issues/161 for more details
*
* This test makes sure that the consequent requests are not blocked by exec calls
* that do not include the response stream in the return type
*/
describe('exec without streaming', () => {
let client: ClickHouseClient
beforeEach(() => {
client = createTestClient({
max_open_connections: 1,
request_timeout: 10000,
})
})
afterEach(async () => {
await client.close()
})

it('should destroy the response stream immediately (parameter is omitted)', async () => {
const timeout = setTimeout(() => {
throw new Error('Timeout was triggered')
}, 3000).unref()
function exec() {
return client.exec({
query: `SELECT 1 FORMAT CSV`,
})
}
await exec()
await exec()
clearTimeout(timeout)
})

it('should destroy the response stream immediately (explicit false)', async () => {
const timeout = setTimeout(() => {
throw new Error('Timeout was triggered')
}, 3000).unref()
function exec() {
return client.exec({
query: `SELECT 1 FORMAT CSV`,
returnResponseStream: false,
})
}
await exec()
await exec()
clearTimeout(timeout)
})
})
3 changes: 3 additions & 0 deletions __tests__/unit/http_adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ describe('HttpAdapter', () => {

const execPromise1 = adapter.exec({
query: 'SELECT * FROM system.numbers LIMIT 5',
returnResponseStream: true,
})
const responseBody1 = 'foobar'
request1.emit(
Expand All @@ -357,6 +358,7 @@ describe('HttpAdapter', () => {
const request2 = stubRequest()
const execPromise2 = adapter.exec({
query: 'SELECT * FROM system.numbers LIMIT 5',
returnResponseStream: true,
})
const responseBody2 = 'qaz'
request2.emit(
Expand Down Expand Up @@ -391,6 +393,7 @@ describe('HttpAdapter', () => {
const execPromise = adapter.exec({
query: 'SELECT * FROM system.numbers LIMIT 5',
query_id,
returnResponseStream: true,
})
const responseBody = 'foobar'
request.emit(
Expand Down
2 changes: 1 addition & 1 deletion __tests__/utils/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export async function retryOnFailure<T>(
return await attempt()
}

function sleep(ms: number): Promise<void> {
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, ms).unref()
})
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2.3'

services:
clickhouse1:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.2-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.5-alpine}'
ulimits:
nofile:
soft: 262144
Expand All @@ -19,7 +19,7 @@ services:
- './.docker/clickhouse/users.xml:/etc/clickhouse-server/users.xml'

clickhouse2:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.2-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.5-alpine}'
ulimits:
nofile:
soft: 262144
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.8'
services:
clickhouse:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.2-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.5-alpine}'
container_name: 'clickhouse-js-clickhouse-server'
ports:
- '8123:8123'
Expand Down
4 changes: 2 additions & 2 deletions examples/abort_request.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { createClient } from '@clickhouse/client'
import { AbortController } from 'node-abort-controller'

void (async () => {
const client = createClient()
const controller = new AbortController()
const selectPromise = client
.query({
query: 'SELECT sleep(3)',
format: 'CSV',
abort_signal: controller.signal as AbortSignal,
abort_controller: controller,
})
.catch((e) => {
console.info('Select was aborted')
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
"dist"
],
"dependencies": {
"node-abort-controller": "^3.0.1",
"uuid": "^9.0.0"
},
"devDependencies": {
Expand Down
34 changes: 26 additions & 8 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import Stream from 'stream'
import type { InsertResult, QueryResult, TLSParams } from './connection'
import type {
ExecResult,
InsertResult,
QueryResult,
TLSParams,
} from './connection'
import { type Connection, createConnection } from './connection'
import type { Logger } from './logger'
import { DefaultLogger, LogWriter } from './logger'
Expand All @@ -16,8 +21,6 @@ import type { InputJSON, InputJSONObjectEachRow } from './clickhouse_types'
export interface ClickHouseClientConfigOptions {
/** A ClickHouse instance URL. Default value: `http://localhost:8123`. */
host?: string
/** The timeout to set up a connection in milliseconds. Default value: `10_000`. */
connect_timeout?: number
slvrtrn marked this conversation as resolved.
Show resolved Hide resolved
/** The request timeout in milliseconds. Default value: `30_000`. */
request_timeout?: number
/** Maximum number of sockets to allow per host. Default value: `Infinity`. */
Expand Down Expand Up @@ -62,8 +65,8 @@ export interface BaseParams {
clickhouse_settings?: ClickHouseSettings
/** Parameters for query binding. https://clickhouse.com/docs/en/interfaces/http/#cli-queries-with-parameters */
query_params?: Record<string, unknown>
/** AbortSignal instance (using `node-abort-controller` package) to cancel a request in progress. */
abort_signal?: AbortSignal
/** AbortController instance to cancel a request in progress. */
abort_controller?: AbortController
/** A specific `query_id` that will be sent with this request.
* If it is not set, a random identifier will be generated automatically by the client. */
query_id?: string
Expand All @@ -79,6 +82,12 @@ export interface QueryParams extends BaseParams {
export interface ExecParams extends BaseParams {
/** Statement to execute. */
query: string
/** If set to true, the response stream instance will be included
* in the return type, and the user will be expected to consume it.
* If set to false, the response stream is not included
* in the return type and destroyed immediately.
* Default: false */
returnResponseStream?: boolean
}

type InsertValues<T> =
Expand Down Expand Up @@ -131,7 +140,6 @@ function normalizeConfig(config: ClickHouseClientConfigOptions) {
return {
application_id: config.application,
url: createUrl(config.host ?? 'http://localhost:8123'),
connect_timeout: config.connect_timeout ?? 10_000,
request_timeout: config.request_timeout ?? 300_000,
max_open_connections: config.max_open_connections ?? Infinity,
tls,
Expand Down Expand Up @@ -173,7 +181,7 @@ export class ClickHouseClient {
...params.clickhouse_settings,
},
query_params: params.query_params,
abort_signal: params.abort_signal,
abort_controller: params.abort_controller,
session_id: this.config.session_id,
query_id: params.query_id,
}
Expand All @@ -189,10 +197,20 @@ export class ClickHouseClient {
return new ResultSet(stream, format, query_id)
}

async exec(params: ExecParams): Promise<QueryResult> {
async exec(
params: Omit<ExecParams, 'returnResponseStream'>
): Promise<ExecResult>
async exec(
params: ExecParams & { returnResponseStream: true }
): Promise<QueryResult>
async exec(
params: ExecParams & { returnResponseStream: false }
): Promise<ExecResult>
async exec(params: ExecParams): Promise<QueryResult | ExecResult> {
const query = removeTrailingSemi(params.query.trim())
return await this.connection.exec({
query,
returnResponseStream: params.returnResponseStream,
...this.getBaseParams(params),
})
}
Expand Down
Loading