From cfb9dfd59e2830d7995b5e75afdc928049a25381 Mon Sep 17 00:00:00 2001 From: Henry Barreto Date: Thu, 27 Feb 2025 16:15:35 -0300 Subject: [PATCH] refactor(api,ssh,pkg): move session's events to it own collection --- api/routes/session.go | 1 + api/store/mongo/migrations/migration_90.go | 86 ++++++++++----- .../mongo/migrations/migration_90_test.go | 82 +++++--------- api/store/mongo/migrations/migration_91.go | 56 ++++++---- .../mongo/migrations/migration_91_test.go | 103 ++++++------------ api/store/mongo/session.go | 34 ++++-- api/store/session.go | 1 - pkg/models/session.go | 4 +- ssh/session/session.go | 2 + 9 files changed, 188 insertions(+), 181 deletions(-) diff --git a/api/routes/session.go b/api/routes/session.go index bff937d1ff7..4edf1e119e7 100644 --- a/api/routes/session.go +++ b/api/routes/session.go @@ -132,6 +132,7 @@ func (h *Handler) EventSession(c gateway.Context) error { } return h.service.EventSession(c.Ctx(), models.UID(req.UID), &models.SessionEvent{ + Session: req.UID, Type: req.Type, Timestamp: req.Timestamp, Data: req.Data, diff --git a/api/store/mongo/migrations/migration_90.go b/api/store/mongo/migrations/migration_90.go index 40064499d91..26ea4482fe6 100644 --- a/api/store/mongo/migrations/migration_90.go +++ b/api/store/mongo/migrations/migration_90.go @@ -3,6 +3,7 @@ package migrations import ( "context" + "github.com/shellhub-io/shellhub/pkg/models" "github.com/sirupsen/logrus" migrate "github.com/xakep666/mongo-migrate" "go.mongodb.org/mongo-driver/bson" @@ -11,7 +12,7 @@ import ( var migration90 = migrate.Migration{ Version: 90, - Description: "Add events field on sessions", + Description: "Add sessions_events collections", Up: migrate.MigrationFunc(func(ctx context.Context, db *mongo.Database) error { logrus.WithFields(logrus.Fields{ "component": "migration", @@ -19,27 +20,48 @@ var migration90 = migrate.Migration{ "action": "Up", }).Info("Applying migration") - filter := bson.M{ - "events": bson.M{"$exists": false}, - "events.types": bson.M{"$exists": false}, - "events.items": bson.M{"$exists": false}, + if err := db.CreateCollection(ctx, "sessions_events"); err != nil { + return err } - update := bson.M{ - "$set": bson.M{ - "events": bson.M{ - "types": bson.A{}, - "items": bson.A{}, - "seats": bson.A{0}, - }, - }, + cursor, err := db.Collection("sessions").Find(ctx, bson.M{"events.items": bson.M{"$exists": true}}) + if err != nil { + return err } - _, err := db. - Collection("sessions"). - UpdateMany(ctx, filter, update) + defer cursor.Close(ctx) + + for cursor.Next(ctx) { + var session struct { + UID string `bson:"uid"` + Events struct { + Items []models.SessionEvent `bson:"items"` + } `bson:"events"` + } + + if err := cursor.Decode(&session); err != nil { + return err + } + + for _, event := range session.Events.Items { + event.Session = session.UID + if _, err := db.Collection("sessions_events").InsertOne(ctx, event); err != nil { + return err + } + } + + if _, err := db.Collection("sessions").UpdateOne(ctx, bson.M{ + "uid": session.UID, + }, bson.M{ + "$unset": bson.M{ + "events.items": "", + }, + }); err != nil { + return err + } + } - return err + return nil }), Down: migrate.MigrationFunc(func(ctx context.Context, db *mongo.Database) error { logrus.WithFields(logrus.Fields{ @@ -47,19 +69,31 @@ var migration90 = migrate.Migration{ "version": 90, "action": "Down", }).Info("Reverting migration") + cursor, err := db.Collection("sessions_events").Find(ctx, bson.M{}) + if err != nil { + return err + } - filter := bson.M{} + defer cursor.Close(ctx) - update := bson.M{ - "$unset": bson.M{ - "events": "", - }, + for cursor.Next(ctx) { + var event models.SessionEvent + if err := cursor.Decode(&event); err != nil { + return err + } + sessionID := event.Session + + event.Session = "" + update := bson.M{"$push": bson.M{"events.items": event}} + if _, err := db.Collection("sessions").UpdateOne(ctx, bson.M{"uid": sessionID}, update); err != nil { + return err + } } - _, err := db. - Collection("sessions"). - UpdateMany(ctx, filter, update) + if err := db.Collection("sessions_events").Drop(ctx); err != nil { + return err + } - return err + return nil }), } diff --git a/api/store/mongo/migrations/migration_90_test.go b/api/store/mongo/migrations/migration_90_test.go index e9e4e839790..9969ae8aaab 100644 --- a/api/store/mongo/migrations/migration_90_test.go +++ b/api/store/mongo/migrations/migration_90_test.go @@ -2,11 +2,11 @@ package migrations import ( "context" + "fmt" "testing" "github.com/shellhub-io/shellhub/pkg/envs" envmock "github.com/shellhub-io/shellhub/pkg/envs/mocks" - "github.com/shellhub-io/shellhub/pkg/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" migrate "github.com/xakep666/mongo-migrate" @@ -29,7 +29,20 @@ func TestMigration90Up(t *testing.T) { _, err := c. Database("test"). Collection("sessions"). - InsertOne(ctx, map[string]interface{}{}) + InsertOne(ctx, bson.M{ + "uid": "session-1", + "events": bson.M{ + "types": bson.A{ + "test", + }, + "items": []bson.M{ + { + "type": "test", + "data": "some data", + }, + }, + }, + }) return err }, @@ -45,71 +58,30 @@ func TestMigration90Up(t *testing.T) { assert.NoError(tt, tc.setup()) migrates := migrate.NewMigrate(c.Database("test"), GenerateMigrations()[89]) - require.NoError(tt, migrates.Up(context.Background(), migrate.AllAvailable)) + require.NoError(tt, migrates.Up(ctx, migrate.AllAvailable)) query := c. Database("test"). - Collection("sessions"). - FindOne(context.TODO(), bson.M{}) + Collection("sessions_events"). + FindOne(ctx, bson.M{"session": "session-1"}) - session := make(map[string]interface{}) - require.NoError(tt, query.Decode(&session)) + sessionEvent := make(map[string]interface{}) + require.NoError(tt, query.Decode(&sessionEvent)) + fmt.Println(sessionEvent) - require.Contains(tt, session, "events") - }) - } -} + require.Contains(tt, sessionEvent, "type") + require.Contains(tt, sessionEvent, "data") -func TestMigration90Down(t *testing.T) { - ctx := context.Background() - - mock := &envmock.Backend{} - envs.DefaultBackend = mock - - cases := []struct { - description string - setup func() error - }{ - { - description: "Success to revert migration 90", - setup: func() error { - _, err := c. - Database("test"). - Collection("sessions"). - InsertOne(ctx, models.Session{ - Events: models.SessionEvents{ - Types: []string{}, - Items: []models.SessionEvent{}, - Seats: []int{0}, - }, - }) - - return err - }, - }, - } - - for _, tc := range cases { - t.Run(tc.description, func(tt *testing.T) { - tt.Cleanup(func() { - assert.NoError(tt, srv.Reset()) - }) - - assert.NoError(tt, tc.setup()) - - migrates := migrate.NewMigrate(c.Database("test"), GenerateMigrations()[89]) - require.NoError(tt, migrates.Up(context.Background(), migrate.AllAvailable)) - require.NoError(tt, migrates.Down(context.Background(), migrate.AllAvailable)) - - query := c. + query = c. Database("test"). Collection("sessions"). - FindOne(context.TODO(), bson.M{}) + FindOne(ctx, bson.M{"uid": "session-1"}) session := make(map[string]interface{}) require.NoError(tt, query.Decode(&session)) + fmt.Println(session) - require.NotContains(tt, session, "events") + require.NotContains(tt, session, "events.items") }) } } diff --git a/api/store/mongo/migrations/migration_91.go b/api/store/mongo/migrations/migration_91.go index cf455147e59..0121995bf0c 100644 --- a/api/store/mongo/migrations/migration_91.go +++ b/api/store/mongo/migrations/migration_91.go @@ -11,7 +11,7 @@ import ( var migration91 = migrate.Migration{ Version: 91, - Description: "Adding seat and seats to sessions and event's session", + Description: "Adding seat and seats to sessions and sessions events", Up: migrate.MigrationFunc(func(ctx context.Context, db *mongo.Database) error { logrus.WithFields(logrus.Fields{ "component": "migration", @@ -21,18 +21,27 @@ var migration91 = migrate.Migration{ filter := bson.M{} - update := bson.M{ - "$set": bson.M{ - "events.items.$[].seat": 0, - "events.seats": bson.A{0}, - }, + if _, err := db. + Collection("sessions"). + UpdateMany(ctx, filter, bson.M{ + "$set": bson.M{ + "events.seats": bson.A{0}, + }, + }); err != nil { + return err } - _, err := db. - Collection("sessions"). - UpdateMany(ctx, filter, update) + if _, err := db. + Collection("sessions_events"). + UpdateMany(ctx, filter, bson.M{ + "$set": bson.M{ + "seat": 0, + }, + }); err != nil { + return err + } - return err + return nil }), Down: migrate.MigrationFunc(func(ctx context.Context, db *mongo.Database) error { logrus.WithFields(logrus.Fields{ @@ -43,17 +52,26 @@ var migration91 = migrate.Migration{ filter := bson.M{} - update := bson.M{ - "$unset": bson.M{ - "events.items.$[].seat": "", - "events.seats": "", - }, + if _, err := db. + Collection("sessions"). + UpdateMany(ctx, filter, bson.M{ + "$unset": bson.M{ + "events.seats": "", + }, + }); err != nil { + return err } - _, err := db. - Collection("sessions"). - UpdateMany(ctx, filter, update) + if _, err := db. + Collection("sessions_events"). + UpdateMany(ctx, filter, bson.M{ + "$unset": bson.M{ + "seat": "", + }, + }); err != nil { + return err + } - return err + return nil }), } diff --git a/api/store/mongo/migrations/migration_91_test.go b/api/store/mongo/migrations/migration_91_test.go index b49d3cc57ec..482c37fae25 100644 --- a/api/store/mongo/migrations/migration_91_test.go +++ b/api/store/mongo/migrations/migration_91_test.go @@ -6,7 +6,6 @@ import ( "github.com/shellhub-io/shellhub/pkg/envs" envmock "github.com/shellhub-io/shellhub/pkg/envs/mocks" - "github.com/shellhub-io/shellhub/pkg/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" migrate "github.com/xakep666/mongo-migrate" @@ -15,7 +14,6 @@ import ( func TestMigration91Up(t *testing.T) { ctx := context.Background() - mock := &envmock.Backend{} envs.DefaultBackend = mock @@ -29,70 +27,25 @@ func TestMigration91Up(t *testing.T) { _, err := c. Database("test"). Collection("sessions"). - InsertOne(ctx, map[string]interface{}{ - "events": map[string]interface{}{ - "types": []string{}, - "items": []map[string]interface{}{}, - "seats": []int{}, + InsertOne(ctx, bson.M{ + "uid": "session-1", + "events": bson.M{ + "types": bson.A{ + "test", + }, }, }) + if err != nil { + return err + } - return err - }, - }, - } - - for _, tc := range cases { - t.Run(tc.description, func(tt *testing.T) { - tt.Cleanup(func() { - assert.NoError(tt, srv.Reset()) - }) - - assert.NoError(tt, tc.setup()) - - migrates := migrate.NewMigrate(c.Database("test"), GenerateMigrations()[89]) - require.NoError(tt, migrates.Up(context.Background(), migrate.AllAvailable)) - - query := c. - Database("test"). - Collection("sessions"). - FindOne(context.TODO(), bson.M{}) - - session := make(map[string]interface{}) - require.NoError(tt, query.Decode(&session)) - - events, ok := session["events"].(map[string]interface{}) - require.True(tt, ok) - require.Contains(tt, events, "seats") - }) - } -} - -func TestMigration91Down(t *testing.T) { - ctx := context.Background() - - mock := &envmock.Backend{} - envs.DefaultBackend = mock - - cases := []struct { - description string - setup func() error - }{ - { - description: "Success to revert migration 91", - setup: func() error { - _, err := c. + _, err = c. Database("test"). - Collection("sessions"). - InsertOne(ctx, models.Session{ - Events: models.SessionEvents{ - Items: []models.SessionEvent{ - { - Seat: 0, - }, - }, - Seats: []int{0}, - }, + Collection("sessions_events"). + InsertOne(ctx, bson.M{ + "session": "session-1", + "type": "test", + "data": "some data", }) return err @@ -105,24 +58,36 @@ func TestMigration91Down(t *testing.T) { tt.Cleanup(func() { assert.NoError(tt, srv.Reset()) }) - assert.NoError(tt, tc.setup()) migrates := migrate.NewMigrate(c.Database("test"), GenerateMigrations()[90]) - require.NoError(tt, migrates.Up(context.Background(), migrate.AllAvailable)) - require.NoError(tt, migrates.Down(context.Background(), migrate.AllAvailable)) + require.NoError(tt, migrates.Up(ctx, migrate.AllAvailable)) query := c. Database("test"). Collection("sessions"). - FindOne(context.TODO(), bson.M{}) - + FindOne(ctx, bson.M{"uid": "session-1"}) session := make(map[string]interface{}) require.NoError(tt, query.Decode(&session)) events, ok := session["events"].(map[string]interface{}) - require.True(tt, ok) - require.NotContains(tt, events, "seats") + require.True(tt, ok, "events field should exist") + + seats, ok := events["seats"].(bson.A) + require.True(tt, ok, "events.seats field should exist") + require.Equal(tt, 1, len(seats), "seats array should have one element") + require.Equal(tt, int32(0), seats[0], "first seat should be 0") + + query = c. + Database("test"). + Collection("sessions_events"). + FindOne(ctx, bson.M{"session": "session-1"}) + sessionEvent := make(map[string]interface{}) + require.NoError(tt, query.Decode(&sessionEvent)) + + seat, ok := sessionEvent["seat"] + require.True(tt, ok, "seat field should exist") + require.Equal(tt, int32(0), seat, "seat should be 0") }) } } diff --git a/api/store/mongo/session.go b/api/store/mongo/session.go index 751db69f627..269a126bdc0 100644 --- a/api/store/mongo/session.go +++ b/api/store/mongo/session.go @@ -272,16 +272,32 @@ func (s *Store) SessionActiveCreate(ctx context.Context, uid models.UID, session // It pushes the event into events type array, and the event type into a separated set. The set is used to improve the // performance of indexing when looking for sessions. func (s *Store) SessionEvent(ctx context.Context, uid models.UID, event *models.SessionEvent) error { - if _, err := s.db.Collection("sessions").UpdateOne(ctx, - bson.M{"uid": uid}, - bson.M{ - "$addToSet": bson.M{ - "events.types": event.Type, - "events.seats": event.Seat, + session, err := s.db.Client().StartSession() + if err != nil { + return FromMongoError(err) + } + + defer session.EndSession(ctx) + + if _, err := session.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { + if _, err := s.db.Collection("sessions").UpdateOne(ctx, + bson.M{"uid": uid}, + bson.M{ + "$addToSet": bson.M{ + "events.types": event.Type, + "events.seats": event.Seat, + }, }, - "$push": bson.M{"events.items": event}, - }, - ); err != nil { + ); err != nil { + return nil, FromMongoError(err) + } + + if _, err := s.db.Collection("sessions_events").InsertOne(ctx, event); err != nil { + return nil, FromMongoError(err) + } + + return nil, nil + }); err != nil { return FromMongoError(err) } diff --git a/api/store/session.go b/api/store/session.go index cce9fa5af54..b80396bc428 100644 --- a/api/store/session.go +++ b/api/store/session.go @@ -17,6 +17,5 @@ type SessionStore interface { SessionUpdateDeviceUID(ctx context.Context, oldUID models.UID, newUID models.UID) error SessionSetRecorded(ctx context.Context, uid models.UID, recorded bool) error SessionActiveCreate(ctx context.Context, uid models.UID, session *models.Session) error - // SessionEvent register a log event into the session. SessionEvent(ctx context.Context, uid models.UID, event *models.SessionEvent) error } diff --git a/pkg/models/session.go b/pkg/models/session.go index 4a8f70c5915..fd6a7695ec9 100644 --- a/pkg/models/session.go +++ b/pkg/models/session.go @@ -67,6 +67,8 @@ type SessionUpdate struct { // SessionEvent represents a session event. type SessionEvent struct { + // Session is the session UID where the event occurred. + Session string `json:"session" bson:"session,omitempty"` // Type of the session. Normally, it is the SSH request name. Type string `json:"type" bson:"type"` // Timestamp contains the time when the event was logged. @@ -81,8 +83,6 @@ type SessionEvent struct { type SessionEvents struct { // Types field is a set of sessions type to simplify the indexing on the database. Types []string `json:"types" bson:"types,omitempty"` - // Items contains a list of events happened in a session. - Items []SessionEvent `json:"items" bson:"items,omitempty"` // Seats contains a list of seats of events. Seats []int `json:"seats" bson:"seats,omitempty"` } diff --git a/ssh/session/session.go b/ssh/session/session.go index 60f61793313..4f4278e8596 100644 --- a/ssh/session/session.go +++ b/ssh/session/session.go @@ -453,6 +453,7 @@ func (s *Session) NewSeat() (int, error) { // Events register an event to the session. func (s *Session) Event(t string, data any, seat int) { go s.api.EventSession(s.UID, &models.SessionEvent{ //nolint:errcheck + Session: s.UID, Type: t, Timestamp: clock.Now(), Data: data, @@ -467,6 +468,7 @@ func Event[D any](sess *Session, t string, data []byte, seat int) { } go sess.api.EventSession(sess.UID, &models.SessionEvent{ //nolint:errcheck + Session: sess.UID, Type: t, Timestamp: clock.Now(), Data: d,