Skip to content

Commit

Permalink
feat:incr send binlog withttl (#2833)
Browse files Browse the repository at this point in the history
Co-authored-by: chejinge <chejinge@360.cn>
  • Loading branch information
chejinge and brother-jin authored Aug 7, 2024
1 parent 4588e26 commit ada4ecf
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 40 deletions.
9 changes: 9 additions & 0 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class IncrCmd : public Cmd {
int64_t new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class IncrbyCmd : public Cmd {
Expand All @@ -138,6 +140,8 @@ class IncrbyCmd : public Cmd {
int64_t by_ = 0, new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class IncrbyfloatCmd : public Cmd {
Expand All @@ -161,6 +165,8 @@ class IncrbyfloatCmd : public Cmd {
double by_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class DecrCmd : public Cmd {
Expand Down Expand Up @@ -251,8 +257,11 @@ class AppendCmd : public Cmd {
private:
std::string key_;
std::string value_;
std::string new_value_;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

class MgetCmd : public Cmd {
Expand Down
111 changes: 107 additions & 4 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ void IncrCmd::DoInitial() {
}

void IncrCmd::Do() {
s_ = db_->storage()->Incrby(key_, 1, &new_value_);
s_ = db_->storage()->Incrby(key_, 1, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendContent(":" + std::to_string(new_value_));
AddSlotKey("k", key_, db_);
Expand All @@ -280,6 +280,32 @@ void IncrCmd::DoUpdateCache() {
}
}

std::string IncrCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 4, "*");

// to pksetexat cmd
std::string pksetexat_cmd("pksetexat");
RedisAppendLenUint64(content, pksetexat_cmd.size(), "$");
RedisAppendContent(content, pksetexat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
std::string new_value_str = std::to_string(new_value_);
RedisAppendLenUint64(content, new_value_str.size(), "$");
RedisAppendContent(content, new_value_str);
return content;
}

void IncrbyCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameIncrby);
Expand All @@ -293,7 +319,7 @@ void IncrbyCmd::DoInitial() {
}

void IncrbyCmd::Do() {
s_ = db_->storage()->Incrby(key_, by_, &new_value_);
s_ = db_->storage()->Incrby(key_, by_, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendContent(":" + std::to_string(new_value_));
AddSlotKey("k", key_, db_);
Expand All @@ -318,6 +344,32 @@ void IncrbyCmd::DoUpdateCache() {
}
}

std::string IncrbyCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 4, "*");

// to pksetexat cmd
std::string pksetexat_cmd("pksetexat");
RedisAppendLenUint64(content, pksetexat_cmd.size(), "$");
RedisAppendContent(content, pksetexat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
std::string new_value_str = std::to_string(new_value_);
RedisAppendLenUint64(content, new_value_str.size(), "$");
RedisAppendContent(content, new_value_str);
return content;
}

void IncrbyfloatCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameIncrbyfloat);
Expand All @@ -332,7 +384,7 @@ void IncrbyfloatCmd::DoInitial() {
}

void IncrbyfloatCmd::Do() {
s_ = db_->storage()->Incrbyfloat(key_, value_, &new_value_);
s_ = db_->storage()->Incrbyfloat(key_, value_, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendStringLenUint64(new_value_.size());
res_.AppendContent(new_value_);
Expand Down Expand Up @@ -361,6 +413,32 @@ void IncrbyfloatCmd::DoUpdateCache() {
}
}

std::string IncrbyfloatCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 4, "*");

// to pksetexat cmd
std::string pksetexat_cmd("pksetexat");
RedisAppendLenUint64(content, pksetexat_cmd.size(), "$");
RedisAppendContent(content, pksetexat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
RedisAppendLenUint64(content, new_value_.size(), "$");
RedisAppendContent(content, new_value_);
return content;
}


void DecrCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameDecr);
Expand Down Expand Up @@ -480,7 +558,7 @@ void AppendCmd::DoInitial() {

void AppendCmd::Do() {
int32_t new_len = 0;
s_ = db_->storage()->Append(key_, value_, &new_len);
s_ = db_->storage()->Append(key_, value_, &new_len, &expired_timestamp_sec_, new_value_);
if (s_.ok() || s_.IsNotFound()) {
res_.AppendInteger(new_len);
AddSlotKey("k", key_, db_);
Expand All @@ -501,6 +579,31 @@ void AppendCmd::DoUpdateCache() {
}
}

std::string AppendCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 4, "*");

// to pksetexat cmd
std::string pksetexat_cmd("pksetexat");
RedisAppendLenUint64(content, pksetexat_cmd.size(), "$");
RedisAppendContent(content, pksetexat_cmd);
// key
RedisAppendLenUint64(content, key_.size(), "$");
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
RedisAppendLenUint64(content, new_value_.size(), "$");
RedisAppendContent(content, new_value_);
return content;
}

void MgetCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameMget);
Expand Down
6 changes: 3 additions & 3 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ class Storage {
// If key already exists and is a string, this command appends the value at
// the end of the string
// return the length of the string after the append operation
Status Append(const Slice& key, const Slice& value, int32_t* ret);
Status Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* expired_timestamp_sec, std::string& out_new_value);

// Count the number of set bits (population counting) in a string.
// return the number of bits set to 1
Expand All @@ -285,11 +285,11 @@ class Storage {

// Increments the number stored at key by increment.
// If the key does not exist, it is set to 0 before performing the operation
Status Incrby(const Slice& key, int64_t value, int64_t* ret);
Status Incrby(const Slice& key, int64_t value, int64_t* ret, int64_t* expired_timestamp_sec);

// Increment the string representing a floating point number
// stored at key by the specified increment.
Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret);
Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int64_t* expired_timestamp_sec);

// Set key to hold the string value and set key to timeout after a given
// number of seconds
Expand Down
6 changes: 3 additions & 3 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class Redis {
virtual Status SetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {});

// Strings Commands
Status Append(const Slice& key, const Slice& value, int32_t* ret);
Status Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* expired_timestamp_sec, std::string& out_new_value);
Status BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range);
Status BitOp(BitOpType op, const std::string& dest_key, const std::vector<std::string>& src_keys, std::string &value_to_dest, int64_t* ret);
Status Decrby(const Slice& key, int64_t value, int64_t* ret);
Expand All @@ -163,8 +163,8 @@ class Redis {
Status GetrangeWithValue(const Slice& key, int64_t start_offset, int64_t end_offset,
std::string* ret, std::string* value, int64_t* ttl);
Status GetSet(const Slice& key, const Slice& value, std::string* old_value);
Status Incrby(const Slice& key, int64_t value, int64_t* ret);
Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret);
Status Incrby(const Slice& key, int64_t value, int64_t* ret, int64_t* expired_timestamp_sec);
Status Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int64_t* expired_timestamp_sec);
Status MSet(const std::vector<KeyValue>& kvs);
Status MSetnx(const std::vector<KeyValue>& kvs, int32_t* ret);
Status Set(const Slice& key, const Slice& value);
Expand Down
21 changes: 13 additions & 8 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ Status Redis::ScanStringsKeyNum(KeyInfo* key_info) {
return Status::OK();
}

Status Redis::Append(const Slice& key, const Slice& value, int32_t* ret) {
Status Redis::Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* expired_timestamp_sec, std::string& out_new_value) {
std::string old_value;
*ret = 0;
*expired_timestamp_sec = 0;
ScopeRecordLock l(lock_mgr_, key);

BaseKey base_key(key);
Expand All @@ -90,14 +91,17 @@ Status Redis::Append(const Slice& key, const Slice& value, int32_t* ret) {
uint64_t timestamp = parsed_strings_value.Etime();
std::string old_user_value = parsed_strings_value.UserValue().ToString();
std::string new_value = old_user_value + value.ToString();
out_new_value = new_value;
StringsValue strings_value(new_value);
strings_value.SetEtime(timestamp);
*ret = static_cast<int32_t>(new_value.size());
*expired_timestamp_sec = timestamp;
return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode());
}
} else if (s.IsNotFound()) {
*ret = static_cast<int32_t>(value.size());
StringsValue strings_value(value);
*expired_timestamp_sec = 0;
return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode());
}
return s;
Expand Down Expand Up @@ -618,22 +622,20 @@ Status Redis::GetSet(const Slice& key, const Slice& value, std::string* old_valu
return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode());
}

