diff --git a/include/pika_slot_command.h b/include/pika_slot_command.h
index 017fef4d62..0ddb3d9989 100644
--- a/include/pika_slot_command.h
+++ b/include/pika_slot_command.h
@@ -11,6 +11,7 @@
 const std::string SlotKeyPrefix = "_internal:slotkey:4migrate:";
 const std::string SlotTagPrefix = "_internal:slottag:4migrate:";
+const size_t MaxKeySendSize = 10 * 1024;
 
 extern uint32_t crc32tab[256];
 
@@ -31,7 +32,6 @@ std::string GetSlotKey(int slot);
 std::string GetSlotsTagKey(uint32_t crc);
 int GetSlotsID(const std::string &str, uint32_t *pcrc, int *phastag);
 void RemSlotKeyByType(const std::string &type, const std::string &key, const std::shared_ptr<Slot>& slot);
-void WriteSAddToBinlog(const std::string &key, const std::string &value, const std::shared_ptr<Slot>& slot);
 
 class PikaMigrate {
  public:
diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc
index e43f055fc8..c3d000bb9e 100644
--- a/src/pika_consensus.cc
+++ b/src/pika_consensus.cc
@@ -369,6 +369,13 @@ Status ConsensusCoordinator::Reset(const LogOffset& offset) {
 Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
                                         std::shared_ptr<std::string> resp_ptr) {
+  std::vector<std::string> keys = cmd_ptr->current_key();
+  // internal slot keys should not be written to the binlog
+  if (cmd_ptr->name() == kCmdNameSAdd && !keys.empty() &&
+      (keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 || keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0)) {
+    return Status::OK();
+  }
+
   LogOffset log_offset;
 
   stable_logger_->Logger()->Lock();
diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc
index 8af5678127..3f8251c0de 100644
--- a/src/pika_repl_server_conn.cc
+++ b/src/pika_repl_server_conn.cc
@@ -231,8 +231,8 @@ bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr
     try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kSyncPointLarger);
     LOG(WARNING) << "Slave offset is larger than mine, Slave ip: " << node.ip() << ", Slave port: " << node.port()
-                 << ", Slot: " << slot_name << ", filenum: " << slave_boffset.filenum()
-                 << ", pro_offset_: " << slave_boffset.offset();
+                 << ", Slot: " << slot_name << ", slave filenum: " << slave_boffset.filenum()
+                 << ", slave pro_offset_: " << slave_boffset.offset() << ", local filenum: " << boffset.filenum << ", local pro_offset_: " << boffset.offset;
     return false;
   }
diff --git a/src/pika_slot_command.cc b/src/pika_slot_command.cc
index e8f060941d..4ec3b9918b 100644
--- a/src/pika_slot_command.cc
+++ b/src/pika_slot_command.cc
@@ -816,7 +816,6 @@ void AddSlotKey(const std::string type, const std::string key, const std::shared
     LOG(ERROR) << "sadd key[" << key << "] to slotKey[" << slot_key << "] failed, error: " << s.ToString();
     return;
   }
-  WriteSAddToBinlog(slot_key, members.front(), slot);
 
   // if res == 0, indicate the key is existed; may return,
   // prevent write slot_key success, but write tag_key failed, so always write tag_key
@@ -827,24 +826,6 @@
     LOG(ERROR) << "sadd key[" << key << "] to tagKey[" << tag_key << "] failed, error: " << s.ToString();
     return;
   }
-    WriteSAddToBinlog(tag_key, members.front(), slot);
-  }
-}
-
-// write sadd key to binlog for slave
-void WriteSAddToBinlog(const std::string &key, const std::string &value, const std::shared_ptr<Slot>& slot) {
-  std::shared_ptr<Cmd> cmd_ptr = g_pika_cmd_table_manager->GetCmd("sadd");
-  std::unique_ptr<PikaCmdArgsType> args = std::unique_ptr<PikaCmdArgsType>(new PikaCmdArgsType());
-  args->emplace_back("SADD");
-  args->emplace_back(key);
-  args->emplace_back(value);
-  cmd_ptr->Initial(*args, slot->GetDBName());
-
-  std::shared_ptr<SyncMasterSlot> sync_slot =
-      g_pika_rm->GetSyncMasterSlotByName(SlotInfo(slot->GetDBName(), slot->GetSlotID()));
-  Status s = sync_slot->ConsensusProposeLog(cmd_ptr);
-  if (!s.ok()) {
-    LOG(ERROR) << "write sadd key to binlog failed, key: " << key;
   }
 }
diff --git a/tests/conf/pika.conf b/tests/conf/pika.conf
new file mode 100644
index 0000000000..5e6e9ae27e
--- /dev/null
+++ b/tests/conf/pika.conf
@@ -0,0 +1,393 @@
+###########################
+# Pika configuration file #
+###########################
+
+# Pika port, the default value is 9221.
+# [NOTICE] Magic port offsets of port+1000 / port+2000 are used by Pika at present.
+# Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221.
+port : 9221
+
+# A random value identifying the Pika server; its string length must be 40.
+# If not set, Pika will generate a random string of 40 characters.
+# run-id:
+
+# The number of threads for running Pika.
+# It's not recommended to set this value to exceed
+# the number of CPU cores on the deployment server.
+thread-num : 1
+
+# Size of the thread pool. The threads within this pool
+# are dedicated to handling user requests.
+thread-pool-size : 12
+
+# The number of sync threads for data replication from the master. These threads work on slave nodes
+# and are used to execute commands sent from the master node when replicating.
+sync-thread-num : 6
+
+# Directory to store log files of Pika, which contains multiple types of logs,
+# including INFO, WARNING and ERROR logs, as well as the binlog (write2file) files
+# used for replication.
+log-path : ./log/
+
+# Directory to store the data of Pika.
+db-path : ./db/
+
+# The size of a single RocksDB memtable at Pika's bottom layer (Pika uses RocksDB to store persistent data).
+# [Tip] A big write-buffer-size can improve writing performance,
+# but it will generate heavier IO load when flushing from buffer to disk,
+# so you should configure it based on your usage scenario.
+# Supported Units [K|M|G], write-buffer-size default unit is in [bytes].
+write-buffer-size : 256M
+
+# The size of one block in arena memory allocation.
+# If <= 0, a proper value is automatically calculated.
+# (usually 1/8 of write-buffer-size, rounded up to a multiple of 4KB)
+# Supported Units [K|M|G], arena-block-size default unit is in [bytes].
+arena-block-size :
+
+# Timeout of Pika's connections. The countdown starts when there are no requests
+# on a connection (it enters sleep state); when the countdown reaches 0, the connection
+# will be closed by Pika.
+# [Tip] The issue of running out of Pika's connections may be avoided if this value
+# is configured properly.
+# The unit of timeout is in [seconds] and its default value is 60(s).
+timeout : 60
+
+# The [password of administrator], which is empty by default.
+# [NOTICE] If this admin password is the same as the user password (including both being empty),
+# the value of userpass will be ignored and all users are considered administrators;
+# in this scenario, users are not subject to the restrictions imposed by the userblacklist.
+# PS: "user password" refers to the value of the parameter below: userpass.
+requirepass :
+
+# Password for replication verification, used for authentication when a slave
+# connects to a master to request replication.
+# [NOTICE] The value of this parameter must match the "requirepass" setting on the master.
+masterauth :
+
+# The [password of user], which is empty by default.
+# [NOTICE] If this user password is the same as the admin password (including both being empty),
+# the value of this parameter will be ignored and all users are considered administrators;
+# in this scenario, users are not subject to the restrictions imposed by the userblacklist.
+# PS: "admin password" refers to the value of the parameter above: requirepass.
+userpass :
+
+# The blacklist of commands for users that log in with userpass;
+# commands added to this list will not be available to any user except the administrator.
+# [Advice] It's recommended to add high-risk commands to this list.
+# [Format] Commands should be separated by ",". For example: FLUSHALL, SHUTDOWN, KEYS, CONFIG
+# By default, this list is empty.
+userblacklist :
+
+# Running mode of Pika. The current version only supports running in "classic mode".
+# If set to 'classic', Pika will create multiple DBs whose number is the value of the configure item "databases".
+instance-mode : classic
+
+# The number of databases when Pika runs in classic mode.
+# The default database id is DB 0. You can select a different one on
+# a per-connection basis by using SELECT. The db id range is [0, 'databases' value - 1].
+# The value range of this parameter is [1, 8].
+databases : 1
+
+# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present.
+# By default, this num is set to 0, which means this feature is [not enabled]
+# and Pika runs in standalone mode.
+replication-num : 0
+
+# The consensus level defines the number of confirms (ACKs) the leader node needs to receive from
+# follower nodes before returning the result to the client that sent the request.
+# The [value range] of this parameter is: [0, ...replication-num].
+# The default value of consensus-level is 0, which means this feature is not enabled.
+consensus-level : 0
+
+# The prefix of dump files' names.
+# All the files generated by the command "bgsave" will be named with this prefix.
+dump-prefix :
+
+# daemonize [yes | no].
+daemonize : yes
+
+# The directory to store dump files generated by the command "bgsave".
+dump-path : ./dump/
+
+# TTL of dump files generated by the command "bgsave".
+# Any dump files which exceed this TTL will be deleted.
+# The unit of dump-expire is in [days] and the default value is 0(day),
+# which means dump files never expire.
+dump-expire : 0
+
+# Pid file path of Pika.
+pidfile : ./pika.pid
+
+# The maximum number of Pika's connections.
+maxclients : 20000
+
+# The size of an sst file in RocksDB (Pika is based on RocksDB).
+# sst files are hierarchical; the smaller the sst file size, the higher the performance and the lower the merge cost,
+# the price is that the number of sst files could be huge. On the contrary, the bigger the sst file size, the lower
+# the performance and the higher the merge cost, while the number of files is fewer.
+# Supported Units [K|M|G], target-file-size-base default unit is in [bytes] and the default value is 20M.
+target-file-size-base : 20M
+
+# Expire-time of binlog (write2file) files that are stored within log-path.
+# Any binlog (write2file) files that exceed this expire time will be cleaned up.
+# The unit of expire-logs-days is in [days] and the default value is 7(days).
+# The [Minimum value] of this parameter is 1(day).
+expire-logs-days : 7
+
+# The maximum number of binlog (write2file) files.
+# Once the total number of binlog files exceeds this value,
+# automatic cleaning will start to ensure the maximum number
+# of binlog files is equal to expire-logs-nums.
+# The [Minimum value] of this parameter is 10.
+expire-logs-nums : 10
+
+# The number of guaranteed connections for the root user.
+# This parameter guarantees that there are 2 (by default) connections available
+# for the root user to log in to Pika from 127.0.0.1, even if the maximum connection limit is reached.
+# PS: The maximum connection refers to the parameter above: maxclients.
+# The default value of root-connection-num is 2.
+root-connection-num : 2
+
+# slowlog-write-errorlog [yes | no]
+slowlog-write-errorlog : no
+
+# The time threshold for slow log recording.
+# Any command whose execution time exceeds this threshold will be recorded in pika-ERROR.log,
+# which is stored in log-path.
+# The unit of slowlog-log-slower-than is in [microseconds(μs)] and the default value is 10000 μs / 10 ms.
+slowlog-log-slower-than : 10000
+
+# slowlog-max-len
+slowlog-max-len : 128
+
+# Pika db sync path
+db-sync-path : ./dbsync/
+
+# The maximum transmission speed during full synchronization.
+# Exhaustion of the network can be prevented by setting this parameter properly.
+# The value range of this parameter is [1,1024] with unit in [MB/s].
+# [NOTICE] If this parameter is set to an invalid value (smaller than 0 or bigger than 1024),
+# it will be automatically reset to 1024.
+# The default value of db-sync-speed is -1 (1024MB/s).
+db-sync-speed : -1
+
+# The priority of a slave node when electing a new master node.
+# The slave node with a [lower] value of slave-priority will have [higher priority] to be elected as the new master node.
+# This parameter is only used in conjunction with sentinel and serves no other purpose.
+# The default value of slave-priority is 100.
+slave-priority : 100
+
+# Specify the network interface that works with Pika.
+#network-interface : eth1
+
+# The IP and port of the master node are specified by this parameter for
+# replication between master and slaves.
+# [Format] is "ip:port", for example: "192.168.1.2:6666" indicates that
+# the slave instances configured with this value will automatically send
+# the SLAVEOF command to port 6666 of 192.168.1.2 after startup.
+# This parameter should be configured on slave nodes.
+#slaveof : master-ip:master-port
+
+
+# The daily/weekly automatic full compaction task is configured by compact-cron.
+#
+# [Format-daily]: start time(hour)-end time(hour)/disk-free-space-ratio,
+# example: with a value of "02-04/60", Pika will perform a full compaction task between 2:00-4:00 AM every day if
+# the disk-free-size / disk-size > 60%.
+#
+# [Format-weekly]: week/start time(hour)-end time(hour)/disk-free-space-ratio,
+# example: with a value of "3/02-04/60", Pika will perform a full compaction task between 2:00-4:00 AM every Wednesday if
+# the disk-free-size / disk-size > 60%.
+#
+# [Tip] Automatic full compaction is suitable for scenarios with multiple data structures
+# where lots of items are expired or deleted, or key names are frequently reused.
+#
+# [NOTICE]: If compact-interval is set, compact-cron will be masked and disabled.
+#
+#compact-cron : 3/02-04/60
+
+
+# An automatic full compaction task at a fixed time interval is configured by compact-interval.
+# [Format]: time interval(hour)/disk-free-space-ratio, example: "6/60", Pika will perform a full compaction every 6 hours
+# if the disk-free-size / disk-size > 60%.
+# [NOTICE]: compact-interval takes precedence over compact-cron.
+#compact-interval :
+
+# This window-size determines the amount of data that can be transmitted in a single synchronization process.
+# [Tip] In scenarios with high network latency, increasing this size can improve synchronization efficiency.
+# Its default value is 9000. The [maximum] value is 90000.
+sync-window-size : 9000
+
+# Maximum buffer size of a client connection.
+# Only three values are valid here: [67108864(64MB) | 268435456(256MB) | 536870912(512MB)].
+# [NOTICE] Master and slaves must have exactly the same value for max-conn-rbuf-size.
+# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB).
+max-conn-rbuf-size : 268435456
+
+
+###############################################################################
+#! Critical Settings !#
+###############################################################################
+
+# write-binlog [yes | no]
+write-binlog : yes
+
+# The size of a binlog file, which can not be modified once the Pika instance has started.
+# [NOTICE] Master and slaves must have exactly the same value for binlog-file-size.
+# The [value range] of binlog-file-size is [1K, 2G].
+# Supported Units [K|M|G], binlog-file-size default unit is in [bytes] and the default value is 100M.
+binlog-file-size : 104857600
+
+# Automatically triggers a small compaction according to statistics.
+# Use the cache to store up to 'max-cache-statistic-keys' keys.
+# If 'max-cache-statistic-keys' is set to '0', the statistics function is turned off
+# and this automatic small compaction feature is disabled.
+max-cache-statistic-keys : 0
+
+# When a specific multi-data-structure key is deleted or overwritten 'small-compaction-threshold' times,
+# a small compaction is triggered automatically if the small compaction feature is enabled.
+# small-compaction-threshold default value is 5000 and the value range is [1, 100000].
+small-compaction-threshold : 5000
+
+# The maximum total size of all live memtables of the RocksDB instance owned by Pika.
+# Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB
+# exceeds max-write-buffer-size when the next write operation is issued.
+# [RocksDB-Basic-Tuning](/~https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning)
+# Supported Units [K|M|G], max-write-buffer-size default unit is in [bytes].
+max-write-buffer-size : 10737418240
+
+# The maximum number of write buffers (memtables) that are built up in memory for one ColumnFamily in the DB.
+# The default and the minimum number is 2. It means that Pika (RocksDB) will write to a write buffer
+# when it flushes the data of another write buffer to storage.
+# If max-write-buffer-num > 3, writing will be slowed down.
+max-write-buffer-num : 2
+
+# The maximum size of the response package to the client, to prevent memory
+# exhaustion caused by commands like 'keys *' and 'scan' which can generate a huge response.
+# Supported Units [K|M|G]. The default unit is in [bytes].
+max-client-response-size : 1073741824
+
+# The compression algorithm. You can not change it once Pika has started.
+# Supported types: [snappy, zlib, lz4, zstd]. If you do not want to compress the SST files, please set its value to none.
+# [NOTICE] The official Pika binary release only links the snappy library statically, which means that
+# you should compile Pika from the source code and then statically link it with another compression algorithm library by yourself.
+compression : snappy
+
+# If the vector size is smaller than the level number, the undefined lower levels use the
+# last option in the configurable array. For example, for a 3-level
+# LSM tree the following settings are the same:
+# configurable array: [none:snappy]
+# LSM settings: [none:snappy:snappy]
+# When this configurable is enabled, `compression` is ignored;
+# by default l0 and l1 are not compressed, and l2 and higher use the `compression` option.
+# /~https://github.com/facebook/rocksdb/wiki/Compression
+#compression_per_level : [none:none:snappy:lz4:lz4]
+
+# The number of background flushing threads.
+# max-background-flushes default value is 1 and the value range is [1, 4].
+max-background-flushes : 1
+
+# The number of background compacting threads.
+# max-background-compactions default value is 2 and the value range is [1, 8].
+max-background-compactions : 2
+
+# The maximum number of open file descriptors cached by RocksDB.
+max-cache-files : 5000
+
+# The ratio between the total size of RocksDB level-(L+1) files and the total size of RocksDB level-L files for all L.
+# Its default value is 10(x). You can also change it to 5(x).
+max-bytes-for-level-multiplier : 10
+
+# slotmigrate is mainly used to migrate slots; usually we set it to no.
+# When you migrate slots, you need to set it to yes, and reload the slot keys beforehand.
+# slotmigrate [yes | no]
+slotmigrate : no
+
+# BlockBasedTable block_size, default 4K.
+# block-size: 4096
+
+# Block LRU cache, default 8M, 0 to disable.
+# Supported Units [K|M|G], default unit [bytes].
+# block-cache: 8388608
+
+# num-shard-bits default -1, the number of bits from cache keys to be used as shard id.
+# The cache will be sharded into 2^num_shard_bits shards.
+# /~https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#lru-cache
+# num-shard-bits: -1
+
+# Whether the block cache is shared among the RocksDB instances; by default it is per CF.
+# share-block-cache: no
+
+# The slot number of Pika when used with codis.
+default-slot-num : 1024
+
+# Whether or not index and filter blocks are stored in the block cache.
+# cache-index-and-filter-blocks: no
+
+# pin_l0_filter_and_index_blocks_in_cache [yes | no]
+# When `cache-index-and-filter-blocks` is enabled, `pin_l0_filter_and_index_blocks_in_cache` is suggested to be enabled.
+# pin_l0_filter_and_index_blocks_in_cache : no
+
+# When set to yes, the bloom filter of the last level will not be built.
+# optimize-filters-for-hits: no
+# /~https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#levels-target-size
+# level-compaction-dynamic-level-bytes: no
+
+################################## RocksDB Rate Limiter #######################
+# rocksdb rate limiter
+# https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html
+# /~https://github.com/EighteenZi/rocksdb_wiki/blob/master/Rate-Limiter.md
+###############################################################################
+
+# rate limiter bandwidth, default 200MB
+#rate-limiter-bandwidth : 209715200
+
+#rate-limiter-refill-period-us : 100000
+#
+#rate-limiter-fairness: 10
+
+# rate limiter auto-tune, see https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. The default value is false.
+#rate-limiter-auto-tuned : true
+
+################################## RocksDB Blob Configure #####################
+# rocksdb blob configure
+# https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html
+# wiki /~https://github.com/facebook/rocksdb/wiki/BlobDB
+###############################################################################
+
+# Enable rocksdb blob files, default no.
+# enable-blob-files : yes
+
+# Values at or above this threshold will be written to blob files during flush or compaction.
+# Supported Units [K|M|G], default unit is in [bytes].
+# min-blob-size : 4K
+
+# The size limit for blob files.
+# Supported Units [K|M|G], default unit is in [bytes].
+# blob-file-size : 256M
+
+# The compression type to use for blob files. All blobs in the same file are compressed using the same algorithm.
+# Supported types: [snappy, zlib, lz4, zstd]. If you do not want to compress the blob files, please set its value to none.
+# [NOTICE] The official Pika binary release only links the snappy library statically, which means that
+# you should compile Pika from the source code and then statically link it with another compression algorithm library by yourself.
+# blob-compression-type : lz4
+
+# Set this to yes to make BlobDB actively relocate valid blobs from the oldest blob files as they are encountered during compaction.
+# The value option is [yes | no].
+# enable-blob-garbage-collection : no
+
+# The cutoff that the GC logic uses to determine which blob files should be considered "old".
+# This parameter can be tuned to adjust the trade-off between write amplification and space amplification.
+# blob-garbage-collection-age-cutoff : 0.25
+
+# If the ratio of garbage in the oldest blob files exceeds this threshold,
+# targeted compactions are scheduled in order to force garbage collection of the blob files in question.
+# blob_garbage_collection_force_threshold : 1.0
+
+# The Cache object to use for blobs; disabled by default.
+# blob-cache : 0
+
+# blob-num-shard-bits default -1, the number of bits from cache keys to be used as shard id.
+# The cache will be sharded into 2^blob-num-shard-bits shards.
+# blob-num-shard-bits : -1
diff --git a/tests/integration/pika_replication_test.py b/tests/integration/pika_replication_test.py
index 5813be6544..84db8ff413 100644
--- a/tests/integration/pika_replication_test.py
+++ b/tests/integration/pika_replication_test.py
@@ -95,7 +95,7 @@ def test_del_replication():
     # print(m_keys)
     # print(s_keys)
     # Check if the keys in the master and slave are the same
-    assert set(m_keys) == set(s_keys), f'Expected: s_keys == m_keys, but got {s_keys} != {m_keys}'
+    assert set(m_keys) == set(s_keys), f'Expected: s_keys == m_keys, but got s_keys: {s_keys}, m_keys: {m_keys}'
 
     lists_ = ['blist1', 'blist2', 'blist3']
     for this_list in lists_:
@@ -371,7 +371,7 @@ def rpoplpush_thread1():
     time.sleep(10)
     m_keys = master.keys()
    s_keys = slave.keys()
-    assert s_keys == m_keys, f'Expected: s_keys == m_keys, but got {s_keys == m_keys}'
+    assert s_keys == m_keys, f'Expected: s_keys == m_keys, but got s_keys: {s_keys}, m_keys: {m_keys}'
 
     for i in range(0, master.llen('blist')):
         # print(master.lindex('blist', i))
@@ -781,6 +781,109 @@ def random_pfmerge_thread():
     assert m_hll_out == s_hll_out, f'Expected: hll_out on master == hll_out on slave, but got hll_out on slave:{s_hll_out}, hll_out on master:{m_hll_out}'
     print("test_pfmerge_replication OK [✓]")
 
+def test_migrateslot_replication():
+    print("start test_migrateslot_replication")
+    master = redis.Redis(host=master_ip, port=int(master_port), db=0)
+    slave = redis.Redis(host=slave_ip, port=int(slave_port), db=0)
+
+    # enable slot migration on the master only
+    master.config_set("slotmigrate", "yes")
+    slave.config_set("slotmigrate", "no")
+
+    setKey1 = "setKey_000"
+    setKey2 = "setKey_001"
+    setKey3 = "setKey_002"
+    setKey4 = "setKey_store"
+
+    slave.slaveof(master_ip, master_port)
+    time.sleep(1)
+
+    master.delete(setKey1)
+    master.delete(setKey2)
+    master.delete(setKey3)
+    master.delete(setKey4)
+
+    letters = string.ascii_letters
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
+    master.sdiffstore(setKey4, setKey1, setKey2)
+
+    time.sleep(10)
+
+    m_set1 = master.smembers(setKey1)
+    m_set2 = master.smembers(setKey2)
+    m_set3 = master.smembers(setKey3)
+    m_dest_set = master.smembers(setKey4)
+    s_set1 = slave.smembers(setKey1)
+    s_set2 = slave.smembers(setKey2)
+    s_set3 = slave.smembers(setKey3)
+    s_dest_set = slave.smembers(setKey4)
+
+    assert m_set1 == s_set1, f'Expected: set1 on master == set1 on slave, but got set1 on slave:{s_set1}, set1 on master:{m_set1}'
+    assert m_set2 == s_set2, f'Expected: set2 on master == set2 on slave, but got set2 on slave:{s_set2}, set2 on master:{m_set2}'
+    assert m_set3 == s_set3, f'Expected: set3 on master == set3 on slave, but got set3 on slave:{s_set3}, set3 on master:{m_set3}'
+    assert m_dest_set == s_dest_set, f'Expected: dest_set on master == dest_set on slave, but got dest_set on slave:{s_dest_set}, dest_set on master:{m_dest_set}'
+
+    # disconnect master and slave
+    slave.slaveof("no", "one")
+
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey2, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey1, ''.join(random.choice(letters) for _ in range(5)))
+    master.sadd(setKey3, ''.join(random.choice(letters) for _ in range(5)))
+    master.sdiffstore(setKey4, setKey1, setKey2)
+
+    # reconnect master and slave
+    slave.slaveof(master_ip, master_port)
+    time.sleep(25)
+
+    m_set1 = master.smembers(setKey1)
+    m_set2 = master.smembers(setKey2)
+    m_set3 = master.smembers(setKey3)
+    m_dest_set = master.smembers(setKey4)
+    s_set1 = slave.smembers(setKey1)
+    s_set2 = slave.smembers(setKey2)
+    s_set3 = slave.smembers(setKey3)
+    s_dest_set = slave.smembers(setKey4)
+
+    assert m_set1 == s_set1, f'Expected: set1 on master == set1 on slave, but got set1 on slave:{s_set1}, set1 on master:{m_set1}'
+    assert m_set2 == s_set2, f'Expected: set2 on master == set2 on slave, but got set2 on slave:{s_set2}, set2 on master:{m_set2}'
+    assert m_set3 == s_set3, f'Expected: set3 on master == set3 on slave, but got set3 on slave:{s_set3}, set3 on master:{m_set3}'
+    assert m_dest_set == s_dest_set, f'Expected: dest_set on master == dest_set on slave, but got dest_set on slave:{s_dest_set}, dest_set on master:{m_dest_set}'
+
+    # the slave node should not have any slot keys
+    s_keys = slave.keys()
+    for key in s_keys:
+        assert not (str(key).startswith("_internal:slotkey:4migrate:") or str(key).startswith("_internal:slottag:4migrate:")), f'Expected: slave should not have slot keys, but got {key}'
+
+    master.config_set("slotmigrate", "no")
+    print("test_migrateslot_replication OK [✓]")
 
 master_ip = '127.0.0.1'
 master_port = '9221'
@@ -804,6 +907,7 @@ def random_pfmerge_thread():
 test_zunionstore_replication()
 test_zinterstore_replication()
 test_sunionstore_replication()
+test_migrateslot_replication()
 
 # ---------For Github Action---------End
diff --git a/tests/integration/start_master_and_slave.sh b/tests/integration/start_master_and_slave.sh
index 5051a27af3..80b054176d 100644
--- a/tests/integration/start_master_and_slave.sh
+++ b/tests/integration/start_master_and_slave.sh
@@ -1,8 +1,8 @@
 #!/bin/bash
 # This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing.
 # it's used to start pika master and slave, running path: build
-cp ../conf/pika.conf ./pika_master.conf
-cp ../conf/pika.conf ./pika_slave.conf
+cp ../tests/conf/pika.conf ./pika_master.conf
+cp ../tests/conf/pika.conf ./pika_slave.conf
 mkdir slave_data
 sed -i -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf
 sed -i -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9231|' -e 's|log-path : ./log/|log-path : ./slave_data/log/|' -e 's|db-path : ./db/|db-path : ./slave_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_slave.conf
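Note for reviewers: the core behavioral change is the early return added to ConsensusCoordinator::ProposeLog, which keeps the master's internal slot-bookkeeping SADDs out of the binlog so they are never replicated (the test above asserts the slave ends up with no "_internal:slotkey/slottag:4migrate:" keys). Below is a minimal, self-contained sketch of that guard, not part of the patch. It assumes kCmdNameSAdd compares equal to "sadd" (Pika registers command names in lowercase), and IsInternalSlotKeySAdd is a hypothetical helper name introduced purely for illustration.

// Standalone sketch of the binlog guard; compile with any C++11 compiler.
#include <iostream>
#include <string>
#include <vector>

const std::string SlotKeyPrefix = "_internal:slotkey:4migrate:";
const std::string SlotTagPrefix = "_internal:slottag:4migrate:";

// True when an SADD targets one of the internal migration-bookkeeping sets,
// i.e. the command should be skipped instead of proposed to the binlog.
bool IsInternalSlotKeySAdd(const std::string& cmd_name, const std::vector<std::string>& keys) {
  if (cmd_name != "sadd" || keys.empty()) {
    return false;
  }
  // compare(0, len, prefix) == 0 is a prefix test; it is safely nonzero
  // when keys[0] is shorter than the prefix.
  return keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 ||
         keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0;
}

int main() {
  // The master's local bookkeeping write is filtered out of replication...
  std::cout << IsInternalSlotKeySAdd("sadd", {"_internal:slotkey:4migrate:set"}) << '\n';  // 1
  // ...while an ordinary user SADD is still written to the binlog.
  std::cout << IsInternalSlotKeySAdd("sadd", {"setKey_000"}) << '\n';  // 0
  return 0;
}

With this filter in place, the explicit WriteSAddToBinlog path becomes redundant, which is why the rest of the patch deletes that helper and its call sites.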