diff --git a/server/controller/http/router/agent/agent_cmd.go b/server/controller/http/router/agent/agent_cmd.go new file mode 100644 index 000000000000..a0281ff6e782 --- /dev/null +++ b/server/controller/http/router/agent/agent_cmd.go @@ -0,0 +1,317 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package agent + +import ( + "bytes" + "errors" + "fmt" + "io" + "net/http" + "net/http/httputil" + "net/url" + "strconv" + + grpcapi "github.com/deepflowio/deepflow/message/agent" + "github.com/deepflowio/deepflow/server/controller/common" + "github.com/deepflowio/deepflow/server/controller/config" + "github.com/deepflowio/deepflow/server/controller/db/mysql" + mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model" + httpcommon "github.com/deepflowio/deepflow/server/controller/http/common" + . "github.com/deepflowio/deepflow/server/controller/http/router/common" + service "github.com/deepflowio/deepflow/server/controller/http/service/agent" + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" +) + +const ( + ForwardControllerTimes = "ForwardControllerTimes" + DefaultForwardControllerTimes = 3 + + AgentCommandTypeProbe = AgentCommandType("probe") + AgentCommandTypeProfile = AgentCommandType("profile") +) + +type AgentCommandType string + +var ( + agentCommandMap = map[AgentCommandType]map[string]struct{}{ + AgentCommandTypeProbe: probeCommandMap, + AgentCommandTypeProfile: profileCommandMap, + } + + profileCommandMap = map[string]struct{}{ + "ps": struct{}{}, + "java-dump-stack": struct{}{}, + "java-dump-gc": struct{}{}, + "java-dump-heap": struct{}{}, + "ebpf-dump-stack": struct{}{}, + } + probeCommandMap = map[string]struct{}{ + "ping": struct{}{}, + "tcping": struct{}{}, + "curl": struct{}{}, + "dig": struct{}{}, + "traceroute": struct{}{}, + } +) + +type AgentCMD struct { + cfg *config.ControllerConfig +} + +func NewAgentCMD(cfg *config.ControllerConfig) *AgentCMD { + return &AgentCMD{ + cfg: cfg, + } +} + +func (a *AgentCMD) RegisterTo(e *gin.Engine) { + agentRoutes := e.Group("/v1/agent/:id-or-name") + + agentRoutes.GET("/cmd", forwardToServerConnectedByAgent(), a.getCMDAndNamespaceHandler()) + agentRoutes.POST("/cmd/run", forwardToServerConnectedByAgent(), a.cmdRunHandler()) +} + +func forwardToServerConnectedByAgent() gin.HandlerFunc { + return func(c *gin.Context) { + orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID) + db, err := mysql.GetDB(orgID.(int)) + if err != nil { + log.Error(err, db.LogPrefixORGID) + BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error()) + c.Abort() + return + } + agentID, err := getAgentID(c, db) + if err != nil { + log.Error(err, db.LogPrefixORGID) + BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error()) + c.Abort() + return + } + var agent *mysqlmodel.VTap + if err = db.Where("id = ?", agentID).First(&agent).Error; err != nil { + log.Error(err, db.LogPrefixORGID) + BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error()) + c.Abort() + return + } + key := agent.CtrlIP + "-" + agent.CtrlMac + // handle forward times + var forwardTimes int + timesStr := c.Request.Header.Get(ForwardControllerTimes) + if len(timesStr) > 0 { + v, err := strconv.Atoi(timesStr) + if err != nil { + log.Error(err, db.LogPrefixORGID) + BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error()) + return + } + forwardTimes = v + } else { + log.Infof("agent(key: %s), node ip(%s) init %s to 0", key, common.NodeIP, ForwardControllerTimes, db.LogPrefixORGID) + c.Request.Header.Set(ForwardControllerTimes, "0") + } + log.Infof("agent(key: %s), node ip(%s) forward times: %d", key, common.NodeIP, forwardTimes, db.LogPrefixORGID) + if forwardTimes > DefaultForwardControllerTimes { + err := fmt.Errorf("get agent(name: %s, key: %s) commands forward times > %d", agent.Name, key, DefaultForwardControllerTimes) + log.Error(err, db.LogPrefixORGID) + BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error()) + c.Abort() + return + } + + log.Infof("agent(key: %s), node ip(%s), agent cur controller ip(%s), controller ip(%s)", + key, common.NodeIP, agent.CurControllerIP, agent.ControllerIP) + // get reverse proxy host + newHost := common.NodeIP + if common.NodeIP == agent.CurControllerIP { + if manager := service.GetAgentCMDManager(key); manager != nil { + log.Infof("agent(key: %s) command context next, node ip(%s)", key) + c.Next() + return + } else { + newHost = agent.ControllerIP + c.Request.Header.Set(ForwardControllerTimes, fmt.Sprintf("%d", forwardTimes+1)) + } + } else if common.NodeIP == agent.ControllerIP { + if manager := service.GetAgentCMDManager(key); manager != nil { + log.Infof("agent(key: %s) command context next, node ip(%s)", key) + c.Next() + return + } else { + newHost = agent.CurControllerIP + c.Request.Header.Set(ForwardControllerTimes, fmt.Sprintf("%d", forwardTimes+1)) + } + } else { + newHost = agent.ControllerIP + c.Request.Header.Set(ForwardControllerTimes, fmt.Sprintf("%d", forwardTimes+1)) + } + + reverseProxy := fmt.Sprintf("http://%s:%d", newHost, common.GConfig.HTTPNodePort) + log.Infof("agnet(key: %s), node ip(%s), reverse proxy(%s), agent current controller ip(%s), controller ip(%s)", + key, common.NodeIP, reverseProxy, agent.CurControllerIP, agent.ControllerIP, db.LogPrefixORGID) + + proxyURL, err := url.Parse(reverseProxy) + if err != nil { + log.Error(err, db.LogPrefixORGID) + BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error()) + c.Abort() + return + } + proxy := httputil.NewSingleHostReverseProxy(proxyURL) + proxy.ServeHTTP(c.Writer, c.Request) + c.Abort() + } +} + +func (a *AgentCMD) getCMDAndNamespaceHandler() gin.HandlerFunc { + return func(c *gin.Context) { + orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID) + db, err := mysql.GetDB(orgID.(int)) + if err != nil { + JsonResponse(c, nil, err) + return + } + agentID, err := getAgentID(c, db) + if err != nil { + BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error()) + return + } + var agent *mysqlmodel.VTap + if err = db.Where("id = ?", agentID).First(&agent).Error; err != nil { + JsonResponse(c, nil, err) + return + } + + data, err := service.GetCMDAndNamespace(a.cfg.AgentCommandTimeout, orgID.(int), agentID) + if err != nil { + JsonResponse(c, data, err) + return + } + + userType, _ := c.Get(common.HEADER_KEY_X_USER_TYPE) + if !(userType == common.USER_TYPE_SUPER_ADMIN || userType == common.USER_TYPE_ADMIN) { + var cmds []*grpcapi.RemoteCommand + for _, item := range data.RemoteCommand { + _, ok1 := profileCommandMap[*item.Cmd] + _, ok2 := probeCommandMap[*item.Cmd] + if ok1 || ok2 { + cmds = append(cmds, item) + } + } + data.RemoteCommand = cmds + } + + if filterCommandMap, ok := agentCommandMap[AgentCommandType(c.Query("type"))]; ok { + var cmds []*grpcapi.RemoteCommand + for _, item := range data.RemoteCommand { + if item.Cmd == nil { + continue + } + if _, ok := filterCommandMap[*item.Cmd]; ok { + cmds = append(cmds, item) + } + } + data.RemoteCommand = cmds + data.LinuxNamespace = nil + + } + JsonResponse(c, data, nil) + } +} + +func getAgentID(c *gin.Context, db *mysql.DB) (int, error) { + agentIDentStr := c.Param("id-or-name") + if agentIDentStr == "" { + return 0, errors.New("ident can not be empty") + } + agentID, err := strconv.Atoi(agentIDentStr) + if err != nil { + var agent mysqlmodel.VTap + if err := db.Where("name = ?", agentIDentStr).First(&agent).Error; err != nil { + return 0, fmt.Errorf("failed to get agent by name(%s), error: %s", err.Error()) + } + return agent.ID, nil + } + return agentID, nil +} + +func (a *AgentCMD) cmdRunHandler() gin.HandlerFunc { + return func(c *gin.Context) { + req := service.RemoteExecReq{} + if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { + BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error()) + return + } + // Profile commands and probe commands are available to everyone. + userType, _ := c.Get(common.HEADER_KEY_X_USER_TYPE) + if !(userType == common.USER_TYPE_SUPER_ADMIN || userType == common.USER_TYPE_ADMIN) { + _, ok1 := profileCommandMap[req.CMD] + _, ok2 := probeCommandMap[req.CMD] + if !(ok1 || ok2) { + StatusForbiddenResponse(c, fmt.Sprintf("only super admin and admin can operate command(%s)", req.CMD)) + return + } + } + + agentReq := grpcapi.RemoteExecRequest{ + ExecType: grpcapi.ExecutionType_RUN_COMMAND.Enum(), + // CommandId: req.CommandId, // deprecated + CommandIdent: req.CommandIdent, + LinuxNsPid: req.LinuxNsPid, + Params: req.Params, + } + + orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID) + db, err := mysql.GetDB(orgID.(int)) + if err != nil { + JsonResponse(c, nil, err) + return + } + agentID, err := getAgentID(c, db) + if err != nil { + BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error()) + return + } + content, err := service.RunAgentCMD(a.cfg.AgentCommandTimeout, orgID.(int), agentID, &agentReq, req.CMD) + if err != nil { + InternalErrorResponse(c, content, httpcommon.SERVER_ERROR, err.Error()) + return + } + + if req.OutputFormat.String() == grpcapi.OutputFormat_TEXT.String() { + JsonResponse(c, content, nil) + return + } + sendAsFile(c, req.OutputFilename, bytes.NewBuffer([]byte(content))) + } +} + +func sendAsFile(c *gin.Context, fileName string, content *bytes.Buffer) { + c.Writer.Header().Set("Content-Type", "application/octet-stream") + if fileName != "" { + c.Writer.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename*=utf-8''%s", fileName)) + } + + if _, err := io.Copy(c.Writer, content); err != nil { + c.AbortWithStatus(http.StatusInternalServerError) + log.Error(err) + return + } +} diff --git a/server/controller/http/router/agent/logger.go b/server/controller/http/router/agent/logger.go new file mode 100644 index 000000000000..c8e610765960 --- /dev/null +++ b/server/controller/http/router/agent/logger.go @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package agent + +import ( + "github.com/deepflowio/deepflow/server/libs/logger" +) + +var log = logger.MustGetLogger("http.router.agent") diff --git a/server/controller/http/router/agent_cmd.go b/server/controller/http/router/vtap/agent_cmd.go similarity index 97% rename from server/controller/http/router/agent_cmd.go rename to server/controller/http/router/vtap/agent_cmd.go index 5100bd7ec4fc..f01e4e1c84a6 100644 --- a/server/controller/http/router/agent_cmd.go +++ b/server/controller/http/router/vtap/agent_cmd.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package router +package vtap import ( "bytes" @@ -26,6 +26,9 @@ import ( "net/url" "strconv" + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + "github.com/deepflowio/deepflow/message/trident" "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/controller/config" @@ -33,10 +36,7 @@ import ( mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model" httpcommon "github.com/deepflowio/deepflow/server/controller/http/common" . "github.com/deepflowio/deepflow/server/controller/http/router/common" - "github.com/deepflowio/deepflow/server/controller/http/service" - "github.com/deepflowio/deepflow/server/controller/model" - "github.com/gin-gonic/gin" - "github.com/gin-gonic/gin/binding" + service "github.com/deepflowio/deepflow/server/controller/http/service/vtap" ) const ( @@ -82,7 +82,7 @@ func NewAgentCMD(cfg *config.ControllerConfig) *AgentCMD { } func (a *AgentCMD) RegisterTo(e *gin.Engine) { - agentRoutes := e.Group("/v1/agent/:id-or-name") + agentRoutes := e.Group("/v1/vtap/:id-or-name") agentRoutes.GET("/cmd", forwardToServerConnectedByAgent(), a.getCMDAndNamespaceHandler()) agentRoutes.POST("/cmd/run", forwardToServerConnectedByAgent(), a.cmdRunHandler()) @@ -255,7 +255,7 @@ func getAgentID(c *gin.Context, db *mysql.DB) (int, error) { func (a *AgentCMD) cmdRunHandler() gin.HandlerFunc { return func(c *gin.Context) { - req := model.RemoteExecReq{} + req := service.RemoteExecReq{} if err := c.ShouldBindBodyWith(&req, binding.JSON); err != nil { BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error()) return diff --git a/server/controller/http/router/vtap/logger.go b/server/controller/http/router/vtap/logger.go new file mode 100644 index 000000000000..d27cbf72c7ce --- /dev/null +++ b/server/controller/http/router/vtap/logger.go @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package vtap + +import ( + "fmt" + + "github.com/deepflowio/deepflow/server/libs/logger" +) + +var log = logger.MustGetLogger("http.router.agent") diff --git a/server/controller/http/server.go b/server/controller/http/server.go index 881c02e6c112..3ebf9ccc3228 100644 --- a/server/controller/http/server.go +++ b/server/controller/http/server.go @@ -30,7 +30,9 @@ import ( "github.com/deepflowio/deepflow/server/controller/http/appender" "github.com/deepflowio/deepflow/server/controller/http/common/registrant" "github.com/deepflowio/deepflow/server/controller/http/router" + "github.com/deepflowio/deepflow/server/controller/http/router/agent" "github.com/deepflowio/deepflow/server/controller/http/router/resource" + "github.com/deepflowio/deepflow/server/controller/http/router/vtap" "github.com/deepflowio/deepflow/server/controller/manager" "github.com/deepflowio/deepflow/server/controller/monitor" trouter "github.com/deepflowio/deepflow/server/controller/trisolaris/server/http" @@ -114,7 +116,6 @@ func (s *Server) appendRegistrant() []registrant.Registrant { router.NewPlugin(), router.NewMail(), router.NewDatabase(s.controllerConfig), - router.NewAgentCMD(s.controllerConfig), router.NewAgentGroupConfig(s.controllerConfig), // icon @@ -122,6 +123,9 @@ func (s *Server) appendRegistrant() []registrant.Registrant { // resource resource.NewDomain(s.controllerConfig), + + agent.NewAgentCMD(s.controllerConfig), + vtap.NewAgentCMD(s.controllerConfig), // TODO remove } // appends routers supported in CE or EE diff --git a/server/controller/http/service/agent/agent_cmd.go b/server/controller/http/service/agent/agent_cmd.go new file mode 100644 index 000000000000..abd4f84504c3 --- /dev/null +++ b/server/controller/http/service/agent/agent_cmd.go @@ -0,0 +1,414 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package agent + +import ( + "encoding/json" + "errors" + "fmt" + "sync" + "time" + + grpcapi "github.com/deepflowio/deepflow/message/agent" + ctrlcommon "github.com/deepflowio/deepflow/server/controller/common" + "github.com/deepflowio/deepflow/server/controller/db/mysql" + mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model" +) + +type RemoteExecReq struct { + grpcapi.RemoteExecRequest + + OutputFormat *grpcapi.OutputFormat `json:"output_format"` // 0: "TEXT", 1: "BINARY" + OutputFilename string `json:"output_filename"` + CMD string `json:"cmd" binding:"required"` +} + +type RemoteExecResp struct { + Content string `json:"content,omitempty"` // RUN_COMMAND + ErrorMessage string `json:"-"` + RemoteCommand []*grpcapi.RemoteCommand `json:"remote_commands,omitempty"` // LIST_COMMAND + LinuxNamespace []*grpcapi.LinuxNamespace `json:"linux_namespaces,omitempty"` // LIST_NAMESPACE +} + +var ( + agentCMDMutex sync.RWMutex + agentCMDManager = make(AgentCMDManager) +) + +type AgentCMDManager map[string]*CMDManager + +func AgentCommandLock() { + agentCMDMutex.Lock() +} + +func AgentCommandUnlock() { + agentCMDMutex.Unlock() +} + +func GetAgentCMDManager(key string) *CMDManager { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + return manager + } + return nil +} + +func GetAgentCMDManagerWithoutLock(key string) *CMDManager { + if manager, ok := agentCMDManager[key]; ok { + return manager + } + return nil +} + +func AddToCMDManagerIfNotExist(key string, requestID uint64) *CMDManager { + agentCMDMutex.Lock() + defer agentCMDMutex.Unlock() + if _, ok := agentCMDManager[key]; ok { + return agentCMDManager[key] + } + + log.Infof("add agent(key:%s) to cmd manager", key) + agentCMDManager[key] = &CMDManager{ + requestID: requestID, + ExecCH: make(chan *grpcapi.RemoteExecRequest, 1), + + requestIDToResp: make(map[uint64]*CMDResp), + } + return agentCMDManager[key] +} + +func RemoveFromCMDManager(key string, requestID uint64) { + agentCMDMutex.Lock() + defer agentCMDMutex.Unlock() + if manager, ok := agentCMDManager[key]; ok { + delete(manager.requestIDToResp, requestID) + log.Infof("delete agent(key: %s, request id: %v) in manager", key, requestID) + } +} + +func RemoveAllFromCMDManager(key string) { + log.Infof("preparing to remove all agent(key: %s) from cmd manager", key) + agentCMDMutex.Lock() + defer agentCMDMutex.Unlock() + manager, ok := agentCMDManager[key] + if !ok { + log.Error("can not find agent command manager(key: %s)", key) + return + } + + for requestID, cmdResp := range manager.requestIDToResp { + errMessage := fmt.Sprintf("agent(key: %s) disconnected from the server", key) + AppendErrorMessage(key, requestID, &errMessage) + log.Error(errMessage) + close(cmdResp.ExecDoneCH) + close(cmdResp.RemoteCMDDoneCH) + close(cmdResp.LinuxNamespaceDoneCH) + } + close(manager.ExecCH) + delete(agentCMDManager, key) + log.Infof("delete agent(key: %s) in manager", key) +} + +type CMDManager struct { + requestID uint64 + ExecCH chan *grpcapi.RemoteExecRequest + requestIDToResp map[uint64]*CMDResp +} + +type CMDResp struct { + ExecDoneCH chan struct{} + RemoteCMDDoneCH chan struct{} + LinuxNamespaceDoneCH chan struct{} + + data *RemoteExecResp +} + +func NewAgentCMDResp(key string) (uint64, *CMDResp) { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + manager.requestID += 1 + resp := &CMDResp{ + ExecDoneCH: make(chan struct{}, 1), + RemoteCMDDoneCH: make(chan struct{}, 1), + LinuxNamespaceDoneCH: make(chan struct{}, 1), + data: &RemoteExecResp{}, + } + manager.requestIDToResp[manager.requestID] = resp + return manager.requestID, resp + } + return 0, nil +} + +func GetAgentCMDResp(key string, requestID uint64) *CMDResp { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + return manager.requestIDToResp[requestID] + } + return nil +} + +func GetAgentCMDRespWithoutLock(key string, requestID uint64) *CMDResp { + if manager, ok := agentCMDManager[key]; ok { + return manager.requestIDToResp[requestID] + } + return nil +} + +func RemoveAgentCMDResp(key string, requestID uint64) { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + delete(manager.requestIDToResp, requestID) + } +} + +func GetRequestID(key string) uint64 { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + return manager.requestID + } + return 0 +} + +func AppendCommands(key string, requestID uint64, data []*grpcapi.RemoteCommand) { + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + resp.data.RemoteCommand = append(resp.data.RemoteCommand, data...) + } + } +} + +func InitCommands(key string, requestID uint64, data []*grpcapi.RemoteCommand) { + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + resp.data.RemoteCommand = data + } + } +} + +func AppendNamespaces(key string, requestID uint64, data []*grpcapi.LinuxNamespace) { + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + resp.data.LinuxNamespace = append(resp.data.LinuxNamespace, data...) + } + } +} + +func InitNamespaces(key string, requestID uint64, data []*grpcapi.LinuxNamespace) { + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + resp.data.LinuxNamespace = data + } + } +} + +func AppendContent(key string, requestID uint64, data []byte) { + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + resp.data.Content += string(data) + } + } +} + +func AppendErrorMessage(key string, requestID uint64, data *string) { + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + resp.data.ErrorMessage = *data + } + } +} + +func GetErrormessage(key string, requestID uint64) string { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + return resp.data.ErrorMessage + } + } + return "" +} + +func GetContent(key string, requestID uint64) string { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + return resp.data.Content + } + } + return "" +} + +func GetCommands(key string, requestID uint64) []*grpcapi.RemoteCommand { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + return resp.data.RemoteCommand + } + } + return nil +} + +func GetCommandsWithoutLock(key string, requestID uint64) []*grpcapi.RemoteCommand { + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + return resp.data.RemoteCommand + } + } + return nil +} + +func GetNamespaces(key string, requestID uint64) []*grpcapi.LinuxNamespace { + agentCMDMutex.RLock() + defer agentCMDMutex.RUnlock() + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + return resp.data.LinuxNamespace + } + } + return nil +} + +func GetNamespacesWithoutLock(key string, requestID uint64) []*grpcapi.LinuxNamespace { + if manager, ok := agentCMDManager[key]; ok { + if resp, ok := manager.requestIDToResp[requestID]; ok { + return resp.data.LinuxNamespace + } + } + return nil +} + +func GetCMDAndNamespace(timeout, orgID, agentID int) (*RemoteExecResp, error) { + log.Infof("current node ip(%s) get cmd and namespace", ctrlcommon.NodeIP) + dbInfo, err := mysql.GetDB(orgID) + if err != nil { + return nil, err + } + var agent *mysqlmodel.VTap + if err := dbInfo.Where("id = ?", agentID).Find(&agent).Error; err != nil { + return nil, err + } + log.Infof("current node ip(%s) agent(cur controller ip: %s, controller ip: %s, id: %d, name: %s) get remote commands and linux namespaces", + ctrlcommon.NodeIP, agent.CurControllerIP, agent.ControllerIP, agentID, agent.Name, dbInfo.LogPrefixORGID) + + key := agent.CtrlIP + "-" + agent.CtrlMac + manager := GetAgentCMDManager(key) + requestID, cmdResp := NewAgentCMDResp(key) + if manager == nil || cmdResp == nil { + return nil, fmt.Errorf("agent(name: %s, key: %s) remote exec map not found", agent.Name, key) + } + defer RemoveAgentCMDResp(key, requestID) + + cmdReq := &grpcapi.RemoteExecRequest{ + RequestId: &requestID, + ExecType: grpcapi.ExecutionType_LIST_COMMAND.Enum(), + } + manager.ExecCH <- cmdReq + + cmdTimeout := time.After(time.Duration(timeout) * time.Second) + resp := &RemoteExecResp{} + for { + select { + case <-cmdTimeout: + // RemoveAllFromCMDManager(key) + return nil, fmt.Errorf("timeout(%vs) to get remote commands and linux namespace", timeout) + case _, ok := <-cmdResp.RemoteCMDDoneCH: + if !ok { + return nil, fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) + } + resp.RemoteCommand = GetCommands(key, requestID) + namespaceReq := &grpcapi.RemoteExecRequest{RequestId: &requestID, ExecType: grpcapi.ExecutionType_LIST_NAMESPACE.Enum()} + manager.ExecCH <- namespaceReq + case _, ok := <-cmdResp.LinuxNamespaceDoneCH: + if !ok { + return nil, fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) + } + resp.LinuxNamespace = GetNamespaces(key, requestID) + case _, ok := <-cmdResp.ExecDoneCH: // error occurred + if !ok { + return nil, fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) + } + if len(GetCommands(key, requestID)) != 0 { + return &RemoteExecResp{RemoteCommand: GetCommands(key, requestID)}, nil + } + log.Errorf("get agent(key: %s) remote commands error: %s", key, GetContent(key, requestID), dbInfo.LogPrefixORGID) + return nil, errors.New(key) + default: + if len(GetCommands(key, requestID)) != 0 && len(GetNamespaces(key, requestID)) != 0 { + log.Infof("len(commands)=%d, len(namespaces)=%d", + len(GetCommands(key, requestID)), len(GetNamespaces(key, requestID)), dbInfo.LogPrefixORGID) + return &RemoteExecResp{ + RemoteCommand: GetCommands(key, requestID), + LinuxNamespace: GetNamespaces(key, requestID), + }, nil + } + } + } +} + +func RunAgentCMD(timeout, orgID, agentID int, req *grpcapi.RemoteExecRequest, CMD string) (string, error) { + serverLog := fmt.Sprintf("The deepflow-server is unable to execute the `%s` command."+ + " Detailed error information is as follows:\n\n", CMD) + dbInfo, err := mysql.GetDB(orgID) + if err != nil { + return "", fmt.Errorf("%s%s", serverLog, err.Error()) + } + var agent *mysqlmodel.VTap + if err := dbInfo.Where("id = ?", agentID).Find(&agent).Error; err != nil { + return "", fmt.Errorf("%s%s", serverLog, err.Error()) + } + b, _ := json.Marshal(req) + log.Infof("current node ip(%s) agent(cur controller ip: %s, controller ip: %s, id: %d, name: %s) run remote command, request: %s", + ctrlcommon.NodeIP, agent.CurControllerIP, agent.ControllerIP, agentID, agent.Name, string(b), dbInfo.LogPrefixORGID) + key := agent.CtrlIP + "-" + agent.CtrlMac + manager := GetAgentCMDManager(key) + requestID, cmdResp := NewAgentCMDResp(key) + if manager == nil || cmdResp == nil { + return "", fmt.Errorf("agent(name: %s, key: %s) remote exec map not found", agent.Name, key) + } + defer RemoveAgentCMDResp(key, requestID) + req.RequestId = &requestID + manager.ExecCH <- req + + cmdTimeout := time.After(time.Duration(timeout) * time.Second) + content := "" + for { + select { + case <-cmdTimeout: + err = fmt.Errorf("%stimeout(%vs) to run agent command", serverLog, timeout) + log.Error(err, dbInfo.LogPrefixORGID) + return "", err + case _, ok := <-cmdResp.ExecDoneCH: + if !ok { + return "", fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) + } + if msg := GetErrormessage(key, requestID); msg != "" { + return GetContent(key, requestID), fmt.Errorf("The deepflow-agent is unable to execute the `%s` command."+ + " Detailed error information is as follows:\n\n%s", CMD, msg) + } + content = GetContent(key, requestID) + log.Infof("command run content len: %d", len(content), dbInfo.LogPrefixORGID) + return content, nil + } + } +} diff --git a/server/controller/http/service/agent/logger.go b/server/controller/http/service/agent/logger.go new file mode 100644 index 000000000000..af61e8e22ea6 --- /dev/null +++ b/server/controller/http/service/agent/logger.go @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package agent + +import ( + "github.com/deepflowio/deepflow/server/libs/logger" +) + +var log = logger.MustGetLogger("http.service.agent") diff --git a/server/controller/http/service/agent_cmd.go b/server/controller/http/service/vtap/agent_cmd.go similarity index 92% rename from server/controller/http/service/agent_cmd.go rename to server/controller/http/service/vtap/agent_cmd.go index b85730eab6c8..359fb896b128 100644 --- a/server/controller/http/service/agent_cmd.go +++ b/server/controller/http/service/vtap/agent_cmd.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package service +package vtap import ( "encoding/json" @@ -27,9 +27,23 @@ import ( ctrlcommon "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/controller/db/mysql" mysqlmodel "github.com/deepflowio/deepflow/server/controller/db/mysql/model" - "github.com/deepflowio/deepflow/server/controller/model" ) +type RemoteExecReq struct { + trident.RemoteExecRequest + + OutputFormat *trident.OutputFormat `json:"output_format"` // 0: "TEXT", 1: "BINARY" + OutputFilename string `json:"output_filename"` + CMD string `json:"cmd" binding:"required"` +} + +type RemoteExecResp struct { + Content string `json:"content,omitempty"` // RUN_COMMAND + ErrorMessage string `json:"-"` + RemoteCommand []*trident.RemoteCommand `json:"remote_commands,omitempty"` // LIST_COMMAND + LinuxNamespace []*trident.LinuxNamespace `json:"linux_namespaces,omitempty"` // LIST_NAMESPACE +} + var ( agentCMDMutex sync.RWMutex agentCMDManager = make(AgentCMDManager) @@ -121,7 +135,7 @@ type CMDResp struct { RemoteCMDDoneCH chan struct{} LinuxNamespaceDoneCH chan struct{} - data *model.RemoteExecResp + data *RemoteExecResp } func NewAgentCMDResp(key string) (uint64, *CMDResp) { @@ -133,7 +147,7 @@ func NewAgentCMDResp(key string) (uint64, *CMDResp) { ExecDoneCH: make(chan struct{}, 1), RemoteCMDDoneCH: make(chan struct{}, 1), LinuxNamespaceDoneCH: make(chan struct{}, 1), - data: &model.RemoteExecResp{}, + data: &RemoteExecResp{}, } manager.requestIDToResp[manager.requestID] = resp return manager.requestID, resp @@ -284,7 +298,7 @@ func GetNamespacesWithoutLock(key string, requestID uint64) []*trident.LinuxName return nil } -func GetCMDAndNamespace(timeout, orgID, agentID int) (*model.RemoteExecResp, error) { +func GetCMDAndNamespace(timeout, orgID, agentID int) (*RemoteExecResp, error) { log.Infof("current node ip(%s) get cmd and namespace", ctrlcommon.NodeIP) dbInfo, err := mysql.GetDB(orgID) if err != nil { @@ -312,7 +326,7 @@ func GetCMDAndNamespace(timeout, orgID, agentID int) (*model.RemoteExecResp, err manager.ExecCH <- cmdReq cmdTimeout := time.After(time.Duration(timeout) * time.Second) - resp := &model.RemoteExecResp{} + resp := &RemoteExecResp{} for { select { case <-cmdTimeout: @@ -335,7 +349,7 @@ func GetCMDAndNamespace(timeout, orgID, agentID int) (*model.RemoteExecResp, err return nil, fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) } if len(GetCommands(key, requestID)) != 0 { - return &model.RemoteExecResp{RemoteCommand: GetCommands(key, requestID)}, nil + return &RemoteExecResp{RemoteCommand: GetCommands(key, requestID)}, nil } log.Errorf("get agent(key: %s) remote commands error: %s", key, GetContent(key, requestID), dbInfo.LogPrefixORGID) return nil, errors.New(key) @@ -343,7 +357,7 @@ func GetCMDAndNamespace(timeout, orgID, agentID int) (*model.RemoteExecResp, err if len(GetCommands(key, requestID)) != 0 && len(GetNamespaces(key, requestID)) != 0 { log.Infof("len(commands)=%d, len(namespaces)=%d", len(GetCommands(key, requestID)), len(GetNamespaces(key, requestID)), dbInfo.LogPrefixORGID) - return &model.RemoteExecResp{ + return &RemoteExecResp{ RemoteCommand: GetCommands(key, requestID), LinuxNamespace: GetNamespaces(key, requestID), }, nil diff --git a/server/controller/model/model.go b/server/controller/model/model.go index 62cf54c7ea25..f6d7192eb9d8 100644 --- a/server/controller/model/model.go +++ b/server/controller/model/model.go @@ -19,7 +19,6 @@ package model import ( "time" - "github.com/deepflowio/deepflow/message/trident" "github.com/deepflowio/deepflow/server/agent_config" ) @@ -762,18 +761,3 @@ type MailServer struct { NtlmPassword string `json:"NTLM_PASSWORD"` Lcuuid string `json:"LCUUID"` } - -type RemoteExecReq struct { - trident.RemoteExecRequest - - OutputFormat *trident.OutputFormat `json:"output_format"` // 0: "TEXT", 1: "BINARY" - OutputFilename string `json:"output_filename"` - CMD string `json:"cmd" binding:"required"` -} - -type RemoteExecResp struct { - Content string `json:"content,omitempty"` // RUN_COMMAND - ErrorMessage string `json:"-"` - RemoteCommand []*trident.RemoteCommand `json:"remote_commands,omitempty"` // LIST_COMMAND - LinuxNamespace []*trident.LinuxNamespace `json:"linux_namespaces,omitempty"` // LIST_NAMESPACE -} diff --git a/server/controller/trisolaris/services/grpc/agentsynchronize/remote_execute.go b/server/controller/trisolaris/services/grpc/agentsynchronize/remote_execute.go index 3f6b8a05bcc9..b59df0950745 100644 --- a/server/controller/trisolaris/services/grpc/agentsynchronize/remote_execute.go +++ b/server/controller/trisolaris/services/grpc/agentsynchronize/remote_execute.go @@ -23,11 +23,16 @@ import ( "io" "runtime" "sync" + "time" - api "github.com/deepflowio/deepflow/message/agent" - "github.com/deepflowio/deepflow/message/trident" - "github.com/deepflowio/deepflow/server/controller/http/service" "google.golang.org/protobuf/proto" + + api "github.com/deepflowio/deepflow/message/agent" + service "github.com/deepflowio/deepflow/server/controller/http/service/agent" +) + +const ( + CMD_INACTIVITY_TIMEOUT = 1 * time.Minute ) func (e *AgentEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) error { @@ -46,23 +51,52 @@ func (e *AgentEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) ctx, cancel := context.WithCancel(stream.Context()) defer cancel() + errCH := make(chan error, 1) + go func() { defer func() { + log.Infof("agent(key: %s) remote exec stream receive goroutine done", key) wg.Done() if r := recover(); r != nil { buf := make([]byte, 2048) n := runtime.Stack(buf, false) - log.Errorf("recovered in RemoteExecute: %s", buf[:n]) + errMsg := fmt.Sprintf("recovered in RemoteExecute: %s", buf[:n]) + log.Errorf(errMsg) + errCH <- fmt.Errorf(errMsg) } }() + inactivityTimer := time.NewTimer(CMD_INACTIVITY_TIMEOUT) + defer inactivityTimer.Stop() + for { select { case <-ctx.Done(): - log.Infof("context done, agent(key: %s)", key) + log.Infof("context done, agent(key: %s), context err: %v", key, ctx.Err()) + return + case <-inactivityTimer.C: + errMsg := fmt.Errorf("no message received for %vs, closing connection for agent(key: %s)", + CMD_INACTIVITY_TIMEOUT.Seconds(), key) + log.Error(errMsg) + errCH <- errMsg return default: resp, err := stream.Recv() + // Handle any errors that occur during stream reception + // if server restart, an io.EOF error may be received + if err == io.EOF { + log.Errorf("agent(key: %s) command stream error: %v", key, err) + errCH <- err + return + } + // Attempt to stop the inactivity timer + if !inactivityTimer.Stop() { + // If the timer has already expired, drain the channel + <-inactivityTimer.C + } + // Reset the inactivity timer to its original duration + inactivityTimer.Reset(CMD_INACTIVITY_TIMEOUT) + if resp == nil { continue } @@ -81,6 +115,7 @@ func (e *AgentEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) manager = service.AddToCMDManagerIfNotExist(key, uint64(1)) initDone <- struct{}{} } + service.AgentCommandLock() manager = service.GetAgentCMDManagerWithoutLock(key) if manager == nil { @@ -93,19 +128,12 @@ func (e *AgentEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) if resp.CommandResult == nil && resp.LinuxNamespaces == nil && resp.Commands == nil && resp.Errmsg == nil { log.Infof("agent heart beat command response: %s", resp.String()) - manager.ExecCH <- &trident.RemoteExecRequest{RequestId: proto.Uint64(0)} + manager.ExecCH <- &api.RemoteExecRequest{RequestId: proto.Uint64(0)} service.AgentCommandUnlock() continue } if err != nil { - if err == io.EOF { - handleResponse(resp) - log.Infof("agent(key: %s) command exec get response finish", key) - service.AgentCommandUnlock() - continue - } - err := fmt.Errorf("agent(key: %s) command stream error: %v", key, err) log.Error(err) service.AgentCommandUnlock() @@ -119,6 +147,7 @@ func (e *AgentEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) }() <-initDone + log.Infof("agent(key: %s) init done", key) if manager == nil { err := fmt.Errorf("get agent(key: %s) remote exec manager nil", key) log.Error(err) @@ -127,8 +156,11 @@ func (e *AgentEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) for { select { case <-ctx.Done(): - log.Infof("context done, agent(key: %s)", key) - return nil + log.Infof("context done, agent(key: %s), context err: %v", key, ctx.Err()) + return ctx.Err() + case err := <-errCH: + log.Error(err) + return err case req, ok := <-manager.ExecCH: if !ok { err := fmt.Errorf("agent(key: %s) exec channel is closed", key) @@ -137,10 +169,7 @@ func (e *AgentEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) } b, _ := json.Marshal(req) log.Infof("agent(key: %s) request: %s", key, string(b)) - agentReq := &api.RemoteExecRequest{ - RequestId: req.RequestId, - } - if err := stream.Send(agentReq); err != nil { + if err := stream.Send(req); err != nil { log.Errorf("send cmd to agent error: %s, req: %#v", err.Error(), req) return err } @@ -191,16 +220,16 @@ func handleResponse(resp *api.RemoteExecResponse) { return case len(resp.LinuxNamespaces) > 0: if len(service.GetNamespacesWithoutLock(key, *resp.RequestId)) > 0 { - //service.InitNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) + service.InitNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) } else { - //service.AppendNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) + service.AppendNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) } cmdResp.LinuxNamespaceDoneCH <- struct{}{} case len(resp.Commands) > 0: if len(service.GetCommandsWithoutLock(key, *resp.RequestId)) > 0 { - //service.InitCommands(key, *resp.RequestId, resp.Commands) + service.InitCommands(key, *resp.RequestId, resp.Commands) } else { - //service.AppendCommands(key, *resp.RequestId, resp.Commands) + service.AppendCommands(key, *resp.RequestId, resp.Commands) } cmdResp.RemoteCMDDoneCH <- struct{}{} default: diff --git a/server/controller/trisolaris/services/grpc/synchronize/remote_execute.go b/server/controller/trisolaris/services/grpc/synchronize/remote_execute.go index 88cf2d12e578..5db764b34e35 100644 --- a/server/controller/trisolaris/services/grpc/synchronize/remote_execute.go +++ b/server/controller/trisolaris/services/grpc/synchronize/remote_execute.go @@ -25,9 +25,8 @@ import ( "sync" "time" - "github.com/deepflowio/deepflow/message/trident" api "github.com/deepflowio/deepflow/message/trident" - "github.com/deepflowio/deepflow/server/controller/http/service" + service "github.com/deepflowio/deepflow/server/controller/http/service/vtap" "google.golang.org/protobuf/proto" ) @@ -177,7 +176,7 @@ func (e *VTapEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) e } } -func handleResponse(resp *trident.RemoteExecResponse) { +func handleResponse(resp *api.RemoteExecResponse) { key := resp.AgentId.GetIp() + "-" + resp.AgentId.GetMac() if resp.RequestId == nil { log.Errorf("agent(key: %s) command resp request id not found", key, resp.RequestId)