Skip to content

Commit

Permalink
fix: get reverse history message change. (#805)
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
  • Loading branch information
FGadvancer authored Dec 4, 2024
1 parent e8eb653 commit dda6220
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 187 deletions.
85 changes: 52 additions & 33 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}
startTime = m.SendTime
} else {
c.messagePullMinSeqMap.Delete(conversationID)
// Clear both maps when the user enters the conversation
c.messagePullForwardEndSeqMap.Delete(conversationID)
c.messagePullReverseEndSeqMap.Delete(conversationID)
}
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
Expand All @@ -75,41 +77,48 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
t = time.Now()

var thisMinSeq int64
thisMinSeq, messageList = c.LocalChatLog2MsgStruct(ctx, list)
var thisEndSeq int64
thisEndSeq, messageList = c.LocalChatLog2MsgStruct(list, isReverse)
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
t = time.Now()
if !isReverse {
sort.Sort(messageList)
if thisEndSeq != 0 {
c.messagePullForwardEndSeqMap.Store(conversationID, thisEndSeq)
}
} else {
if thisEndSeq != 0 {
c.messagePullReverseEndSeqMap.Store(conversationID, thisEndSeq)
}
}
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
messageListCallback.MessageList = messageList
if thisMinSeq != 0 {
c.messagePullMinSeqMap.Store(conversationID, thisMinSeq)
}
return &messageListCallback, nil

return &messageListCallback, nil
}

func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
count int, startTime int64, isReverse bool, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {

var list []*model_struct.LocalChatLog
var list, validMessages []*model_struct.LocalChatLog

// If all retrieved messages are either deleted or filtered out, continue fetching messages from an earlier point.
shouldFetchMoreMessages := func(messages []*model_struct.LocalChatLog) bool {
// Get the number of invalid messages in this batch to recursive fetching from earlier points.
shouldFetchMoreMessagesNum := func(messages []*model_struct.LocalChatLog) int {
if len(messages) == 0 {
return false
return count
}

allDeleted := true
// Represents the number of valid messages in the batch
validateMessageNum := 0
for _, msg := range messages {
if msg.Status < constant.MsgStatusHasDeleted {
allDeleted = false
break
validateMessageNum++
validMessages = append(validMessages, msg)
} else {
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", msg)
}
}
return allDeleted
return count - validateMessageNum
}
getNewStartTime := func(messages []*model_struct.LocalChatLog) int64 {
if len(messages) == 0 {
Expand All @@ -128,39 +137,49 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
return nil, err
}
t = time.Now()
maxSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
thisStartSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t))
t = time.Now()
c.validateAndFillInterBlockGaps(ctx, maxSeq, conversationID,
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
isReverse, count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t))
t = time.Now()
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
// If all retrieved messages are either deleted or filtered out,
//continue fetching recursively until either valid messages are found or all messages have been fetched.
if shouldFetchMoreMessages(list) && !messageListCallback.IsEnd {
return c.fetchMessagesWithGapCheck(ctx, conversationID, count, getNewStartTime(list), isReverse, messageListCallback)
// If the number of valid messages retrieved is less than the count,
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
missingCount := shouldFetchMoreMessagesNum(list)
if missingCount > 0 && !messageListCallback.IsEnd {
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID", conversationID)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, getNewStartTime(list), isReverse, messageListCallback)
if err != nil {
return nil, err
}
log.ZDebug(ctx, "fetch more messages", "missingMessages", missingMessages)
return append(validMessages, missingMessages...), nil
}

return list, nil
return validMessages, nil
}

