Skip to content

Commit

Permalink
fix result wrong when commands call redis interface (#2596)
Browse files Browse the repository at this point in the history
Co-authored-by: chejinge <chejinge@360.cn>
  • Loading branch information
chejinge and brother-jin authored Apr 15, 2024
1 parent b58beb0 commit b102635
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ Status Redis::HDel(const Slice& key, const std::vector<std::string>& fields, int
std::string meta_value;
int32_t del_cnt = 0;
uint64_t version = 0;
ScopeRecordLock l(lock_mgr_, key);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;

Expand Down Expand Up @@ -280,6 +281,7 @@ Status Redis::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fvs, int
Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -357,6 +359,7 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64
Status Redis::HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by, std::string* new_value) {
new_value->clear();
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -550,6 +553,7 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {
}

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -613,6 +617,7 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {

Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -677,6 +682,7 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int

Status Redis::HSetnx(const Slice& key, const Slice& field, const Slice& value, int32_t* ret) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -1024,6 +1030,8 @@ Status Redis::PKHRScanRange(const Slice& key, const Slice& field_start, const st

Status Redis::HashesExpire(const Slice& key, int64_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1047,6 +1055,8 @@ Status Redis::HashesExpire(const Slice& key, int64_t ttl) {

Status Redis::HashesDel(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1067,6 +1077,8 @@ Status Redis::HashesDel(const Slice& key) {

Status Redis::HashesExpireat(const Slice& key, int64_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1089,6 +1101,8 @@ Status Redis::HashesExpireat(const Slice& key, int64_t timestamp) {

Status Redis::HashesPersist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand Down
17 changes: 17 additions & 0 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Status Redis::LInsert(const Slice& key, const BeforeOrAfter& before_or_after, co
const std::string& value, int64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;

BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -265,6 +266,7 @@ Status Redis::LPop(const Slice& key, int64_t count, std::vector<std::string>* el
elements->clear();

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;

Expand Down Expand Up @@ -310,6 +312,7 @@ Status Redis::LPop(const Slice& key, int64_t count, std::vector<std::string>* el
Status Redis::LPush(const Slice& key, const std::vector<std::string>& values, uint64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t index = 0;
uint64_t version = 0;
Expand Down Expand Up @@ -357,6 +360,7 @@ Status Redis::LPush(const Slice& key, const std::vector<std::string>& values, ui
Status Redis::LPushx(const Slice& key, const std::vector<std::string>& values, uint64_t* len) {
*len = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;

Expand Down Expand Up @@ -501,6 +505,7 @@ Status Redis::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::
Status Redis::LRem(const Slice& key, int64_t count, const Slice& value, uint64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;

BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -622,6 +627,7 @@ Status Redis::LRem(const Slice& key, int64_t count, const Slice& value, uint64_t

Status Redis::LSet(const Slice& key, int64_t index, const Slice& value) {
uint32_t statistic = 0;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;

BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -653,6 +659,7 @@ Status Redis::LSet(const Slice& key, int64_t index, const Slice& value) {

Status Redis::LTrim(const Slice& key, int64_t start, int64_t stop) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint32_t statistic = 0;
std::string meta_value;
Expand Down Expand Up @@ -716,6 +723,7 @@ Status Redis::RPop(const Slice& key, int64_t count, std::vector<std::string>* el
elements->clear();

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;

Expand Down Expand Up @@ -763,6 +771,7 @@ Status Redis::RPoplpush(const Slice& source, const Slice& destination, std::stri
uint32_t statistic = 0;
Status s;
rocksdb::WriteBatch batch;
MultiScopeRecordLock l(lock_mgr_, {source.ToString(), destination.ToString()});
if (source.compare(destination) == 0) {
std::string meta_value;
BaseMetaKey base_source(source);
Expand Down Expand Up @@ -928,6 +937,7 @@ Status Redis::RPushx(const Slice& key, const std::vector<std::string>& values, u
*len = 0;
rocksdb::WriteBatch batch;

ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;

BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -958,6 +968,8 @@ Status Redis::RPushx(const Slice& key, const std::vector<std::string>& values, u

Status Redis::ListsExpire(const Slice& key, int64_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -981,6 +993,8 @@ Status Redis::ListsExpire(const Slice& key, int64_t ttl) {

Status Redis::ListsDel(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1001,6 +1015,8 @@ Status Redis::ListsDel(const Slice& key) {

Status Redis::ListsExpireat(const Slice& key, int64_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1023,6 +1039,7 @@ Status Redis::ListsExpireat(const Slice& key, int64_t timestamp) {

Status Redis::ListsPersist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand Down
16 changes: 16 additions & 0 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ rocksdb::Status Redis::SAdd(const Slice& key, const std::vector<std::string>& me
}

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
uint64_t version = 0;
std::string meta_value;

Expand Down Expand Up @@ -282,6 +283,7 @@ rocksdb::Status Redis::SDiffstore(const Slice& destination, const std::vector<st

std::string meta_value;
uint64_t version = 0;
ScopeRecordLock l(lock_mgr_, destination);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
std::vector<KeyVersion> vaild_sets;
Expand Down Expand Up @@ -462,6 +464,7 @@ rocksdb::Status Redis::SInterstore(const Slice& destination, const std::vector<s
std::string meta_value;
uint64_t version = 0;
bool have_invalid_sets = false;
ScopeRecordLock l(lock_mgr_, destination);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
std::vector<KeyVersion> vaild_sets;
Expand Down Expand Up @@ -684,6 +687,8 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
uint32_t statistic = 0;
std::string meta_value;
std::vector<std::string> keys{source.ToString(), destination.ToString()};
MultiScopeRecordLock ml(lock_mgr_, keys);

if (source == destination) {
*ret = 1;
return rocksdb::Status::OK();
Expand Down Expand Up @@ -775,6 +780,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,

std::string meta_value;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t start_us = pstd::NowMicros();

Expand Down Expand Up @@ -880,6 +886,7 @@ rocksdb::Status Redis::SRandmember(const Slice& key, int32_t count, std::vector<

std::string meta_value;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::vector<int32_t> targets;
std::unordered_set<int32_t> unique;

Expand Down Expand Up @@ -942,6 +949,7 @@ rocksdb::Status Redis::SRandmember(const Slice& key, int32_t count, std::vector<
rocksdb::Status Redis::SRem(const Slice& key, const std::vector<std::string>& members, int32_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -1192,6 +1200,8 @@ rocksdb::Status Redis::SScan(const Slice& key, int64_t cursor, const std::string

rocksdb::Status Redis::SetsExpire(const Slice& key, int64_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1215,6 +1225,8 @@ rocksdb::Status Redis::SetsExpire(const Slice& key, int64_t ttl) {

rocksdb::Status Redis::SetsDel(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1235,6 +1247,8 @@ rocksdb::Status Redis::SetsDel(const Slice& key) {

rocksdb::Status Redis::SetsExpireat(const Slice& key, int64_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1257,6 +1271,8 @@ rocksdb::Status Redis::SetsExpireat(const Slice& key, int64_t timestamp) {

rocksdb::Status Redis::SetsPersist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand Down
Loading

0 comments on commit b102635

Please sign in to comment.