Skip to content

Commit

Permalink
fix & refine
Browse files Browse the repository at this point in the history
Signed-off-by: xuleiming <xuleiming@yf-networks.com>
  • Loading branch information
xuleiming committed Dec 5, 2024
1 parent 2c381e1 commit 799050a
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 91 deletions.
184 changes: 105 additions & 79 deletions bfe_modules/mod_wasmplugin/plugin_rule_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,111 +52,137 @@ type FilterRule struct {
type RuleList []FilterRule
type ProductRules map[string]RuleList // product => list of filter rules

func updatePluginConf(t *PluginTable, conf PluginConfFile, pluginPath string) error {
if conf.Version != nil && *conf.Version != t.GetVersion() {
pluginMapNew := make(map[string]bfe_wasmplugin.WasmPlugin)
var beforeLocationRulesNew RuleList
productRulesNew := make(ProductRules)
type PluginMap map[string]bfe_wasmplugin.WasmPlugin

// 1. check plugin map
unchanged := make(map[string]bool)
func buildRuleList(rules []FilterRuleFile, pluginMap PluginMap) (RuleList, error) {
var rulelist RuleList

pm := t.GetPluginMap()
if conf.PluginMap != nil {
for pn, p := range *conf.PluginMap {
plugOld := pm[pn]
// check whether plugin version changed.
if plugOld != nil {
configOld := plugOld.GetConfig()
if configOld.WasmVersion == p.WasmVersion && configOld.ConfigVersion == p.ConfVersion {
// not change, just copy to new map
pluginMapNew[pn] = plugOld

// ensure instance num
for _, r := range rules {
rule := FilterRule{}
cond, err := condition.Build(*r.Cond)
if err != nil {
return nil, err
}

rule.Cond =cond

for _, pn := range *r.PluginList {
plug := pluginMap[pn]
if plug == nil {
return nil, fmt.Errorf("unknown plugin: %s", pn)
}
rule.PluginList = append(rule.PluginList, plug)
}

rulelist = append(rulelist, rule)
}

return rulelist, nil
}

func buildNewPluginMap(conf *map[string]PluginMeta, pmOld PluginMap,
pluginPath string) (pmNew PluginMap, unchanged map[string]bool, err error) {

pmNew = PluginMap{}
unchanged = map[string]bool{}

if conf != nil {
for pn, p := range *conf {
plugOld := pmOld[pn]
// check whether plugin version changed.
if plugOld != nil {
configOld := plugOld.GetConfig()
if configOld.WasmVersion == p.WasmVersion && configOld.ConfigVersion == p.ConfVersion {
// not change, just copy to new map
pmNew[pn] = plugOld

// grow instance num if needed
if p.InstanceNum > plugOld.InstanceNum() {
actual := plugOld.EnsureInstanceNum(p.InstanceNum)
if actual != p.InstanceNum {
return fmt.Errorf("can not EnsureInstanceNum, plugin:%s, num:%d", pn, p.InstanceNum)
err = fmt.Errorf("can not EnsureInstanceNum, plugin:%s, num:%d", pn, p.InstanceNum)
return
}

unchanged[pn] = true
continue
}

unchanged[pn] = true
continue
}
// if changed, construct a new plugin.
wasmconf := bfe_wasmplugin.WasmPluginConfig {
PluginName: pn,
WasmVersion: p.WasmVersion,
ConfigVersion: p.ConfVersion,
InstanceNum: p.InstanceNum,
Path: path.Join(pluginPath, pn),
// Md5: p.Md5,
}
plug, err := bfe_wasmplugin.NewWasmPlugin(wasmconf)
if err != nil {
// build plugin error
return err
}
}
// if changed, construct a new plugin.
wasmconf := bfe_wasmplugin.WasmPluginConfig {
PluginName: pn,
WasmVersion: p.WasmVersion,
ConfigVersion: p.ConfVersion,
InstanceNum: p.InstanceNum,
Path: path.Join(pluginPath, pn),
}
plug, err1 := bfe_wasmplugin.NewWasmPlugin(wasmconf)
if err1 != nil {
// build plugin error
err = err1
return
}

// plug.OnPluginStart()
pmNew[pn] = plug
}
}

return
}

pluginMapNew[pn] = plug
func cleanPlugins(pm PluginMap, unchanged map[string]bool, conf *map[string]PluginMeta) {
for pn, plug := range pm {
if unchanged[pn] {
// shink instance num if needed
confnum := (*conf)[pn].InstanceNum
if plug.InstanceNum() > confnum {
plug.EnsureInstanceNum(confnum)
}
} else {
// stop plug
plug.OnPluginDestroy()
plug.Clear()
}
}
}

func updatePluginConf(t *PluginTable, conf PluginConfFile, pluginPath string) error {
if conf.Version != nil && *conf.Version != t.GetVersion() {

// 1. check plugin map
pm := t.GetPluginMap()
pluginMapNew, unchanged, err := buildNewPluginMap(conf.PluginMap, pm, pluginPath)
if err != nil {
return err
}

// 2. construct product rules
var beforeLocationRulesNew RuleList
if conf.BeforeLocationRules != nil {
for _, r := range *conf.BeforeLocationRules {
rule := FilterRule{}
cond, err := condition.Build(*r.Cond)
if err != nil {
return err
}
rule.Cond =cond
for _, pn := range *r.PluginList {
plug := pluginMapNew[pn]
if plug == nil {
return fmt.Errorf("unknown plugin: %s", pn)
}
rule.PluginList = append(rule.PluginList, plug)
}
beforeLocationRulesNew = append(beforeLocationRulesNew, rule)
if rulelist, err := buildRuleList(*conf.BeforeLocationRules, pluginMapNew); err == nil {
beforeLocationRulesNew = rulelist
} else {
return err
}
}

productRulesNew := make(ProductRules)
if conf.FoundProductRules != nil {
for product, rules := range *conf.FoundProductRules {
var rulelist RuleList
for _, r := range rules {
rule := FilterRule{}
cond, err := condition.Build(*r.Cond)
if err != nil {
return err
}
rule.Cond =cond
for _, pn := range *r.PluginList {
plug := pluginMapNew[pn]
if plug == nil {
return fmt.Errorf("unknown plugin: %s", pn)
}
rule.PluginList = append(rule.PluginList, plug)
}
rulelist = append(rulelist, rule)
if rulelist, err := buildRuleList(rules, pluginMapNew); err == nil {
productRulesNew[product] = rulelist
} else {
return err
}
productRulesNew[product] = rulelist
}
}

// 3. update PluginTable
t.Update(*conf.Version, beforeLocationRulesNew, productRulesNew, pluginMapNew)

// 4. stop & clear old plugins
for pn, plug := range pm {
if _, ok := unchanged[pn]; !ok {
// stop plug
plug.OnPluginDestroy()
plug.Clear()
}
}
// 4. stop & clean old plugins
cleanPlugins(pm, unchanged, conf.PluginMap)
}
return nil
}
Expand Down
16 changes: 7 additions & 9 deletions bfe_modules/mod_wasmplugin/plugin_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,24 @@ package mod_wasmplugin

import (
"sync"

"github.com/bfenetworks/bfe/bfe_wasmplugin"
)

type PluginTable struct {
lock sync.RWMutex
version string
lock sync.RWMutex
version string
beforeLocationRules RuleList
productRules ProductRules
pluginMap map[string]bfe_wasmplugin.WasmPlugin
productRules ProductRules
pluginMap PluginMap
}

func NewPluginTable() *PluginTable {
t := new(PluginTable)
t.productRules = make(ProductRules)
t.pluginMap = make(map[string]bfe_wasmplugin.WasmPlugin)
t.pluginMap = make(PluginMap)
return t
}

func (t *PluginTable) Update(version string, beforeLocationRules RuleList, productRules ProductRules, pluginMap map[string]bfe_wasmplugin.WasmPlugin) {
func (t *PluginTable) Update(version string, beforeLocationRules RuleList, productRules ProductRules, pluginMap PluginMap) {
t.lock.Lock()

t.version = version
Expand All @@ -52,7 +50,7 @@ func (t *PluginTable) GetVersion() string {
return t.version
}

func (t *PluginTable) GetPluginMap() map[string]bfe_wasmplugin.WasmPlugin {
func (t *PluginTable) GetPluginMap() PluginMap {
defer t.lock.RUnlock()
t.lock.RLock()
return t.pluginMap
Expand Down
20 changes: 17 additions & 3 deletions bfe_wasmplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,12 @@ func (w *wasmPluginImpl) EnsureInstanceNum(num int) int {
}

// Instantiate any ABI needed by the guest.
for _, abi := range wasmABI.GetABIList(instance) {
abilist := wasmABI.GetABIList(instance)
if len(abilist) == 0 {
log.Logger.Error("[wasm][plugin] EnsureInstanceNum fail to get abilist, i: %v", i)
break
}
for _, abi := range abilist {
//abi.OnInstanceCreate(instance)
if err := instance.RegisterImports(abi.Name()); err != nil {
panic(err)
Expand All @@ -253,7 +258,10 @@ func (w *wasmPluginImpl) EnsureInstanceNum(num int) int {
continue
}

w.OnInstanceStart(instance)
if !w.OnInstanceStart(instance) {
log.Logger.Error("[wasm][plugin] EnsureInstanceNum fail on instance start, i: %v", i)
break
}
newInstance = append(newInstance, instance)
}

Expand Down Expand Up @@ -324,7 +332,13 @@ func (w *wasmPluginImpl) ReleaseInstance(instance common.WasmInstance) {
}

func (w *wasmPluginImpl) OnInstanceStart(instance common.WasmInstance) bool {
abi := wasmABI.GetABIList(instance)[0]
abilist := wasmABI.GetABIList(instance)
if len(abilist) == 0 {
log.Logger.Error("[proxywasm][factory] instance has no correct abi list")
return false
}

abi := abilist[0]
var exports v1Host.Exports
if abi != nil {
// v1
Expand Down

0 comments on commit 799050a

Please sign in to comment.