Status Redis::Incrby(const Slice& key, int64_t value, int64_t* ret) {
Status Redis::Incrby(const Slice& key, int64_t value, int64_t* ret, int64_t* expired_timestamp_sec) {
std::string old_value;
std::string new_value;
ScopeRecordLock l(lock_mgr_, key);

BaseKey base_key(key);
Status s = db_->Get(default_read_options_, base_key.Encode(), &old_value);
char buf[32] = {0};
if (s.ok() && !ExpectedMetaValue(DataType::kStrings, old_value)) {
if (ExpectedStale(old_value)) {
s = Status::NotFound();
} else {
return Status::InvalidArgument(
"WRONGTYPE, key: " + key.ToString() + ", expect type: " +
DataTypeStrings[static_cast<int>(DataType::kStrings)] + ", get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(old_value))]);
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expect type: " + DataTypeStrings[static_cast<int>(DataType::kStrings)] +
", get type: " + DataTypeStrings[static_cast<int>(GetMetaValueType(old_value))]);
}
}
if (s.ok()) {
Expand All @@ -658,6 +660,7 @@ Status Redis::Incrby(const Slice& key, int64_t value, int64_t* ret) {
new_value = std::to_string(*ret);
StringsValue strings_value(new_value);
strings_value.SetEtime(timestamp);
*expired_timestamp_sec = timestamp;
return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode());
}
} else if (s.IsNotFound()) {
Expand All @@ -670,9 +673,10 @@ Status Redis::Incrby(const Slice& key, int64_t value, int64_t* ret) {
}
}

Status Redis::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret) {
Status Redis::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int64_t* expired_timestamp_sec) {
std::string old_value;
std::string new_value;
*expired_timestamp_sec = 0;
long double long_double_by;
if (StrToLongDouble(value.data(), value.size(), &long_double_by) == -1) {
return Status::Corruption("Value is not a vaild float");
Expand Down Expand Up @@ -713,6 +717,7 @@ Status Redis::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret
*ret = new_value;
StringsValue strings_value(new_value);
strings_value.SetEtime(timestamp);
*expired_timestamp_sec = timestamp;
return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode());
}
} else if (s.IsNotFound()) {
Expand Down
12 changes: 6 additions & 6 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ Status Storage::GetrangeWithValue(const Slice& key, int64_t start_offset, int64_
return inst->GetrangeWithValue(key, start_offset, end_offset, ret, value, ttl);
}

Status Storage::Append(const Slice& key, const Slice& value, int32_t* ret) {
Status Storage::Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* expired_timestamp_sec, std::string& out_new_value) {
auto& inst = GetDBInstance(key);
return inst->Append(key, value, ret);
return inst->Append(key, value, ret, expired_timestamp_sec, out_new_value);
}

Status Storage::BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range) {
Expand Down Expand Up @@ -347,14 +347,14 @@ Status Storage::Decrby(const Slice& key, int64_t value, int64_t* ret) {
return inst->Decrby(key, value, ret);
}

Status Storage::Incrby(const Slice& key, int64_t value, int64_t* ret) {
Status Storage::Incrby(const Slice& key, int64_t value, int64_t* ret, int64_t* expired_timestamp_sec) {
auto& inst = GetDBInstance(key);
return inst->Incrby(key, value, ret);
return inst->Incrby(key, value, ret, expired_timestamp_sec);
}

Status Storage::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret) {
Status Storage::Incrbyfloat(const Slice& key, const Slice& value, std::string* ret, int64_t* expired_timestamp_sec) {
auto& inst = GetDBInstance(key);
return inst->Incrbyfloat(key, value, ret);
return inst->Incrbyfloat(key, value, ret, expired_timestamp_sec);
}

Status Storage::Setex(const Slice& key, const Slice& value, int64_t ttl) {
Expand Down
Loading

0 comments on commit ada4ecf

Please sign in to comment.