From b03372dbae9c0ebb73641e2d6d6a8e5a7715b692 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Fri, 30 Aug 2024 11:30:41 +0800 Subject: [PATCH] fix: resource label errors for non-default organization resource change events --- server/ingester/event/decoder/decoder.go | 49 ++-- .../event/decoder/grpc_resource_info.go | 218 ------------------ server/libs/grpc/grpc_platformdata.go | 90 +++++++- 3 files changed, 108 insertions(+), 249 deletions(-) delete mode 100644 server/ingester/event/decoder/grpc_resource_info.go diff --git a/server/ingester/event/decoder/decoder.go b/server/ingester/event/decoder/decoder.go index a73f94ba844..8e770ff5e40 100644 --- a/server/ingester/event/decoder/decoder.go +++ b/server/ingester/event/decoder/decoder.go @@ -57,15 +57,14 @@ type Counter struct { } type Decoder struct { - index int - eventType common.EventType - resourceInfoTable *ResourceInfoTable - platformData *grpc.PlatformInfoTable - inQueue queue.QueueReader - eventWriter *dbwriter.EventWriter - exporters *exporters.Exporters - debugEnabled bool - config *config.Config + index int + eventType common.EventType + platformData *grpc.PlatformInfoTable + inQueue queue.QueueReader + eventWriter *dbwriter.EventWriter + exporters *exporters.Exporters + debugEnabled bool + config *config.Config orgId, teamId uint16 @@ -89,20 +88,15 @@ func NewDecoder( controllers[i] = controllers[i].To4() } } - var resourceInfoTable *ResourceInfoTable - if eventType == common.RESOURCE_EVENT { - resourceInfoTable = NewResourceInfoTable(controllers, int(config.Base.ControllerPort), config.Base.GrpcBufferSize) - } return &Decoder{ - eventType: eventType, - resourceInfoTable: resourceInfoTable, - platformData: platformData, - inQueue: inQueue, - debugEnabled: log.IsEnabledFor(logging.DEBUG), - eventWriter: eventWriter, - exporters: exporters, - config: config, - counter: &Counter{}, + eventType: eventType, + platformData: platformData, + inQueue: inQueue, + debugEnabled: log.IsEnabledFor(logging.DEBUG), + eventWriter: eventWriter, + exporters: exporters, + config: config, + counter: &Counter{}, } } @@ -114,9 +108,6 @@ func (d *Decoder) GetCounter() interface{} { func (d *Decoder) Run() { log.Infof("event (%s) decoder run", d.eventType) - if d.resourceInfoTable != nil { - d.resourceInfoTable.Start() - } ingestercommon.RegisterCountableForIngester("decoder", d, stats.OptionStatTags{ "event_type": d.eventType.String()}) buffer := make([]interface{}, BUFFER_SIZE) @@ -343,11 +334,11 @@ func (d *Decoder) handleResourceEvent(event *eventapi.ResourceEvent) { podGroupType := uint8(0) if event.IfNeedTagged { s.Tagged = 1 - resourceInfo := d.resourceInfoTable.QueryResourceInfo(event.InstanceType, event.InstanceID) + resourceInfo := d.platformData.QueryResourceInfo(s.OrgId, event.InstanceType, event.InstanceID) if resourceInfo != nil { s.RegionID = uint16(resourceInfo.RegionID) s.AZID = uint16(resourceInfo.AZID) - s.L3EpcID = resourceInfo.L3EpcID + s.L3EpcID = resourceInfo.EpcID s.HostID = uint16(resourceInfo.HostID) s.PodID = resourceInfo.PodID s.PodNodeID = resourceInfo.PodNodeID @@ -355,8 +346,8 @@ func (d *Decoder) handleResourceEvent(event *eventapi.ResourceEvent) { s.PodClusterID = uint16(resourceInfo.PodClusterID) s.PodGroupID = resourceInfo.PodGroupID podGroupType = resourceInfo.PodGroupType - s.L3DeviceType = uint8(resourceInfo.L3DeviceType) - s.L3DeviceID = resourceInfo.L3DeviceID + s.L3DeviceType = uint8(resourceInfo.DeviceType) + s.L3DeviceID = resourceInfo.DeviceID } } else { s.Tagged = 0 diff --git a/server/ingester/event/decoder/grpc_resource_info.go b/server/ingester/event/decoder/grpc_resource_info.go deleted file mode 100644 index 796637ee87a..00000000000 --- a/server/ingester/event/decoder/grpc_resource_info.go +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package decoder - -import ( - "fmt" - "net" - - "github.com/gogo/protobuf/proto" - "golang.org/x/net/context" - - "github.com/deepflowio/deepflow/message/trident" - "github.com/deepflowio/deepflow/server/libs/datatype" - "github.com/deepflowio/deepflow/server/libs/grpc" -) - -func (p *ResourceInfoTable) QueryResourceInfo(resourceType uint32, resourceID uint32) *ResourceInfo { - switch trident.DeviceType(resourceType) { - case trident.DeviceType_DEVICE_TYPE_POD: - return p.podInfos[resourceID] - case trident.DeviceType_DEVICE_TYPE_POD_NODE: - return p.podNodeInfos[resourceID] - case trident.DeviceType_DEVICE_TYPE_HOST_DEVICE: - return p.hostInfos[resourceID] - case trident.DeviceType_DEVICE_TYPE_VM: - return p.vmInfos[resourceID] - default: - return p.resourceInfos[uint64(resourceType)<<32|uint64(resourceID)] - } - -} - -func (p *ResourceInfoTable) Close() { - p.GrpcSession.Close() -} - -func (p *ResourceInfoTable) Start() { - p.GrpcSession.Start() -} - -type ResourceInfo struct { - L3EpcID int32 - HostID uint32 - RegionID uint32 - L3DeviceType uint32 - L3DeviceID uint32 - PodNodeID uint32 - PodNSID uint32 - PodGroupID uint32 - PodGroupType uint8 // no need to store - PodID uint32 - PodClusterID uint32 - AZID uint32 -} - -type ResourceInfoTable struct { - ctlIP string - GrpcSession *grpc.GrpcSession - versionPlatformData uint64 - - resourceInfos map[uint64]*ResourceInfo - podInfos map[uint32]*ResourceInfo - podNodeInfos map[uint32]*ResourceInfo - hostInfos map[uint32]*ResourceInfo - vmInfos map[uint32]*ResourceInfo -} - -func NewResourceInfoTable(ips []net.IP, port, rpcMaxMsgSize int) *ResourceInfoTable { - info := &ResourceInfoTable{ - GrpcSession: &grpc.GrpcSession{}, - resourceInfos: make(map[uint64]*ResourceInfo), - podInfos: make(map[uint32]*ResourceInfo), - podNodeInfos: make(map[uint32]*ResourceInfo), - hostInfos: make(map[uint32]*ResourceInfo), - vmInfos: make(map[uint32]*ResourceInfo), - } - runOnce := func() { - if err := info.Reload(); err != nil { - log.Warning(err) - } - } - info.GrpcSession.Init(ips, uint16(port), grpc.DEFAULT_SYNC_INTERVAL, rpcMaxMsgSize, runOnce) - info.Reload() - log.Infof("New ResourceInfoTable ips:%v port:%d rpcMaxMsgSize:%d", ips, port, rpcMaxMsgSize) - return info -} - -func (p *ResourceInfoTable) Reload() error { - var response *trident.SyncResponse - err := p.GrpcSession.Request(func(ctx context.Context, remote net.IP) error { - var err error - if p.ctlIP == "" { - var local net.IP - // 根据remote ip获取本端ip - if local, err = grpc.Lookup(remote); err != nil { - return err - } - p.ctlIP = local.String() - } - - request := trident.SyncRequest{ - VersionPlatformData: proto.Uint64(p.versionPlatformData), - CtrlIp: proto.String(p.ctlIP), - ProcessName: proto.String("resource-info-watcher"), - } - c := p.GrpcSession.GetClient() - if c == nil { - return fmt.Errorf("can't get grpc client to %s", remote) - } - client := trident.NewSynchronizerClient(c) - response, err = client.AnalyzerSync(ctx, &request) - return err - }) - if err != nil { - return err - } - - if status := response.GetStatus(); status != trident.Status_SUCCESS { - return fmt.Errorf("grpc resource response failed. responseStatus is %v", status) - } - - newVersion := response.GetVersionPlatformData() - if newVersion == p.versionPlatformData { - return nil - } - - platformData := trident.PlatformData{} - if plarformCompressed := response.GetPlatformData(); plarformCompressed != nil { - if err := platformData.Unmarshal(plarformCompressed); err != nil { - log.Warningf("unmarshal grpc compressed platformData failed as %v", err) - return err - } - } - - resourceInfos := make(map[uint64]*ResourceInfo) - podInfos := make(map[uint32]*ResourceInfo) - podNodeInfos := make(map[uint32]*ResourceInfo) - hostInfos := make(map[uint32]*ResourceInfo) - vmInfos := make(map[uint32]*ResourceInfo) - for _, intf := range platformData.GetInterfaces() { - updateResourceInfos(resourceInfos, podInfos, podNodeInfos, hostInfos, vmInfos, intf) - } - p.resourceInfos = resourceInfos - p.podInfos = podInfos - p.podNodeInfos = podNodeInfos - p.hostInfos = hostInfos - p.vmInfos = vmInfos - - log.Infof("Event update rpc platformdata version %d -> %d", p.versionPlatformData, newVersion) - - return nil -} - -func updateResourceInfos(reourceInfos map[uint64]*ResourceInfo, podInfos, podNodeInfos, hostInfos, vmInfos map[uint32]*ResourceInfo, intf *trident.Interface) { - epcID := intf.GetEpcId() - if epcID == 0 { - tmp := datatype.EPC_FROM_DEEPFLOW - epcID = uint32(tmp) - } - deviceType := intf.GetDeviceType() - deviceID := intf.GetDeviceId() - podID := intf.GetPodId() - podNodeID := intf.GetPodNodeId() - hostID := intf.GetLaunchServerId() - - info := &ResourceInfo{ - L3EpcID: int32(epcID), - HostID: hostID, - RegionID: intf.GetRegionId(), - L3DeviceType: deviceType, - L3DeviceID: deviceID, - PodNodeID: podNodeID, - PodNSID: intf.GetPodNsId(), - PodGroupID: intf.GetPodGroupId(), - PodGroupType: uint8(intf.GetPodGroupType()), - PodID: podID, - PodClusterID: intf.GetPodClusterId(), - AZID: intf.GetAzId(), - } - reourceInfos[uint64(deviceType)<<32|uint64(deviceID)] = info - podInfos[podID] = info - - nodeInfo := *info - nodeInfo.PodID = 0 - nodeInfo.PodNSID = 0 - nodeInfo.PodGroupID = 0 - nodeInfo.PodGroupType = 0 - nodeInfo.PodClusterID = 0 - podNodeInfos[podNodeID] = &nodeInfo - - hostInfo := *info - hostInfo.PodNodeID = 0 - hostInfo.PodID = 0 - hostInfo.PodNSID = 0 - hostInfo.PodGroupID = 0 - nodeInfo.PodGroupType = 0 - hostInfo.PodClusterID = 0 - hostInfos[hostID] = &hostInfo - - if deviceType == uint32(trident.DeviceType_DEVICE_TYPE_VM) { - vmInfo := hostInfo - vmInfos[deviceID] = &vmInfo - } -} diff --git a/server/libs/grpc/grpc_platformdata.go b/server/libs/grpc/grpc_platformdata.go index cc8401a7fa0..18739c9f671 100644 --- a/server/libs/grpc/grpc_platformdata.go +++ b/server/libs/grpc/grpc_platformdata.go @@ -195,6 +195,12 @@ type PlatformInfoTable struct { peerConnections [MAX_ORG_COUNT]map[int32][]int32 + // resource event infos + resourceInfos [MAX_ORG_COUNT]map[uint64]*Info + podNodeInfos [MAX_ORG_COUNT]map[uint32]*Info + hostInfos [MAX_ORG_COUNT]map[uint32]*Info + vmInfos [MAX_ORG_COUNT]map[uint32]*Info + *GrpcSession counter *Counter @@ -378,6 +384,21 @@ func (t *PlatformInfoTable) QueryService(orgId uint16, podID, podNodeID, podClus return t.ServiceTable[orgId].QueryService(podID, podNodeID, podClusterID, podGroupID, epcID, isIPv6, ipv4, ipv6, protocol, serverPort) } +func (t *PlatformInfoTable) QueryResourceInfo(orgId uint16, resourceType uint32, resourceID uint32) *Info { + switch trident.DeviceType(resourceType) { + case trident.DeviceType_DEVICE_TYPE_POD: + return t.podIDInfos[orgId][resourceID] + case trident.DeviceType_DEVICE_TYPE_POD_NODE: + return t.podNodeInfos[orgId][resourceID] + case trident.DeviceType_DEVICE_TYPE_HOST_DEVICE: + return t.hostInfos[orgId][resourceID] + case trident.DeviceType_DEVICE_TYPE_VM: + return t.vmInfos[orgId][resourceID] + default: + return t.resourceInfos[orgId][uint64(resourceType)<<32|uint64(resourceID)] + } +} + type PlatformDataManager struct { masterTable *PlatformInfoTable tableLock sync.Mutex @@ -479,6 +500,11 @@ func NewPlatformInfoTable(ips []net.IP, port, index, rpcMaxMsgSize int, moduleNa table.containerMissCount[i] = make(map[string]*uint64) table.containerHitCount[i] = make(map[string]*uint64) table.peerConnections[i] = make(map[int32][]int32) + + table.resourceInfos[i] = make(map[uint64]*Info) + table.podNodeInfos[i] = make(map[uint32]*Info) + table.hostInfos[i] = make(map[uint32]*Info) + table.vmInfos[i] = make(map[uint32]*Info) } runOnce := func() { @@ -1042,8 +1068,17 @@ func (t *PlatformInfoTable) updatePlatformData(orgId uint16, platformData *tride newEpcIDIPV6CidrInfos := make(map[int32][]*CidrInfo) newEpcIDPodInfos := make(map[uint32]*Info) + newResourceInfos := make(map[uint64]*Info) + newPodNodeInfos := make(map[uint32]*Info) + newHostInfos := make(map[uint32]*Info) + newVmInfos := make(map[uint32]*Info) + for _, intf := range platformData.GetInterfaces() { - updateInterfaceInfos(newEpcIDIPV4Infos, newEpcIDIPV6Infos, newMacInfos, newEpcIDBaseInfos, newEpcIDPodInfos, intf) + updateInterfaceInfos(newEpcIDIPV4Infos, newEpcIDIPV6Infos, + newMacInfos, newResourceInfos, + newEpcIDBaseInfos, + newEpcIDPodInfos, newPodNodeInfos, newHostInfos, newVmInfos, + intf) } for _, cidr := range platformData.GetCidrs() { updateCidrInfos(newEpcIDIPV4CidrInfos, newEpcIDIPV6CidrInfos, newEpcIDBaseInfos, cidr) @@ -1074,6 +1109,11 @@ func (t *PlatformInfoTable) updatePlatformData(orgId uint16, platformData *tride t.containerMissCount[orgId] = make(map[string]*uint64) t.podIDInfosPlatformData[orgId] = newEpcIDPodInfos + + t.resourceInfos[orgId] = newResourceInfos + t.podNodeInfos[orgId] = newPodNodeInfos + t.hostInfos[orgId] = newHostInfos + t.vmInfos[orgId] = newVmInfos } func (t *PlatformInfoTable) updateOthers(orgId uint16, response *trident.SyncResponse) { @@ -1141,6 +1181,11 @@ func (t *PlatformInfoTable) ReloadSlave(orgId uint16) error { t.versionPlatformData[orgId] = newVersion t.otherRegionCount[orgId] = 0 + + t.resourceInfos[orgId] = masterTable.resourceInfos[orgId] + t.podNodeInfos[orgId] = masterTable.podNodeInfos[orgId] + t.hostInfos[orgId] = masterTable.hostInfos[orgId] + t.vmInfos[orgId] = masterTable.vmInfos[orgId] } t.vtapIdInfos[orgId] = masterTable.vtapIdInfos[orgId] t.orgIds = masterTable.orgIds @@ -1417,7 +1462,7 @@ func updateCidrInfos(IPV4CidrInfos, IPV6CidrInfos map[int32][]*CidrInfo, epcIDBa } } -func updateInterfaceInfos(epcIDIPV4Infos map[uint64]*Info, epcIDIPV6Infos map[[EpcIDIPV6_LEN]byte]*Info, macInfos map[uint64]*Info, epcIDBaseInfos map[int32]*BaseInfo, podIDInfos map[uint32]*Info, intf *trident.Interface) { +func updateInterfaceInfos(epcIDIPV4Infos map[uint64]*Info, epcIDIPV6Infos map[[EpcIDIPV6_LEN]byte]*Info, macInfos, resourceInfos map[uint64]*Info, epcIDBaseInfos map[int32]*BaseInfo, podIDInfos, podNodeInfos, hostInfos, vmInfos map[uint32]*Info, intf *trident.Interface) { // intf.GetEpcId() in range (0,64000], when convert to int32, 0 need convert to datatype.EPC_FROM_INTERNET epcID := int32(intf.GetEpcId()) // 由于doc中epcID为-2,对应trisolaris的epcID为0.故在此统一将收到epcID为0的,修改为-2,便于doc数据查找 @@ -1562,6 +1607,47 @@ func updateInterfaceInfos(epcIDIPV4Infos map[uint64]*Info, epcIDIPV6Infos map[[E HitCount: new(uint64), } } + + resourceInfo := &Info{ + EpcID: epcID, + HostID: hostID, + RegionID: regionID, + DeviceType: deviceType, + DeviceID: deviceID, + PodNodeID: podNodeID, + PodNSID: podNSID, + PodGroupID: podGroupID, + PodGroupType: podGroupType, + PodID: podID, + PodClusterID: podClusterID, + AZID: azID, + } + resourceInfos[uint64(deviceType)<<32|uint64(deviceID)] = resourceInfo + + podNodeInfo := &Info{ + EpcID: epcID, + HostID: hostID, + RegionID: regionID, + DeviceType: deviceType, + DeviceID: deviceID, + PodNodeID: podNodeID, + AZID: azID, + } + podNodeInfos[podNodeID] = podNodeInfo + + hostInfo := &Info{ + EpcID: epcID, + HostID: hostID, + RegionID: regionID, + DeviceType: deviceType, + DeviceID: deviceID, + AZID: azID, + } + hostInfos[hostID] = hostInfo + + if deviceType == uint32(trident.DeviceType_DEVICE_TYPE_VM) { + vmInfos[deviceID] = hostInfo + } } func (t *PlatformInfoTable) QueryVtapEpc0(orgId, vtapId uint16) int32 {