From c1a8305f0687bd8a7957078855230e66a21ddb4e Mon Sep 17 00:00:00 2001 From: Silviu Caragea Date: Fri, 22 Mar 2019 22:55:37 +0200 Subject: [PATCH 1/3] Fix critical memory issue --- CHANGELOG.md | 4 ++++ c_src/nif_cass_session.cc | 2 +- src/erlcass.app.src | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 636dd81..0c21762 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ### Changelog: +##### v3.2.4 + +- Fix a critical memory issue discovered by running the VM in debug mode. + ##### v3.2.3 - Add Trevis CI (Thanks to Gonçalo Tomás) diff --git a/c_src/nif_cass_session.cc b/c_src/nif_cass_session.cc index 1158355..8215a43 100644 --- a/c_src/nif_cass_session.cc +++ b/c_src/nif_cass_session.cc @@ -275,7 +275,7 @@ ERL_NIF_TERM nif_cass_session_prepare(ErlNifEnv* env, int argc, const ERL_NIF_TE if(!enif_is_identical(ATOMS.atomOk, parse_result)) return parse_result; - callback_statement_info* callback = static_cast(enif_alloc(sizeof(callback_info))); + callback_statement_info* callback = static_cast(enif_alloc(sizeof(callback_statement_info))); callback->pid = pid; callback->prepared_res = data->resCassPrepared; callback->env = enif_alloc_env(); diff --git a/src/erlcass.app.src b/src/erlcass.app.src index fbdf664..1f1d042 100644 --- a/src/erlcass.app.src +++ b/src/erlcass.app.src @@ -3,7 +3,7 @@ {maintainers, ["Silviu Caragea"]}, {licenses, ["MIT"]}, {links,[{"Github","/~https://github.com/silviucpp/erlcass"}]}, - {vsn, "3.2.3"}, + {vsn, "3.2.4"}, {registered, []}, {applications, [kernel, stdlib, lager]}, {mod, {erlcass_app, []}}, From ccb4b7e1cab74606b3ce917fd29679f32a1e0843 Mon Sep 17 00:00:00 2001 From: Silviu Caragea Date: Sat, 23 Mar 2019 08:30:17 +0200 Subject: [PATCH 2/3] Update lager --- rebar.config | 2 +- rebar.config.script | 2 +- rebar.lock | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rebar.config b/rebar.config index bafc91f..f9b60d1 100644 --- a/rebar.config +++ b/rebar.config @@ -4,7 +4,7 @@ {plugins, [rebar3_hex]}. {deps, [ - {lager, "~>3.6"} + {lager, "3.6.9"} ]}. {erl_opts, [ diff --git a/rebar.config.script b/rebar.config.script index b25cd58..089b94d 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -1,7 +1,7 @@ IsRebar3 = erlang:function_exported(rebar3, main, 1), Rebar2Deps0 = [ - {lager, ".*", {git, "/~https://github.com/erlang-lager/lager.git", {tag, "3.6.1"}}} + {lager, ".*", {git, "/~https://github.com/erlang-lager/lager.git", {tag, "3.6.9"}}} ], Rebar2Deps = case os:getenv("TEST") of diff --git a/rebar.lock b/rebar.lock index 4a9b55c..d40eee7 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,8 +1,8 @@ {"1.1.0", [{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, - {<<"lager">>,{pkg,<<"lager">>,<<"3.6.7">>},0}]}. + {<<"lager">>,{pkg,<<"lager">>,<<"3.6.9">>},0}]}. [ {pkg_hash,[ {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, - {<<"lager">>, <<"2FBF823944CAA0FC10DF5EC13F3F047524A249BB32F0D801B7900C9610264286">>}]} + {<<"lager">>, <<"387BCD836DC0C8AD9C6D90A0E0CE5B29676847950CBC527BCCC194A02028DE8E">>}]} ]. From 464a98b0b07dbd46cf43814cb4f8ff3957687434 Mon Sep 17 00:00:00 2001 From: Cibin George Date: Fri, 25 Jan 2019 08:01:32 -0500 Subject: [PATCH 3/3] Multiple clusters capability. The api interface has changed because all apis now need an atom identifying the connection. --- benchmarks/benchmark.config | 30 +++-- c_src/erlcass.cc | 14 +- c_src/erlcass.h | 2 +- c_src/nif_cass_cluster.cc | 118 ++++++++++------- c_src/nif_cass_cluster.h | 4 + c_src/nif_cass_session.cc | 21 +-- src/erlcass.erl | 233 ++++++++++++++++++---------------- src/erlcass_app.erl | 16 +-- src/erlcass_cluster.erl | 14 +- src/erlcass_nif.erl | 14 +- src/erlcass_stm_cache.erl | 35 +++-- src/erlcass_stm_sessions.erl | 29 +++-- src/erlcass_sup.erl | 13 +- src/erlcass_utils.erl | 19 ++- test/integrity_test_SUITE.erl | 144 ++++++++++----------- 15 files changed, 395 insertions(+), 311 deletions(-) diff --git a/benchmarks/benchmark.config b/benchmarks/benchmark.config index 0c9efc3..5af44bd 100644 --- a/benchmarks/benchmark.config +++ b/benchmarks/benchmark.config @@ -13,20 +13,22 @@ ]}, {erlcass, [ - {keyspace, <<"load_test_erlcass">>}, - {cluster_options,[ - {contact_points, <<"172.17.3.129">>}, - {latency_aware_routing, true}, - {token_aware_routing, true}, - {number_threads_io, 8}, - {queue_size_io, 128000}, - {core_connections_host, 5}, - {max_connections_host, 5}, - {tcp_nodelay, true}, - {tcp_keepalive, {true, 60}}, - {connect_timeout, 5000}, - {request_timeout, 20000}, - {retry_policy, {default, true}} + {clusters, [ + {erlcass_conn, [ + {keyspace, <<"load_test_erlcass">>}, + {contact_points, <<"127.0.0.1">>}, + {latency_aware_routing, true}, + {token_aware_routing, true}, + {number_threads_io, 8}, + {queue_size_io, 128000}, + {core_connections_host, 5}, + {max_connections_host, 5}, + {tcp_nodelay, true}, + {tcp_keepalive, {true, 60}}, + {connect_timeout, 5000}, + {request_timeout, 20000}, + {retry_policy, {default, true}} + ]} ]} ]} ]. diff --git a/c_src/erlcass.cc b/c_src/erlcass.cc index 0e82241..7da9f9e 100755 --- a/c_src/erlcass.cc +++ b/c_src/erlcass.cc @@ -13,6 +13,8 @@ atoms ATOMS; void open_resources(ErlNifEnv* env, cassandra_data* data) { ErlNifResourceFlags flags = static_cast(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); + + data->resCassCluster = enif_open_resource_type(env, NULL, "enif_cass_cluster", nif_cass_cluster_free, flags, NULL); data->resCassSession = enif_open_resource_type(env, NULL, "enif_cass_session", nif_cass_session_free, flags, NULL); data->resCassPrepared = enif_open_resource_type(env, NULL, "enif_cass_prepared", nif_cass_prepared_free, flags, NULL); data->resCassStatement = enif_open_resource_type(env, NULL, "enif_cass_statement", nif_cass_statement_free, flags, NULL); @@ -120,7 +122,6 @@ int on_nif_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) ATOMS.atomClusterSettingRetryPolicyFallthrough = make_atom(env, erlcass::kAtomClusterSettingRetryPolicyFallthrough); cassandra_data* data = static_cast(enif_alloc(sizeof(cassandra_data))); - data->cluster = NULL; data->uuid_gen = cass_uuid_gen_new(); data->defaultConsistencyLevel = CASS_CONSISTENCY_LOCAL_QUORUM; @@ -134,9 +135,6 @@ void on_nif_unload(ErlNifEnv* env, void* priv_data) { cassandra_data* data = static_cast(priv_data); - if(data->cluster) - cass_cluster_free(data->cluster); - if(data->uuid_gen) cass_uuid_gen_free(data->uuid_gen); @@ -148,12 +146,10 @@ int on_nif_upgrade(ErlNifEnv* env, void** priv, void** old_priv, ERL_NIF_TERM in cassandra_data* old_data = static_cast(*old_priv); cassandra_data* data = static_cast(enif_alloc(sizeof(cassandra_data))); - data->cluster = old_data->cluster; data->uuid_gen = old_data->uuid_gen; data->defaultConsistencyLevel = old_data->defaultConsistencyLevel; open_resources(env, data); - old_data->cluster = NULL; old_data->uuid_gen = NULL; *priv = data; @@ -165,16 +161,16 @@ static ErlNifFunc nif_funcs[] = //CassCluster {"cass_cluster_create", 0, nif_cass_cluster_create}, - {"cass_cluster_release", 0, nif_cass_cluster_release}, + {"cass_cluster_release", 1, nif_cass_cluster_release}, {"cass_log_set_callback", 1, nif_cass_log_set_callback}, {"cass_log_set_level", 1, nif_cass_log_set_level}, - {"cass_cluster_set_options", 1, nif_cass_cluster_set_options}, + {"cass_cluster_set_options", 2, nif_cass_cluster_set_options}, //CassSession {"cass_session_new", 0, nif_cass_session_new}, - {"cass_session_connect", 2, nif_cass_session_connect}, {"cass_session_connect", 3, nif_cass_session_connect}, + {"cass_session_connect", 4, nif_cass_session_connect}, {"cass_session_close", 2, nif_cass_session_close}, {"cass_session_prepare", 4, nif_cass_session_prepare}, diff --git a/c_src/erlcass.h b/c_src/erlcass.h index d10a1b5..9d8f4d8 100644 --- a/c_src/erlcass.h +++ b/c_src/erlcass.h @@ -108,11 +108,11 @@ struct atoms struct cassandra_data { - CassCluster* cluster; CassUuidGen* uuid_gen; ErlNifPid log_pid; CassConsistency defaultConsistencyLevel; + ErlNifResourceType* resCassCluster; ErlNifResourceType* resCassSession; ErlNifResourceType* resCassPrepared; ErlNifResourceType* resCassStatement; diff --git a/c_src/nif_cass_cluster.cc b/c_src/nif_cass_cluster.cc index 606b66c..4583483 100644 --- a/c_src/nif_cass_cluster.cc +++ b/c_src/nif_cass_cluster.cc @@ -8,6 +8,10 @@ #include //CassCluster +struct enif_cass_cluster +{ + CassCluster* cluster; +}; #define STRING_SETTING(Key, Func) \ if(enif_is_identical(term_key, Key)) \ @@ -15,7 +19,7 @@ ErlNifBinary value; \ if(!get_bstring(env, term_value, &value)) \ return make_bad_options(env, term_option); \ - return cass_error_to_nif_term(env, Func(data->cluster, BIN_TO_STR(value.data), value.size)); \ + return cass_error_to_nif_term(env, Func(cluster, BIN_TO_STR(value.data), value.size)); \ } #define INT_SETTING(Key, Func) \ @@ -24,7 +28,7 @@ int value; \ if(!enif_get_int(env, term_value, &value)) \ return make_bad_options(env, term_option); \ - return cass_error_to_nif_term(env, Func(data->cluster, value)); \ + return cass_error_to_nif_term(env, Func(cluster, value)); \ } #define UNSIGNED_INT_SETTING(Key, Func) \ @@ -33,12 +37,23 @@ unsigned int value; \ if(!enif_get_uint(env, term_value, &value)) \ return make_bad_options(env, term_option); \ - return cass_error_to_nif_term(env, Func(data->cluster, value)); \ + return cass_error_to_nif_term(env, Func(cluster, value)); \ } #define CUSTOM_SETTING(Key, Func) \ if(enif_is_identical(term_key, Key)) \ - return Func(env, term_option, term_value, data); + return Func(env, cluster, term_option, term_value, data); + + +CassCluster* get_cass_cluster(ErlNifEnv* env, ErlNifResourceType* resource_type, const ERL_NIF_TERM arg) +{ + enif_cass_cluster * enif_cluster = NULL; + + if(!enif_get_resource(env, arg, resource_type, (void**) &enif_cluster)) + return NULL; + + return enif_cluster->cluster; +} CassError internal_cass_cluster_set_reconnect_wait_time(CassCluster* cluster, unsigned wait_time) @@ -71,35 +86,35 @@ CassError internal_cass_cluster_set_connection_idle_timeout(CassCluster* cluster return CASS_OK; } -ERL_NIF_TERM internal_cass_cluster_set_token_aware_routing(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cass_cluster_set_token_aware_routing(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { cass_bool_t token_aware_routing; if(!get_boolean(term_value, &token_aware_routing)) return make_bad_options(env, term_option); - cass_cluster_set_token_aware_routing(data->cluster, token_aware_routing); + cass_cluster_set_token_aware_routing(cluster, token_aware_routing); return ATOMS.atomOk; } -ERL_NIF_TERM internal_cass_cluster_set_tcp_nodelay(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cass_cluster_set_tcp_nodelay(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { cass_bool_t nodelay; if(!get_boolean(term_value, &nodelay)) return make_bad_options(env, term_option); - cass_cluster_set_tcp_nodelay(data->cluster, nodelay); + cass_cluster_set_tcp_nodelay(cluster, nodelay); return ATOMS.atomOk; } -ERL_NIF_TERM internal_cass_cluster_set_load_balance_round_robin(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cass_cluster_set_load_balance_round_robin(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { - cass_cluster_set_load_balance_round_robin(data->cluster); + cass_cluster_set_load_balance_round_robin(cluster); return ATOMS.atomOk; } -ERL_NIF_TERM internal_cass_cluster_set_credentials(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cass_cluster_set_credentials(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { const ERL_NIF_TERM *items; int arity; @@ -113,11 +128,11 @@ ERL_NIF_TERM internal_cass_cluster_set_credentials(ErlNifEnv* env, ERL_NIF_TERM if(!get_bstring(env, items[0], &username) || !get_bstring(env, items[1], &pwd)) return make_bad_options(env, term_option); - cass_cluster_set_credentials_n(data->cluster, BIN_TO_STR(username.data), username.size, BIN_TO_STR(pwd.data), pwd.size); + cass_cluster_set_credentials_n(cluster, BIN_TO_STR(username.data), username.size, BIN_TO_STR(pwd.data), pwd.size); return ATOMS.atomOk; } -ERL_NIF_TERM internal_cass_cluster_set_load_balance_dc_aware(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cass_cluster_set_load_balance_dc_aware(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { const ERL_NIF_TERM *items; int arity; @@ -136,14 +151,14 @@ ERL_NIF_TERM internal_cass_cluster_set_load_balance_dc_aware(ErlNifEnv* env, ERL if(!get_boolean(items[2], &allow_remote_dcs_for_local_cl)) return make_bad_options(env, term_option); - return cass_error_to_nif_term(env, cass_cluster_set_load_balance_dc_aware_n(data->cluster, + return cass_error_to_nif_term(env, cass_cluster_set_load_balance_dc_aware_n(cluster, BIN_TO_STR(local_dc.data), local_dc.size, used_hosts_per_remote_dc, allow_remote_dcs_for_local_cl)); } -ERL_NIF_TERM internal_cass_cluster_set_tcp_keepalive(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cass_cluster_set_tcp_keepalive(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { const ERL_NIF_TERM *items; int arity; @@ -160,11 +175,11 @@ ERL_NIF_TERM internal_cass_cluster_set_tcp_keepalive(ErlNifEnv* env, ERL_NIF_TER if(!enif_get_uint(env, items[1], &delay_sec)) return make_bad_options(env, term_option); - cass_cluster_set_tcp_keepalive(data->cluster, tcp_keepalive, delay_sec); + cass_cluster_set_tcp_keepalive(cluster, tcp_keepalive, delay_sec); return ATOMS.atomOk; } -ERL_NIF_TERM internal_cluster_set_default_consistency_level(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cluster_set_default_consistency_level(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { int level; @@ -175,7 +190,7 @@ ERL_NIF_TERM internal_cluster_set_default_consistency_level(ErlNifEnv* env, ERL_ return ATOMS.atomOk; } -ERL_NIF_TERM internal_cluster_set_retry_policy(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cluster_set_retry_policy(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { cass_bool_t log_enable = cass_false; ERL_NIF_TERM rp = term_value; @@ -209,17 +224,17 @@ ERL_NIF_TERM internal_cluster_set_retry_policy(ErlNifEnv* env, ERL_NIF_TERM term if(log_enable) { retry_policy_log.reset(cass_retry_policy_logging_new(retry_policy.get())); - cass_cluster_set_retry_policy(data->cluster, retry_policy_log.get()); + cass_cluster_set_retry_policy(cluster, retry_policy_log.get()); } else { - cass_cluster_set_retry_policy(data->cluster, retry_policy.get()); + cass_cluster_set_retry_policy(cluster, retry_policy.get()); } return ATOMS.atomOk; } -ERL_NIF_TERM internal_cass_cluster_set_ssl(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cass_cluster_set_ssl(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { if(!enif_is_list(env, term_value)) return make_bad_options(env, term_option); @@ -302,12 +317,12 @@ ERL_NIF_TERM internal_cass_cluster_set_ssl(ErlNifEnv* env, ERL_NIF_TERM term_opt } } - cass_cluster_set_ssl(data->cluster, ssl.get()); + cass_cluster_set_ssl(cluster, ssl.get()); return ATOMS.atomOk; } -ERL_NIF_TERM internal_cluster_set_latency_aware_routing(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM internal_cluster_set_latency_aware_routing(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_value, cassandra_data* data) { if(enif_is_atom(env, term_value)) { @@ -317,7 +332,7 @@ ERL_NIF_TERM internal_cluster_set_latency_aware_routing(ErlNifEnv* env, ERL_NIF_ if(!get_boolean(term_value, &latency_aware_routing)) return make_bad_options(env, term_option); - cass_cluster_set_latency_aware_routing(data->cluster, latency_aware_routing); + cass_cluster_set_latency_aware_routing(cluster, latency_aware_routing); return ATOMS.atomOk; } @@ -332,7 +347,7 @@ ERL_NIF_TERM internal_cluster_set_latency_aware_routing(ErlNifEnv* env, ERL_NIF_ if(!get_boolean(items[0], &latency_aware_routing)) return make_bad_options(env, term_option); - cass_cluster_set_latency_aware_routing(data->cluster, latency_aware_routing); + cass_cluster_set_latency_aware_routing(cluster, latency_aware_routing); //set also the settings @@ -360,11 +375,11 @@ ERL_NIF_TERM internal_cluster_set_latency_aware_routing(ErlNifEnv* env, ERL_NIF_ if(!enif_get_uint64(env, items[4], &min_measured)) return make_bad_options(env, term_option); - cass_cluster_set_latency_aware_routing_settings(data->cluster, exclusion_threshold, scale_ms, retry_period_ms, update_rate_ms, min_measured); + cass_cluster_set_latency_aware_routing_settings(cluster, exclusion_threshold, scale_ms, retry_period_ms, update_rate_ms, min_measured); return ATOMS.atomOk; } -ERL_NIF_TERM apply_cluster_settings(ErlNifEnv* env, ERL_NIF_TERM term_option, ERL_NIF_TERM term_key, ERL_NIF_TERM term_value, cassandra_data* data) +ERL_NIF_TERM apply_cluster_settings(ErlNifEnv* env, CassCluster* cluster, ERL_NIF_TERM term_option, ERL_NIF_TERM term_key, ERL_NIF_TERM term_value, cassandra_data* data) { CUSTOM_SETTING(ATOMS.atomClusterDefaultConsistencyLevel, internal_cluster_set_default_consistency_level); @@ -398,46 +413,61 @@ ERL_NIF_TERM apply_cluster_settings(ErlNifEnv* env, ERL_NIF_TERM term_option, ER return make_bad_options(env, term_option); } + +void nif_cass_cluster_free(ErlNifEnv* env, void* obj) +{ + enif_cass_cluster *enif_cluster = static_cast(obj); + + if(enif_cluster->cluster != NULL) + cass_cluster_free(enif_cluster->cluster); +} + ERL_NIF_TERM nif_cass_cluster_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { cassandra_data* data = static_cast(enif_priv_data(env)); - if(data->cluster) - { - //this can happened in case the erlcass gen_server is crashes - cass_cluster_free(data->cluster); - data->cluster = NULL; - } + enif_cass_cluster *enif_cluster = static_cast(enif_alloc_resource(data->resCassCluster, sizeof(enif_cass_cluster))); - data->cluster = cass_cluster_new(); + if(enif_cluster == NULL) + return make_error(env, erlcass::kFailedToAllocResourceMsg); - if(data->cluster == NULL) - return make_error(env, erlcass::kClusterObjectFailedToCreateMsg); + enif_cluster->cluster = cass_cluster_new(); - return ATOMS.atomOk; + ERL_NIF_TERM term = enif_make_resource(env, enif_cluster); + enif_release_resource(enif_cluster); + + return enif_make_tuple2(env, ATOMS.atomOk, term); } ERL_NIF_TERM nif_cass_cluster_release(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { cassandra_data* data = static_cast(enif_priv_data(env)); - if(!data->cluster) - return make_error(env, erlcass::kClusterObjectNotCreatedMsg); + enif_cass_cluster * enif_cluster = NULL; + + if(!enif_get_resource(env, argv[0], data->resCassCluster, (void**) &enif_cluster)) + return make_badarg(env); - cass_cluster_free(data->cluster); - data->cluster = NULL; + cass_cluster_free(enif_cluster->cluster); + enif_cluster->cluster = NULL; return ATOMS.atomOk; } ERL_NIF_TERM nif_cass_cluster_set_options(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { - ERL_NIF_TERM options = argv[0]; + cassandra_data* data = static_cast(enif_priv_data(env)); + + enif_cass_cluster * enif_cluster = NULL; + + if(!enif_get_resource(env, argv[0], data->resCassCluster, (void**) &enif_cluster)) + return make_badarg(env); + + ERL_NIF_TERM options = argv[1]; if(!enif_is_list(env, options)) return make_bad_options(env, options); - cassandra_data* data = static_cast(enif_priv_data(env)); ERL_NIF_TERM head; const ERL_NIF_TERM *items; @@ -454,7 +484,7 @@ ERL_NIF_TERM nif_cass_cluster_set_options(ErlNifEnv* env, int argc, const ERL_NI if(!enif_is_atom(env, key)) return make_bad_options(env, head); - ERL_NIF_TERM result = apply_cluster_settings(env, head, key, value, data); + ERL_NIF_TERM result = apply_cluster_settings(env, enif_cluster->cluster, head, key, value, data); if(!enif_is_identical(ATOMS.atomOk, result)) return result; diff --git a/c_src/nif_cass_cluster.h b/c_src/nif_cass_cluster.h index 7aaf2f6..e1f71e5 100644 --- a/c_src/nif_cass_cluster.h +++ b/c_src/nif_cass_cluster.h @@ -2,6 +2,10 @@ #define ERLCASS_C_SRC_NIF_CASS_CLUSTER_H #include "erl_nif.h" +#include "cassandra.h" + +void nif_cass_cluster_free(ErlNifEnv* env, void* obj); +CassCluster* get_cass_cluster(ErlNifEnv* env, ErlNifResourceType* resource_type, const ERL_NIF_TERM arg); ERL_NIF_TERM nif_cass_cluster_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM nif_cass_cluster_release(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); diff --git a/c_src/nif_cass_session.cc b/c_src/nif_cass_session.cc index 8215a43..e911342 100644 --- a/c_src/nif_cass_session.cc +++ b/c_src/nif_cass_session.cc @@ -1,3 +1,4 @@ +#include "nif_cass_cluster.h" #include "nif_cass_session.h" #include "nif_cass_prepared.h" #include "nif_cass_statement.h" @@ -6,7 +7,6 @@ #include "constants.h" #include "macros.h" #include "logger.hpp" -#include "cassandra.h" #include #include @@ -200,26 +200,31 @@ ERL_NIF_TERM nif_cass_session_connect(ErlNifEnv* env, int argc, const ERL_NIF_TE ErlNifBinary keyspace; ErlNifPid pid; - if(!enif_get_resource(env, argv[0], data->resCassSession, (void**) &enif_session)) + CassCluster* cluster = get_cass_cluster(env, data->resCassCluster, argv[0]); + + if(cluster == NULL) return make_badarg(env); - if(!enif_get_local_pid(env, argv[1], &pid)) + if(!enif_get_resource(env, argv[1], data->resCassSession, (void**) &enif_session)) return make_badarg(env); - if(argc == 3 && !get_bstring(env, argv[2], &keyspace)) + if(!enif_get_local_pid(env, argv[2], &pid)) return make_badarg(env); - callback_info* callback = callback_info_alloc(env, pid, argv[1]); + if(argc == 4 && !get_bstring(env, argv[3], &keyspace)) + return make_badarg(env); + + callback_info* callback = callback_info_alloc(env, pid, argv[2]); if(callback == NULL) return make_error(env, erlcass::kFailedToCreateCallbackInfoMsg); CassFuture* future; - if(argc == 3) - future = cass_session_connect_keyspace_n(enif_session->session, data->cluster, BIN_TO_STR(keyspace.data), keyspace.size); + if(argc == 4) + future = cass_session_connect_keyspace_n(enif_session->session, cluster, BIN_TO_STR(keyspace.data), keyspace.size); else - future = cass_session_connect(enif_session->session, data->cluster); + future = cass_session_connect(enif_session->session, cluster); CassError error = cass_future_set_callback(future, on_session_connect, callback); diff --git a/src/erlcass.erl b/src/erlcass.erl index 194ed2c..72ac605 100644 --- a/src/erlcass.erl +++ b/src/erlcass.erl @@ -6,47 +6,47 @@ -behaviour(gen_server). -export([ - start_link/0, + start_link/1, % metrics - get_metrics/0, + get_metrics/1, % queries - query/1, - query_async/1, - query_async/3, - query_new_statement/1, + query/2, + query_async/2, + query_async/4, + query_new_statement/2, % prepared statements - add_prepare_statement/2, - async_execute/1, + add_prepare_statement/3, async_execute/2, async_execute/3, async_execute/4, async_execute/5, - execute/1, + async_execute/6, execute/2, execute/3, + execute/4, % batch - batch_execute/3, - batch_async_execute/3, + batch_execute/4, + batch_async_execute/4, % schema metadata - get_schema_metadata/0, get_schema_metadata/1, get_schema_metadata/2, + get_schema_metadata/3, % low level methods to deal with statements - bind_prepared_statement/1, - bind_prepared_params_by_name/2, - bind_prepared_params_by_index/2, + bind_prepared_statement/2, + bind_prepared_params_by_name/3, + bind_prepared_params_by_index/3, % gen_server callbacks @@ -59,7 +59,7 @@ ]). -record(erlcass_stm, {session, stm}). --record(state, {session}). +-record(state, {cluster, session, config, stm_cache_table, stm_session_table}). -type query() :: binary() | {binary(), integer()} | {binary(), list()}. -type statement_ref() :: #erlcass_stm{}. @@ -69,51 +69,51 @@ -type recv_pid() :: pid() | null. -type query_result() :: ok | {ok, list(), list()} | {error, reason()}. --spec get_metrics() -> +-spec get_metrics(pid()) -> {ok, list()} | {error, reason()}. -get_metrics() -> - call(get_metrics). +get_metrics(Pid) -> + call(Pid, get_metrics). --spec get_schema_metadata() -> +-spec get_schema_metadata(pid()) -> {ok, list()} | {error, reason()}. -get_schema_metadata() -> - call(get_schema_metadata). +get_schema_metadata(Pid) -> + call(Pid, get_schema_metadata). --spec get_schema_metadata(binary()) -> +-spec get_schema_metadata(pid(), binary()) -> {ok, list()} | {error, reason()}. -get_schema_metadata(Keyspace) -> - call({get_schema_metadata, Keyspace}). +get_schema_metadata(Pid, Keyspace) -> + call(Pid, {get_schema_metadata, Keyspace}). --spec get_schema_metadata(binary(), binary()) -> +-spec get_schema_metadata(pid(), binary(), binary()) -> {ok, list()} | {error, reason()}. -get_schema_metadata(Keyspace, Table) -> - call({get_schema_metadata, Keyspace, Table}). +get_schema_metadata(Pid, Keyspace, Table) -> + call(Pid, {get_schema_metadata, Keyspace, Table}). %non prepared query statements --spec query_new_statement(query()) -> +-spec query_new_statement(pid(), query()) -> {ok, tag()} | {error, reason()}. -query_new_statement(Query) -> +query_new_statement(_Pid, Query) -> erlcass_nif:cass_statement_new(Query). --spec query_async(query()) -> +-spec query_async(pid(), query()) -> {ok, tag()} | {error, reason()}. -query_async(Q) -> - query_async(Q, self(), make_ref()). +query_async(Pid, Q) -> + query_async(Pid, Q, self(), make_ref()). --spec query_async(query(), recv_pid(), any()) -> +-spec query_async(pid(), query(), recv_pid(), any()) -> {ok, tag()} | {error, reason()}. -query_async(Q, ReceiverPid, Tag) -> - case query_new_statement(Q) of +query_async(Pid, Q, ReceiverPid, Tag) -> + case query_new_statement(Pid, Q) of {ok, Stm} -> - case call({execute_normal_statements, get_identifier(ReceiverPid, Q), Stm, ReceiverPid, Tag}) of + case call(Pid, {execute_normal_statements, get_identifier(ReceiverPid, Q), Stm, ReceiverPid, Tag}) of ok -> {ok, Tag}; Error -> @@ -123,11 +123,11 @@ query_async(Q, ReceiverPid, Tag) -> Error end. --spec query(query()) -> +-spec query(pid(), query()) -> query_result(). -query(Q) -> - case query_async(Q) of +query(Pid, Q) -> + case query_async(Pid, Q) of {ok, Tag} -> receive_response(Tag); Error -> @@ -136,17 +136,19 @@ query(Q) -> %prepared statements --spec add_prepare_statement(atom(), query()) -> +-spec add_prepare_statement(pid(), atom(), query()) -> ok | {error, reason()}. -add_prepare_statement(Identifier, Query) -> - call({add_prepare_statement, Identifier, Query}, ?RESPONSE_TIMEOUT). +add_prepare_statement(Pid, Identifier, Query) -> + call(Pid, {add_prepare_statement, Identifier, Query}, ?RESPONSE_TIMEOUT). --spec bind_prepared_statement(atom()) -> +-spec bind_prepared_statement(pid(), atom()) -> {ok, statement_ref()} | {error, reason()}. -bind_prepared_statement(Identifier) -> - case erlcass_stm_sessions:get(Identifier) of +bind_prepared_statement(Pid, Identifier) when is_atom(Pid) -> + StmSessionTable = erlcass_stm_sessions:get_existing_table_name(Pid), + + case erlcass_stm_sessions:get(StmSessionTable, Identifier) of {Session, PrepStatement} -> case erlcass_nif:cass_prepared_bind(PrepStatement) of {ok, StmRef} -> @@ -160,23 +162,23 @@ bind_prepared_statement(Identifier) -> Error end. --spec bind_prepared_params_by_name(statement_ref(), list()) -> +-spec bind_prepared_params_by_name(pid(), statement_ref(), list()) -> ok | {error, reason()}. -bind_prepared_params_by_name(Stm, Params) -> +bind_prepared_params_by_name(_Pid, Stm, Params) -> erlcass_nif:cass_statement_bind_parameters(Stm#erlcass_stm.stm, ?BIND_BY_NAME, Params). --spec bind_prepared_params_by_index(statement_ref(), list()) -> +-spec bind_prepared_params_by_index(pid(), statement_ref(), list()) -> ok | {error, reason()}. -bind_prepared_params_by_index(Stm, Params) -> +bind_prepared_params_by_index(_Pid, Stm, Params) -> erlcass_nif:cass_statement_bind_parameters(Stm#erlcass_stm.stm, ?BIND_BY_INDEX, Params). --spec async_execute(atom()) -> +-spec async_execute(pid(), atom()) -> {ok, tag()} | {error, reason()}. -async_execute(Identifier) -> - case bind_prepared_statement(Identifier) of +async_execute(Pid, Identifier) -> + case bind_prepared_statement(Pid, Identifier) of {ok, Stm} -> Tag = make_ref(), ReceiverPid = self(), @@ -190,35 +192,35 @@ async_execute(Identifier) -> Error end. --spec async_execute(atom() | query(), list()) -> +-spec async_execute(pid(), atom() | query(), list()) -> {ok, tag()} | {error, reason()}. -async_execute(Identifier, Params) -> - async_execute(Identifier, ?BIND_BY_INDEX, Params). +async_execute(Pid, Identifier, Params) -> + async_execute(Pid, Identifier, ?BIND_BY_INDEX, Params). --spec async_execute(atom() | binary(), bind_type(), list()) -> +-spec async_execute(pid(), atom() | binary(), bind_type(), list()) -> {ok, tag()} | {error, reason()}. -async_execute(Identifier, BindType, Params) -> +async_execute(Pid, Identifier, BindType, Params) -> Tag = make_ref(), - case async_execute(Identifier, BindType, Params, self(), Tag) of + case async_execute(Pid, Identifier, BindType, Params, self(), Tag) of ok -> {ok, Tag}; Error -> Error end. --spec async_execute(atom() | binary(), bind_type(), list(), any()) -> +-spec async_execute(pid(), atom() | binary(), bind_type(), list(), any()) -> ok | {error, reason()}. -async_execute(Identifier, BindType, Params, Tag) -> - async_execute(Identifier, BindType, Params, self(), Tag). +async_execute(Pid, Identifier, BindType, Params, Tag) -> + async_execute(Pid, Identifier, BindType, Params, self(), Tag). --spec async_execute(atom(), bind_type(), list(), recv_pid(), any()) -> +-spec async_execute(pid(), atom(), bind_type(), list(), recv_pid(), any()) -> ok | {error, reason()}. -async_execute(Identifier, BindType, Params, ReceiverPid, Tag) -> - case bind_prepared_statement(Identifier) of +async_execute(Pid, Identifier, BindType, Params, ReceiverPid, Tag) -> + case bind_prepared_statement(Pid, Identifier) of {ok, Stm} -> case erlcass_nif:cass_statement_bind_parameters(Stm#erlcass_stm.stm, BindType, Params) of ok -> @@ -230,67 +232,80 @@ async_execute(Identifier, BindType, Params, ReceiverPid, Tag) -> Error end. --spec execute(atom()) -> +-spec execute(pid(), atom()) -> query_result(). -execute(Identifier) -> - case async_execute(Identifier) of +execute(Pid, Identifier) -> + case async_execute(Pid, Identifier) of {ok, Tag} -> receive_response(Tag); Error -> Error end. --spec execute(atom(), list()) -> +-spec execute(pid(), atom(), list()) -> query_result(). -execute(Identifier, Params) -> - execute(Identifier, ?BIND_BY_INDEX, Params). +execute(Pid, Identifier, Params) -> + execute(Pid, Identifier, ?BIND_BY_INDEX, Params). --spec execute(atom(), bind_type(), list()) -> +-spec execute(pid(), atom(), bind_type(), list()) -> query_result(). -execute(Identifier, BindType, Params) -> - case async_execute(Identifier, BindType, Params) of +execute(Pid, Identifier, BindType, Params) -> + case async_execute(Pid, Identifier, BindType, Params) of {ok, Tag} -> receive_response(Tag); Error -> Error end. --spec batch_async_execute(batch_type(), list(), list()) -> +-spec batch_async_execute(pid(), batch_type(), list(), list()) -> {ok, tag()} | {error, reason()}. -batch_async_execute(BatchType, StmList, Options) -> - call({batch_execute, BatchType, StmList, Options}). +batch_async_execute(Pid, BatchType, StmList, Options) -> + call(Pid, {batch_execute, BatchType, StmList, Options}). --spec batch_execute(batch_type(), list(), list()) -> +-spec batch_execute(pid(), batch_type(), list(), list()) -> query_result(). -batch_execute(BatchType, StmList, Options) -> - case batch_async_execute(BatchType, StmList, Options) of +batch_execute(Pid, BatchType, StmList, Options) -> + case batch_async_execute(Pid, BatchType, StmList, Options) of {ok, Tag} -> receive_response(Tag); Error -> Error end. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link({Name, Config}) -> + gen_server:start_link({local, Name}, ?MODULE, {Name, Config}, []). %internal functions -init([]) -> +init({Name, Config}) -> process_flag(trap_exit, true), - ok = erlcass_stm_sessions:create(), + {ok, Cluster} = erlcass_cluster:create(), + + ClusterOptions = erlcass_utils:except(keyspace, Config), + erlcass_cluster:set_options(Cluster, ClusterOptions), + + StmCacheTable = erlcass_stm_cache:get_table_name(Name), - case session_create() of + StmSessionTable = erlcass_stm_sessions:get_table_name(Name), + ok = erlcass_stm_sessions:create(StmSessionTable), + + case session_create(Cluster, Config) of {ok, SessionRef} -> - session_prepare_cached_statements(SessionRef), - {ok, #state{session = SessionRef}}; + session_prepare_cached_statements(SessionRef, StmSessionTable, StmCacheTable), + Response = {ok, #state{session = SessionRef, + cluster = Cluster, + config = Config, + stm_cache_table = StmCacheTable, + stm_session_table = StmSessionTable}}, + Response; Error -> - {stop, Error, shutdown, #state{}} + {stop, Error, shutdown, #state{cluster = Cluster, config = Config}} end. handle_call({execute_normal_statements, Identifier, StmRef, ReceiverPid, Tag}, _From, State) -> @@ -301,7 +316,7 @@ handle_call({batch_execute, BatchType, StmList, Options}, From, State) -> {reply, erlcass_nif:cass_session_execute_batch(State#state.session, BatchType, filter_stm_list(StmList), Options, FromPid), State}; handle_call({add_prepare_statement, Identifier, Query}, From, State) -> - case erlcass_stm_cache:find(Identifier) of + case erlcass_stm_cache:find(State#state.stm_cache_table, Identifier) of false -> ok = erlcass_nif:cass_session_prepare(State#state.session, self(), Query, {From, Identifier, Query}), {noreply, State}; @@ -331,8 +346,8 @@ handle_info({prepared_statement_result, Result, {From, Identifier, Query}}, #sta case Result of {ok, StmRef} -> - erlcass_stm_cache:set(Identifier, Query), - erlcass_stm_sessions:set(Identifier, Session, StmRef), + erlcass_stm_cache:set(State#state.stm_cache_table, Identifier, Query), + erlcass_stm_sessions:set(State#state.stm_session_table, Identifier, Session, StmRef), gen_server:reply(From, ok); _ -> gen_server:reply(From, Result) @@ -343,7 +358,7 @@ handle_info(Info, State) -> ?ERROR_MSG("session ~p received unexpected message: ~p", [self(), Info]), {noreply, State}. -terminate(Reason, #state {session = Session}) -> +terminate(Reason, #state {session = Session, cluster = Cluster}) -> Self = self(), ?INFO_MSG("closing session ~p with reason: ~p", [Self, Reason]), @@ -352,7 +367,8 @@ terminate(Reason, #state {session = Session}) -> ?INFO_MSG("session ~p closed completed", [Self]); Error -> ?ERROR_MSG("session ~p closed with error: ~p", [Self, Error]) - end. + end, + ok = erlcass_cluster:release(Cluster). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -368,13 +384,11 @@ receive_response(Tag) -> {error, timeout} end. -do_connect(Session, Pid) -> - case erlcass_utils:get_env(keyspace) of - {ok, Keyspace} -> - erlcass_nif:cass_session_connect(Session, Pid, Keyspace); - _ -> - erlcass_nif:cass_session_connect(Session, Pid) - end. +do_connect(Cluster, Session, Pid, {ok, Keyspace}) -> + erlcass_nif:cass_session_connect(Cluster, Session, Pid, Keyspace); + +do_connect(Cluster, Session, Pid, _Keyspace) -> + erlcass_nif:cass_session_connect(Cluster, Session, Pid). do_close(undefined, _Pid, _Timeout) -> ok; @@ -389,11 +403,12 @@ do_close(Session, Pid, Timeout) -> end end. -session_create() -> +session_create(Cluster, Config) -> case erlcass_nif:cass_session_new() of {ok, Session} -> Self = self(), - case do_connect(Session, Self) of + Keyspace = erlcass_utils:lookup(keyspace, Config), + case do_connect(Cluster, Session, Self, Keyspace) of ok -> receive {session_connected, Self, Result} -> @@ -411,7 +426,7 @@ session_create() -> Error end. -session_prepare_cached_statements(SessionRef) -> +session_prepare_cached_statements(SessionRef, StmSessionTable, StmCacheTable) -> FunPrepareStm = fun({Identifier, Query}) -> Tag = make_ref(), @@ -425,7 +440,7 @@ session_prepare_cached_statements(SessionRef) -> case Result of {ok, StmRef} -> - erlcass_stm_sessions:set(Identifier, SessionRef, StmRef); + erlcass_stm_sessions:set(StmSessionTable, Identifier, SessionRef, StmRef); _ -> throw({error, {failed_to_prepare_cached_stm, Identifier, Result}}) end @@ -437,7 +452,7 @@ session_prepare_cached_statements(SessionRef) -> throw(Error) end end, - ok = lists:foreach(FunPrepareStm, erlcass_stm_cache:to_list()). + ok = lists:foreach(FunPrepareStm, erlcass_stm_cache:to_list(StmCacheTable)). filter_stm_list(StmList) -> filter_stm_list(StmList, []). @@ -452,12 +467,12 @@ filter_stm_list([H|T], Acc) -> filter_stm_list([], Acc) -> Acc. -call(Message) -> - call(Message, 5000). +call(Pid, Message) -> + call(Pid, Message, 5000). -call(Message, Timeout) -> +call(Pid, Message, Timeout) -> try - gen_server:call(?MODULE, Message, Timeout) + gen_server:call(Pid, Message, Timeout) catch exit:{noproc, _} -> {error, erlcass_not_started}; diff --git a/src/erlcass_app.erl b/src/erlcass_app.erl index 4c2a066..d8179dc 100644 --- a/src/erlcass_app.erl +++ b/src/erlcass_app.erl @@ -10,20 +10,16 @@ ]). start(_StartType, _StartArgs) -> - - ok = erlcass_cluster:create(), erlcass_cluster:set_log_level(erlcass_utils:get_env(log_level, ?CASS_LOG_WARN)), - case erlcass_utils:get_env(cluster_options) of - {ok, ClusterOptions} -> - erlcass_cluster:set_options(ClusterOptions); - _ -> - ok - end, - - ok = erlcass_stm_cache:create(), + {ok, Clusters} = erlcass_utils:get_env(clusters), + lists:map(fun({Name, _Config}) -> start_stm_caches(Name) end, Clusters), erlcass_sup:start_link(). stop(_State) -> ok = erlcass_cluster:release(), ok. + +start_stm_caches(Name) -> + TableName = erlcass_stm_cache:get_table_name(Name), + ok = erlcass_stm_cache:create(TableName). diff --git a/src/erlcass_cluster.erl b/src/erlcass_cluster.erl index 7cf62d8..f27ecdd 100644 --- a/src/erlcass_cluster.erl +++ b/src/erlcass_cluster.erl @@ -2,17 +2,17 @@ -export([ create/0, - set_options/1, + set_options/2, set_log_level/1, set_log_process_receiver/1, - release/0 + release/1 ]). create() -> - ok = erlcass_nif:cass_cluster_create(). + {ok, _Cluster} = erlcass_nif:cass_cluster_create(). -set_options(Options) -> - ok = erlcass_nif:cass_cluster_set_options(Options). +set_options(Cluster, Options) -> + ok = erlcass_nif:cass_cluster_set_options(Cluster, Options). set_log_level(Level) -> ok = erlcass_nif:cass_log_set_level(Level). @@ -20,5 +20,5 @@ set_log_level(Level) -> set_log_process_receiver(Pid) -> ok = erlcass_nif:cass_log_set_callback(Pid). -release() -> - erlcass_nif:cass_cluster_release(). \ No newline at end of file +release(Cluster) -> + erlcass_nif:cass_cluster_release(Cluster). \ No newline at end of file diff --git a/src/erlcass_nif.erl b/src/erlcass_nif.erl index c79226f..4eb8b1e 100644 --- a/src/erlcass_nif.erl +++ b/src/erlcass_nif.erl @@ -10,11 +10,11 @@ cass_log_set_level/1, cass_log_set_callback/1, cass_cluster_create/0, - cass_cluster_release/0, - cass_cluster_set_options/1, + cass_cluster_release/1, + cass_cluster_set_options/2, cass_session_new/0, - cass_session_connect/2, cass_session_connect/3, + cass_session_connect/4, cass_session_close/2, cass_session_prepare/4, cass_session_execute/5, @@ -67,19 +67,19 @@ cass_log_set_callback(_LogPid) -> cass_cluster_create() -> ?NOT_LOADED. -cass_cluster_release() -> +cass_cluster_release(_Cluster) -> ?NOT_LOADED. -cass_cluster_set_options(_OptionList) -> +cass_cluster_set_options(_Cluster, _OptionList) -> ?NOT_LOADED. cass_session_new() -> ?NOT_LOADED. -cass_session_connect(_SessionRef, _FromPid) -> +cass_session_connect(_Cluster, _SessionRef, _FromPid) -> ?NOT_LOADED. -cass_session_connect(_SessionRef, _FromPid, _Keyspace) -> +cass_session_connect(_Cluster, _SessionRef, _FromPid, _Keyspace) -> ?NOT_LOADED. cass_session_close(_SessionRef, _Pid) -> diff --git a/src/erlcass_stm_cache.erl b/src/erlcass_stm_cache.erl index ec4cbcd..b376be6 100644 --- a/src/erlcass_stm_cache.erl +++ b/src/erlcass_stm_cache.erl @@ -1,28 +1,37 @@ -module(erlcass_stm_cache). -export([ - create/0, - set/2, - find/1, - to_list/0 + get_table_name/1, + get_existing_table_name/1, + create/1, + set/3, + find/2, + to_list/1 ]). --define(ETS_PREPARED_STM_CACHE, erlcass_ets_prepared_stm_cache). +table_name(Name) -> + erlcass_utils:concat_atoms(Name, '_erlcass_ets_prepared_stm_cache'). -create() -> - ?ETS_PREPARED_STM_CACHE = ets:new(?ETS_PREPARED_STM_CACHE, [set, named_table, public, {read_concurrency, true}]), +get_table_name(Name) -> + binary_to_atom(table_name(Name), utf8). + +get_existing_table_name(Name) -> + binary_to_existing_atom(table_name(Name), utf8). + +create(Name) -> + Name = ets:new(Name, [set, named_table, public, {read_concurrency, true}]), ok. -set(Identifier, Query) -> - true = ets:insert(?ETS_PREPARED_STM_CACHE, {Identifier, Query}). +set(Name, Identifier, Query) -> + true = ets:insert(Name, {Identifier, Query}). -find(Identifier) -> - case ets:lookup(?ETS_PREPARED_STM_CACHE, Identifier) of +find(Name, Identifier) -> + case ets:lookup(Name, Identifier) of [{Identifier, _Query}] -> true; [] -> false end. -to_list() -> - ets:tab2list(?ETS_PREPARED_STM_CACHE). \ No newline at end of file +to_list(Name) -> + ets:tab2list(Name). \ No newline at end of file diff --git a/src/erlcass_stm_sessions.erl b/src/erlcass_stm_sessions.erl index a58bc70..c7ef8a3 100644 --- a/src/erlcass_stm_sessions.erl +++ b/src/erlcass_stm_sessions.erl @@ -1,22 +1,31 @@ -module(erlcass_stm_sessions). -export([ - create/0, - set/3, - get/1 + get_table_name/1, + get_existing_table_name/1, + create/1, + set/4, + get/2 ]). --define(ETS_PREPARED_STM_SESSIONS, erlcass_ets_prepared_stm_sessions). +table_name(Name) -> + erlcass_utils:concat_atoms(Name, '_erlcass_ets_prepared_stm_sessions'). -create() -> - ?ETS_PREPARED_STM_SESSIONS = ets:new(?ETS_PREPARED_STM_SESSIONS, [set, named_table, protected, {read_concurrency, true}]), +get_table_name(Name) -> + binary_to_atom(table_name(Name), utf8). + +get_existing_table_name(Name) -> + binary_to_existing_atom(table_name(Name), utf8). + +create(Name) -> + Name = ets:new(Name, [set, named_table, protected, {read_concurrency, true}]), ok. -set(Identifier, Session, StatementRef) -> - true = ets:insert(?ETS_PREPARED_STM_SESSIONS, {Identifier, {Session, StatementRef}}). +set(Name, Identifier, Session, StatementRef) -> + true = ets:insert(Name, {Identifier, {Session, StatementRef}}). -get(Identifier) -> - case ets:lookup(?ETS_PREPARED_STM_SESSIONS, Identifier) of +get(Name, Identifier) -> + case ets:lookup(Name, Identifier) of [{Identifier, Value}] -> Value; [] -> diff --git a/src/erlcass_sup.erl b/src/erlcass_sup.erl index 11ccad4..05f16f6 100644 --- a/src/erlcass_sup.erl +++ b/src/erlcass_sup.erl @@ -11,12 +11,15 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Children = [ - proccess(erlcass_log, infinity), - proccess(erlcass, infinity) + {ok, Clusters} = erlcass_utils:get_env(clusters), + + Logger = [ + {erlcass_log, {erlcass_log, start_link, []}, permanent, infinity, worker, [erlcass_log]} ], + Workers = lists:map(fun(Cluster) -> process(Cluster, infinity) end, Clusters), + Children = Logger ++ Workers, {ok, {{one_for_one, 10, 1}, Children}}. -proccess(Name, WaitForClose) -> - {Name, {Name, start_link, []}, permanent, WaitForClose, worker, [Name]}. \ No newline at end of file +process({Name, Config}, WaitForClose) -> + {Name, {erlcass, start_link, [{Name, Config}]}, permanent, WaitForClose, worker, [erlcass]}. \ No newline at end of file diff --git a/src/erlcass_utils.erl b/src/erlcass_utils.erl index bbe82a9..1ec9f36 100644 --- a/src/erlcass_utils.erl +++ b/src/erlcass_utils.erl @@ -4,7 +4,9 @@ get_env/1, get_env/2, lookup/2, - lookup/3 + lookup/3, + except/2, + concat_atoms/2 ]). get_env(Key) -> @@ -22,4 +24,17 @@ lookup(Key, List, Default) -> end. lookup(Key, List) -> - lookup(Key, List, null). \ No newline at end of file + lookup(Key, List, null). + +except(Key, List) -> + lists:filter(fun(Elem) -> + case Elem of + {Key, _} -> false; + _Other -> true + end + end, List). + +concat_atoms(Atom1, Atom2) -> + A = atom_to_binary(Atom1, utf8), + B = atom_to_binary(Atom2, utf8), + <>. \ No newline at end of file diff --git a/test/integrity_test_SUITE.erl b/test/integrity_test_SUITE.erl index e3ef08b..4b4304e 100644 --- a/test/integrity_test_SUITE.erl +++ b/test/integrity_test_SUITE.erl @@ -45,35 +45,35 @@ end_per_suite(_Config) -> application:stop(erlcass). create_keyspace(_Config) -> - erlcass:query(<<"DROP KEYSPACE erlang_driver_test">>), - ok = erlcass:query(<<"CREATE KEYSPACE erlang_driver_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}">>). + erlcass:query(erlcass_conn, <<"DROP KEYSPACE erlang_driver_test">>), + ok = erlcass:query(erlcass_conn, <<"CREATE KEYSPACE erlang_driver_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}">>). create_table(_Config) -> - ok = erlcass:query(<<"CREATE TABLE erlang_driver_test.entries1 (id varchar, age int, email varchar, PRIMARY KEY(id))">>). + ok = erlcass:query(erlcass_conn, <<"CREATE TABLE erlang_driver_test.entries1 (id varchar, age int, email varchar, PRIMARY KEY(id))">>). simple_insertion_roundtrip(_Config) -> Id = <<"hello">>, Age = 18, Email = <<"test@test.com">>, - ok = erlcass:query(<<"INSERT INTO erlang_driver_test.entries1(id, age, email) VALUES ('", + ok = erlcass:query(erlcass_conn, <<"INSERT INTO erlang_driver_test.entries1(id, age, email) VALUES ('", Id/binary,"',", (integer_to_binary(18))/binary, ", '", Email/binary, "')">>), Cols = [{<<"id">>, text}, {<<"age">>, int}, {<<"email">>, text}], - {ok, Cols, [[Id, Age, Email]]} = erlcass:query(<<"SELECT id, age, email FROM erlang_driver_test.entries1">>). + {ok, Cols, [[Id, Age, Email]]} = erlcass:query(erlcass_conn, <<"SELECT id, age, email FROM erlang_driver_test.entries1">>). emptiness(_Config) -> - ok = erlcass:query(<<"update erlang_driver_test.entries1 set email = null where id = 'hello';">>), - {ok, [{<<"email">>, text}], [[null]]} = erlcass:query(<<"select email from erlang_driver_test.entries1 where id = 'hello';">>). + ok = erlcass:query(erlcass_conn, <<"update erlang_driver_test.entries1 set email = null where id = 'hello';">>), + {ok, [{<<"email">>, text}], [[null]]} = erlcass:query(erlcass_conn, <<"select email from erlang_driver_test.entries1 where id = 'hello';">>). async_insertion_roundtrip(_Config) -> Id = <<"hello_async">>, Age = 32, Email = <<"zz@test.com">>, - {ok, Tag} = erlcass:query_async(<<"INSERT INTO erlang_driver_test.entries1(id, age, email) VALUES ('", + {ok, Tag} = erlcass:query_async(erlcass_conn, <<"INSERT INTO erlang_driver_test.entries1(id, age, email) VALUES ('", Id/binary,"',", (integer_to_binary(Age))/binary, ", '", Email/binary, "')">>), @@ -86,7 +86,7 @@ async_insertion_roundtrip(_Config) -> ct:fail("Timeout on executing query ~n", []) end, - {ok, Tag2} = erlcass:query_async(<<"SELECT id, age, email FROM erlang_driver_test.entries1">>), + {ok, Tag2} = erlcass:query_async(erlcass_conn, <<"SELECT id, age, email FROM erlang_driver_test.entries1">>), receive {execute_statement_result, Tag2, Result2} -> @@ -115,7 +115,7 @@ all_datatypes(_Config) -> {<<"col8">>, int}, {<<"col9">>, timestamp}, {<<"col10">>, uuid}, {<<"col11">>, text}, {<<"col12">>, varint}, {<<"col13">>, timeuuid}, {<<"col14">>, inet}, {<<"col15">>,tinyint}, {<<"col16">>, smallint}, {<<"col17">>,date}, {<<"col18">>, time}], - ok = erlcass:query(<<"CREATE TABLE erlang_driver_test.entries2(", Cols/binary, " PRIMARY KEY(col1));">>), + ok = erlcass:query(erlcass_conn, <<"CREATE TABLE erlang_driver_test.entries2(", Cols/binary, " PRIMARY KEY(col1));">>), AsciiValBin = <<"hello">>, BigIntPositive = 9223372036854775807, @@ -139,10 +139,10 @@ all_datatypes(_Config) -> InsertQuery = <<"INSERT INTO erlang_driver_test.entries2(col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15, col16, col17, col18) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)">>, SelectQuery = <<"SELECT col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15, col16, col17, col18 FROM erlang_driver_test.entries2 WHERE col1 =?">>, - ok = erlcass:add_prepare_statement(insert_all_datatypes, InsertQuery), - ok = erlcass:add_prepare_statement(select_all_datatypes, SelectQuery), + ok = erlcass:add_prepare_statement(erlcass_conn, insert_all_datatypes, InsertQuery), + ok = erlcass:add_prepare_statement(erlcass_conn, select_all_datatypes, SelectQuery), - ok = erlcass:execute(insert_all_datatypes, [ + ok = erlcass:execute(erlcass_conn, insert_all_datatypes, [ AsciiValBin, BigIntPositive, Blob, @@ -163,7 +163,7 @@ all_datatypes(_Config) -> Time ]), - {ok, SelectCols, [Result]} = erlcass:execute(select_all_datatypes, [AsciiValBin]), + {ok, SelectCols, [Result]} = erlcass:execute(erlcass_conn, select_all_datatypes, [AsciiValBin]), [AsciiValBin, BigIntPositive, Blob, BooleanTrue, DecimalPositive, DoublePositive, _, IntPositive, Timestamp, Uuid, Varchar1, Varint1, Timeuuid, Inet, TinyIntPositive, SmallIntPositive, Date, Time] = Result, @@ -179,7 +179,7 @@ all_datatypes(_Config) -> Varchar2 = <<"åäö"/utf8>>, Varint2 = erlang:integer_to_binary(123124211928301970128391280192830198049113123), - ok = erlcass:execute(insert_all_datatypes, ?BIND_BY_NAME, [ + ok = erlcass:execute(erlcass_conn, insert_all_datatypes, ?BIND_BY_NAME, [ {<<"col1">>, AsciiString}, {<<"col2">>, BigIntNegative}, {<<"col3">>, Blob}, @@ -200,7 +200,7 @@ all_datatypes(_Config) -> {<<"col18">>, Time} ]), - {ok, _, [Result2]} = erlcass:execute(select_all_datatypes, ?BIND_BY_NAME, [{<<"col1">>, AsciiString}]), + {ok, _, [Result2]} = erlcass:execute(erlcass_conn, select_all_datatypes, ?BIND_BY_NAME, [{<<"col1">>, AsciiString}]), BinAsciiString = list_to_binary(AsciiString), [BinAsciiString, BigIntNegative, Blob, BooleanFalse, DecimalNegative, DoubleNegative, _, IntNegative, Timestamp, Uuid, Varchar2, Varint2, Timeuuid, Inet, TinyIntNegative, SmallIntNegative, Date, Time] = Result2, @@ -208,13 +208,13 @@ all_datatypes(_Config) -> async_custom_tag(_Config) -> CreationQ = <<"CREATE TABLE erlang_driver_test.async_custom_tag(key int PRIMARY KEY, value map)">>, - ok = erlcass:query(CreationQ), + ok = erlcass:query(erlcass_conn, CreationQ), QuerySelect = <<"SELECT value FROM erlang_driver_test.async_custom_tag where key = ?">>, Tag1 = {mytag, 1}, - ok = erlcass:add_prepare_statement(select_async_custom_tag, QuerySelect), - ok = erlcass:async_execute(select_async_custom_tag, ?BIND_BY_INDEX, [1], self(), Tag1), + ok = erlcass:add_prepare_statement(erlcass_conn, select_async_custom_tag, QuerySelect), + ok = erlcass:async_execute(erlcass_conn, select_async_custom_tag, ?BIND_BY_INDEX, [1], self(), Tag1), receive {execute_statement_result, Tag1, Rs} -> @@ -228,7 +228,7 @@ async_custom_tag(_Config) -> prepared_bind_by_name_index(_Config) -> CreationQ = <<"CREATE TABLE erlang_driver_test.test_map(key int PRIMARY KEY, value map)">>, - ok = erlcass:query(CreationQ), + ok = erlcass:query(erlcass_conn, CreationQ), CollectionIndex1 = <<"my_index">>, CollectionValue1 = <<"my_value">>, @@ -243,24 +243,24 @@ prepared_bind_by_name_index(_Config) -> ]}, QuerySelect = <<"SELECT value FROM erlang_driver_test.test_map where key = ?">>, - ok = erlcass:add_prepare_statement(insert_test_bind, QueryInsertDefaultConsistencyLevel), - ok = erlcass:add_prepare_statement(insert_test_bind_local_q, QueryInsertLocalQuorum), - ok = erlcass:add_prepare_statement(insert_test_bind_serial_q, QueryInsertSerialConsistency), - ok = erlcass:add_prepare_statement(select_test_bind, QuerySelect), + ok = erlcass:add_prepare_statement(erlcass_conn, insert_test_bind, QueryInsertDefaultConsistencyLevel), + ok = erlcass:add_prepare_statement(erlcass_conn, insert_test_bind_local_q, QueryInsertLocalQuorum), + ok = erlcass:add_prepare_statement(erlcass_conn, insert_test_bind_serial_q, QueryInsertSerialConsistency), + ok = erlcass:add_prepare_statement(erlcass_conn, select_test_bind, QuerySelect), - ok = erlcass:execute(insert_test_bind, ?BIND_BY_NAME, [ + ok = erlcass:execute(erlcass_conn, insert_test_bind, ?BIND_BY_NAME, [ {<<"key(value)">>, CollectionIndex1}, {<<"value(value)">>, CollectionValue1}, {<<"key">>, Key1} ]), - ok = erlcass:execute(insert_test_bind, [CollectionIndex2, CollectionValue2, Key2]), - {ok, _, [[[{CollectionIndex1, CollectionValue1}]]]} = erlcass:execute(select_test_bind, ?BIND_BY_NAME, [{<<"key">>, Key1}]), - {ok, _, [[[{CollectionIndex2, CollectionValue2}]]]} = erlcass:execute(select_test_bind, [Key2]), + ok = erlcass:execute(erlcass_conn, insert_test_bind, [CollectionIndex2, CollectionValue2, Key2]), + {ok, _, [[[{CollectionIndex1, CollectionValue1}]]]} = erlcass:execute(erlcass_conn, select_test_bind, ?BIND_BY_NAME, [{<<"key">>, Key1}]), + {ok, _, [[[{CollectionIndex2, CollectionValue2}]]]} = erlcass:execute(erlcass_conn, select_test_bind, [Key2]), ok. fire_and_forget(_Config) -> - ok = erlcass:async_execute(insert_test_bind, ?BIND_BY_NAME, [ + ok = erlcass:async_execute(erlcass_conn, insert_test_bind, ?BIND_BY_NAME, [ {<<"key(value)">>, <<"my_index">>}, {<<"value(value)">>, <<"my_value">>}, {<<"key">>, 5} @@ -269,7 +269,7 @@ fire_and_forget(_Config) -> collection_types(_Config) -> CreationQ = <<"CREATE TABLE erlang_driver_test.entries3(key varchar, numbers list, names set, phones map, PRIMARY KEY(key));">>, - ok = erlcass:query(CreationQ), + ok = erlcass:query(erlcass_conn, CreationQ), Key2 = <<"somekeyhere_2">>, Key3 = <<"somekeyhere_3">>, @@ -279,25 +279,25 @@ collection_types(_Config) -> InsertQ = <<"INSERT INTO erlang_driver_test.entries3(key, numbers, names, phones) values (?, ?, ?, ?);">>, SelectQ = <<"SELECT key, numbers, names, phones FROM erlang_driver_test.entries3 WHERE key = ?;">>, SelectCols = [{<<"key">>, text}, {<<"numbers">>, {list, int}}, {<<"names">>, {set, text}}, {<<"phones">>, {map, int, text}}], - ok = erlcass:add_prepare_statement(insert_collection_types, InsertQ), - ok = erlcass:add_prepare_statement(select_collection_types, SelectQ), + ok = erlcass:add_prepare_statement(erlcass_conn, insert_collection_types, InsertQ), + ok = erlcass:add_prepare_statement(erlcass_conn, select_collection_types, SelectQ), - ok = erlcass:execute(insert_collection_types, ?BIND_BY_NAME, [ + ok = erlcass:execute(erlcass_conn, insert_collection_types, ?BIND_BY_NAME, [ {<<"key">>, Key2}, {<<"numbers">>, List}, {<<"names">>, Set}, {<<"phones">>, Map} ]), - ok = erlcass:execute(insert_collection_types, [Key3, List, Set, Map]), + ok = erlcass:execute(erlcass_conn, insert_collection_types, [Key3, List, Set, Map]), - {ok, SelectCols, [[Key2, List, Set, Map]]} = erlcass:execute(select_collection_types, ?BIND_BY_NAME, [{<<"key">>, Key2}]), - {ok, SelectCols, [[Key3, List, Set, Map]]} = erlcass:execute(select_collection_types, ?BIND_BY_INDEX, [Key3]), + {ok, SelectCols, [[Key2, List, Set, Map]]} = erlcass:execute(erlcass_conn, select_collection_types, ?BIND_BY_NAME, [{<<"key">>, Key2}]), + {ok, SelectCols, [[Key3, List, Set, Map]]} = erlcass:execute(erlcass_conn, select_collection_types, ?BIND_BY_INDEX, [Key3]), ok. nested_collections(_Config) -> CreationQ = <<"CREATE TABLE erlang_driver_test.nested_collections(key varchar, numbers list>>, PRIMARY KEY(key))">>, - ok = erlcass:query(CreationQ), + ok = erlcass:query(erlcass_conn, CreationQ), Key2 = <<"somekeyhere_2">>, Key3 = <<"somekeyhere_3">>, @@ -306,19 +306,19 @@ nested_collections(_Config) -> SelectQ = <<"SELECT key, numbers FROM erlang_driver_test.nested_collections WHERE key = ?">>, SelectCols = [{<<"key">>, text}, {<<"numbers">>, {list, {map, int, text}}}], - ok = erlcass:add_prepare_statement(nest_insert_collection_types, InsertQ), - ok = erlcass:add_prepare_statement(nest_select_collection_types, SelectQ), + ok = erlcass:add_prepare_statement(erlcass_conn, nest_insert_collection_types, InsertQ), + ok = erlcass:add_prepare_statement(erlcass_conn, nest_select_collection_types, SelectQ), - ok = erlcass:execute(nest_insert_collection_types, ?BIND_BY_NAME, [{<<"key">>, Key2}, {<<"numbers">>, List}]), - ok = erlcass:execute(nest_insert_collection_types, [Key3, List]), + ok = erlcass:execute(erlcass_conn, nest_insert_collection_types, ?BIND_BY_NAME, [{<<"key">>, Key2}, {<<"numbers">>, List}]), + ok = erlcass:execute(erlcass_conn, nest_insert_collection_types, [Key3, List]), - {ok, SelectCols, [[Key2, List]]} = erlcass:execute(nest_select_collection_types, ?BIND_BY_NAME, [{<<"key">>, Key2}]), - {ok, SelectCols, [[Key3, List]]} = erlcass:execute(nest_select_collection_types, ?BIND_BY_INDEX, [Key3]), + {ok, SelectCols, [[Key2, List]]} = erlcass:execute(erlcass_conn, nest_select_collection_types, ?BIND_BY_NAME, [{<<"key">>, Key2}]), + {ok, SelectCols, [[Key3, List]]} = erlcass:execute(erlcass_conn, nest_select_collection_types, ?BIND_BY_INDEX, [Key3]), ok. tuples(_Config) -> CreationQ = <<"CREATE TABLE erlang_driver_test.tuples (key varchar, item1 frozen>>, item2 frozen< tuple< tuple > >, PRIMARY KEY(key));">>, - ok = erlcass:query(CreationQ), + ok = erlcass:query(erlcass_conn, CreationQ), Key2 = <<"somekeyhere_2">>, Key3 = <<"somekeyhere_3">>, @@ -330,19 +330,19 @@ tuples(_Config) -> SelectQ = <<"SELECT key, item1, item2 FROM erlang_driver_test.tuples WHERE key = ?">>, SelectCols = [{<<"key">>, text}, {<<"item1">>, {tuple, [text, {list, int}]}}, {<<"item2">>, {tuple, [{tuple, [text, bigint]}]}}], - ok = erlcass:add_prepare_statement(insert_tuple_types, InsertQ), - ok = erlcass:add_prepare_statement(select_tuple_types, SelectQ), + ok = erlcass:add_prepare_statement(erlcass_conn, insert_tuple_types, InsertQ), + ok = erlcass:add_prepare_statement(erlcass_conn, select_tuple_types, SelectQ), - ok = erlcass:execute(insert_tuple_types, ?BIND_BY_NAME, [ + ok = erlcass:execute(erlcass_conn, insert_tuple_types, ?BIND_BY_NAME, [ {<<"key">>, Key2}, {<<"item1">>, Item1}, {<<"item2">>, Item2} ]), - ok = erlcass:execute(insert_tuple_types, [Key3, Item1, Item2]), + ok = erlcass:execute(erlcass_conn, insert_tuple_types, [Key3, Item1, Item2]), - {ok, SelectCols, [[Key2, Item1, Item2]]} = erlcass:execute(select_tuple_types, ?BIND_BY_NAME, [{<<"key">>, Key2}]), - {ok, SelectCols, [[Key3, Item1, Item2]]} = erlcass:execute(select_tuple_types, ?BIND_BY_INDEX, [Key3]), + {ok, SelectCols, [[Key2, Item1, Item2]]} = erlcass:execute(erlcass_conn, select_tuple_types, ?BIND_BY_NAME, [{<<"key">>, Key2}]), + {ok, SelectCols, [[Key3, Item1, Item2]]} = erlcass:execute(erlcass_conn, select_tuple_types, ?BIND_BY_INDEX, [Key3]), ok. test_udt(_Config) -> @@ -363,51 +363,51 @@ test_udt(_Config) -> {<<"phones">>, [<<"33 6 78 90 12 34">>]} ], - ok = erlcass:query(<<"CREATE TYPE erlang_driver_test.address (street text, city text, zip_code int, phones set)">>), - ok = erlcass:query(<<"CREATE TYPE erlang_driver_test.fullname (firstname text, lastname text)">>), - ok = erlcass:query(<<"CREATE TABLE erlang_driver_test.users (id uuid PRIMARY KEY, name frozen , direct_reports set>, addresses map>)">>), + ok = erlcass:query(erlcass_conn, <<"CREATE TYPE erlang_driver_test.address (street text, city text, zip_code int, phones set)">>), + ok = erlcass:query(erlcass_conn, <<"CREATE TYPE erlang_driver_test.fullname (firstname text, lastname text)">>), + ok = erlcass:query(erlcass_conn, <<"CREATE TABLE erlang_driver_test.users (id uuid PRIMARY KEY, name frozen , direct_reports set>, addresses map>)">>), - ok = erlcass:add_prepare_statement(insert_udt, <<"INSERT INTO erlang_driver_test.users (id, name) VALUES (?, ?)">>), - ok = erlcass:add_prepare_statement(update_address_udt, <<"UPDATE erlang_driver_test.users SET addresses = addresses + ? WHERE id=?;">>), - ok = erlcass:add_prepare_statement(select_name_udt, <<"SELECT name FROM erlang_driver_test.users WHERE id=?">>), - ok = erlcass:add_prepare_statement(select_last_name_udt, <<"SELECT name.lastname FROM erlang_driver_test.users WHERE id= ?">>), + ok = erlcass:add_prepare_statement(erlcass_conn, insert_udt, <<"INSERT INTO erlang_driver_test.users (id, name) VALUES (?, ?)">>), + ok = erlcass:add_prepare_statement(erlcass_conn, update_address_udt, <<"UPDATE erlang_driver_test.users SET addresses = addresses + ? WHERE id=?;">>), + ok = erlcass:add_prepare_statement(erlcass_conn, select_name_udt, <<"SELECT name FROM erlang_driver_test.users WHERE id=?">>), + ok = erlcass:add_prepare_statement(erlcass_conn, select_last_name_udt, <<"SELECT name.lastname FROM erlang_driver_test.users WHERE id= ?">>), - ok = erlcass:execute(insert_udt, [UserId, Username]), - ok = erlcass:execute(update_address_udt, [[{<<"home">>, Address}], UserId]), + ok = erlcass:execute(erlcass_conn, insert_udt, [UserId, Username]), + ok = erlcass:execute(erlcass_conn, update_address_udt, [[{<<"home">>, Address}], UserId]), - {ok, _, [[Username]]} = erlcass:execute(select_name_udt, [UserId]), - {ok, _, [[Lname]]} = erlcass:execute(select_last_name_udt, [UserId]). + {ok, _, [[Username]]} = erlcass:execute(erlcass_conn, select_name_udt, [UserId]), + {ok, _, [[Lname]]} = erlcass:execute(erlcass_conn, select_last_name_udt, [UserId]). batches(_Config) -> - ok = erlcass:query(<<"TRUNCATE erlang_driver_test.entries1;">>), + ok = erlcass:query(erlcass_conn, <<"TRUNCATE erlang_driver_test.entries1;">>), InsertStatement = <<"INSERT INTO erlang_driver_test.entries1(id, age, email) VALUES (?, ?, ?)">>, Id2 = <<"id_2">>, Age2 = 12, Email2 = <<"test2@test.com">>, - {ok, Stm1} = erlcass:query_new_statement(<<"INSERT INTO erlang_driver_test.entries1(id, age, email) VALUES ('id_1', 11, 'test1@test.com')">>), - ok = erlcass:add_prepare_statement(insert_prep, InsertStatement), + {ok, Stm1} = erlcass:query_new_statement(erlcass_conn, <<"INSERT INTO erlang_driver_test.entries1(id, age, email) VALUES ('id_1', 11, 'test1@test.com')">>), + ok = erlcass:add_prepare_statement(erlcass_conn, insert_prep, InsertStatement), - {ok, Stm2} = erlcass:bind_prepared_statement(insert_prep), - ok = erlcass:bind_prepared_params_by_name(Stm2, [{<<"id">>, Id2}, {<<"age">>, Age2}, {<<"email">>, Email2}]), + {ok, Stm2} = erlcass:bind_prepared_statement(erlcass_conn, insert_prep), + ok = erlcass:bind_prepared_params_by_name(erlcass_conn, Stm2, [{<<"id">>, Id2}, {<<"age">>, Age2}, {<<"email">>, Email2}]), - ok = erlcass:batch_execute(?CASS_BATCH_TYPE_LOGGED, [Stm1, Stm2], [ + ok = erlcass:batch_execute(erlcass_conn, ?CASS_BATCH_TYPE_LOGGED, [Stm1, Stm2], [ {consistency_level, ?CASS_CONSISTENCY_QUORUM}, {serial_consistency_level, ?CASS_CONSISTENCY_LOCAL_SERIAL} ]), Cols = [{<<"id">>, text}, {<<"age">>, int}, {<<"email">>, text}], - {ok, Cols, Result} = erlcass:query(<<"SELECT id, age, email FROM erlang_driver_test.entries1">>), + {ok, Cols, Result} = erlcass:query(erlcass_conn, <<"SELECT id, age, email FROM erlang_driver_test.entries1">>), ListLength = 2, ListLength = length(Result), ok. erlcass_crash(_Config) -> - exit(whereis(erlcass), kill), + exit(whereis(erlcass_conn), kill), Key3 = <<"somekeyhere_3">>, timer:sleep(2000), - {ok, _, _} = erlcass:execute(select_tuple_types, ?BIND_BY_INDEX, [Key3]), + {ok, _, _} = erlcass:execute(erlcass_conn, select_tuple_types, ?BIND_BY_INDEX, [Key3]), ok. uuid_testing(_Config) -> @@ -431,10 +431,10 @@ date_time_testing(_Config) -> Ts = erlcass_time:date_time_to_epoch(Date, Time). get_metrics(_Config) -> - {ok, _} = erlcass:get_metrics(), + {ok, _} = erlcass:get_metrics(erlcass_conn), ok. drop_keyspace(_Config) -> - ok = erlcass:query(<<"DROP KEYSPACE erlang_driver_test">>). + ok = erlcass:query(erlcass_conn, <<"DROP KEYSPACE erlang_driver_test">>).