func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model_struct.LocalChatLog) (int64, []*sdk_struct.MsgStruct) {
func (c *Conversation) LocalChatLog2MsgStruct(list []*model_struct.LocalChatLog, isReverse bool) (int64, []*sdk_struct.MsgStruct) {
messageList := make([]*sdk_struct.MsgStruct, 0, len(list))
var thisMinSeq int64
var thisEndSeq int64
for _, v := range list {
if v.Seq != 0 && thisMinSeq == 0 {
thisMinSeq = v.Seq
if v.Seq != 0 && thisEndSeq == 0 {
thisEndSeq = v.Seq
}
if v.Seq < thisMinSeq && v.Seq != 0 {
thisMinSeq = v.Seq
}
if v.Status >= constant.MsgStatusHasDeleted {
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", v)
continue
if isReverse {
if v.Seq > thisEndSeq && thisEndSeq != 0 {
thisEndSeq = v.Seq
}

} else {
if v.Seq < thisEndSeq && v.Seq != 0 {
thisEndSeq = v.Seq
}
}
temp := LocalChatLogToMsgStruct(v)

Expand All @@ -169,7 +188,7 @@ func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model
}
messageList = append(messageList, temp)
}
return thisMinSeq, messageList
return thisEndSeq, messageList
}

func (c *Conversation) typingStatusUpdate(ctx context.Context, recvID, msgTip string) error {
Expand Down
140 changes: 71 additions & 69 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,29 @@ var SearchContentType = []int{constant.Text, constant.AtText, constant.File}

type Conversation struct {
*interaction.LongConnMgr
conversationSyncer *syncer.Syncer[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string]
db db_interface.DataBase
ConversationListener func() open_im_sdk_callback.OnConversationListener
msgListener func() open_im_sdk_callback.OnAdvancedMsgListener
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
batchMsgListener func() open_im_sdk_callback.OnBatchMsgListener
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
recvCH chan common.Cmd2Value
loginUserID string
platformID int32
DataDir string
relation *relation.Relation
group *group.Group
user *user.User
file *file.File
cache *cache.Cache[string, *model_struct.LocalConversation]
maxSeqRecorder MaxSeqRecorder
messagePullMinSeqMap *cache.Cache[string, int64]
IsExternalExtensions bool
msgOffset int
progress int
conversationSyncMutex sync.Mutex
conversationSyncer *syncer.Syncer[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string]
db db_interface.DataBase
ConversationListener func() open_im_sdk_callback.OnConversationListener
msgListener func() open_im_sdk_callback.OnAdvancedMsgListener
msgKvListener func() open_im_sdk_callback.OnMessageKvInfoListener
batchMsgListener func() open_im_sdk_callback.OnBatchMsgListener
businessListener func() open_im_sdk_callback.OnCustomBusinessListener
recvCH chan common.Cmd2Value
loginUserID string
platformID int32
DataDir string
relation *relation.Relation
group *group.Group
user *user.User
file *file.File
cache *cache.Cache[string, *model_struct.LocalConversation]
maxSeqRecorder MaxSeqRecorder
messagePullForwardEndSeqMap *cache.Cache[string, int64]
messagePullReverseEndSeqMap *cache.Cache[string, int64]
IsExternalExtensions bool
msgOffset int
progress int
conversationSyncMutex sync.Mutex

startTime time.Time

Expand Down Expand Up @@ -100,20 +101,21 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr,
file *file.File) *Conversation {
info := ccontext.Info(ctx)
n := &Conversation{db: db,
LongConnMgr: longConnMgr,
recvCH: ch,
loginUserID: info.UserID(),
platformID: info.PlatformID(),
DataDir: info.DataDir(),
relation: relation,
group: group,
user: user,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullMinSeqMap: cache.NewCache[string, int64](),
msgOffset: 0,
progress: 0,
LongConnMgr: longConnMgr,
recvCH: ch,
loginUserID: info.UserID(),
platformID: info.PlatformID(),
DataDir: info.DataDir(),
relation: relation,
group: group,
user: user,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullForwardEndSeqMap: cache.NewCache[string, int64](),
messagePullReverseEndSeqMap: cache.NewCache[string, int64](),
msgOffset: 0,
progress: 0,
}
n.typing = newTyping(n)
n.initSyncer()
Expand Down Expand Up @@ -835,8 +837,8 @@ func (c *Conversation) batchAddFaceURLAndName(ctx context.Context, conversations
conversation.FaceURL = v.FaceURL
conversation.ShowName = v.Nickname
} else {
log.ZWarn(ctx, "user info not found", errors.New("user not found"),"userID", conversation.UserID)
log.ZWarn(ctx, "user info not found", errors.New("user not found"), "userID", conversation.UserID)

conversation.FaceURL = ""
conversation.ShowName = "UserNotFound"
}
Expand Down Expand Up @@ -929,37 +931,37 @@ func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversatio
if len(res) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
_, msgList := c.LocalChatLog2MsgStruct(ctx, []*model_struct.LocalChatLog{res[0]})
if len(msgList) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
msg := msgList[0]
//_, msgList := c.LocalChatLog2MsgStruct []*model_struct.LocalChatLog{res[0]})
//if len(msgList) == 0 {
// return []*sdk_struct.MsgStruct{}, nil
//}
//msg := msgList[0]
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
if before > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: int(before),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
result = append(result, msg)
if after > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: int(after),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
sort.Sort(sdk_struct.NewMsgList(result))
//if before > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(before),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//result = append(result, msg)
//if after > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(after),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//sort.Sort(sdk_struct.NewMsgList(result))
return result, nil
}
Loading

0 comments on commit dda6220

Please sign in to comment.