Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rft : Extract priority router #630

Merged
merged 1 commit into from
Jun 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cluster/directory/base_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
return
}

routers := make([]router.Router, 0, len(urls))
routers := make([]router.PriorityRouter, 0, len(urls))

for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")

if len(routerKey) > 0 {
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewRouter(url)
r, err := factory.NewPriorityRouter(url)
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
Expand Down
30 changes: 20 additions & 10 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ type RouterChain struct {
// Full list of addresses from registry, classified by method name.
invokers []protocol.Invoker
// Containing all routers, reconstruct every time 'route://' urls change.
routers []router.Router
routers []router.PriorityRouter
// Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the
// instance will never delete or recreate.
builtinRouters []router.Router
builtinRouters []router.PriorityRouter

mutex sync.RWMutex

url common.URL
}

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := invoker
l := len(c.routers)
rs := make([]router.Router, l, int(math.Ceil(float64(l)*1.2)))
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
copy(rs, c.routers)
c.mutex.RUnlock()
Expand All @@ -67,8 +69,8 @@ func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocat
// New a array add builtinRouters which is not sorted in RouterChain and routers
// Sort the array
// Replace router array in RouterChain
func (c *RouterChain) AddRouters(routers []router.Router) {
newRouters := make([]router.Router, 0, len(c.builtinRouters)+len(routers))
func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
newRouters := make([]router.PriorityRouter, 0, len(c.builtinRouters)+len(routers))
newRouters = append(newRouters, c.builtinRouters...)
newRouters = append(newRouters, routers...)
sortRouter(newRouters)
Expand All @@ -77,24 +79,29 @@ func (c *RouterChain) AddRouters(routers []router.Router) {
c.routers = newRouters
}

// URL Return URL in RouterChain
func (c *RouterChain) URL() common.URL {
return c.url
}

// NewRouterChain Use url to init router chain
// Loop routerFactories and call NewRouter method
func NewRouterChain(url *common.URL) (*RouterChain, error) {
routerFactories := extension.GetRouterFactories()
if len(routerFactories) == 0 {
return nil, perrors.Errorf("No routerFactory exits , create one please")
}
routers := make([]router.Router, 0, len(routerFactories))
routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
r, err := routerFactory().NewRouter(url)
r, err := routerFactory().NewPriorityRouter(url)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error())
continue
}
routers = append(routers, r)
}

newRouters := make([]router.Router, len(routers))
newRouters := make([]router.PriorityRouter, len(routers))
copy(newRouters, routers)

sortRouter(newRouters)
Expand All @@ -103,17 +110,20 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
builtinRouters: routers,
routers: newRouters,
}
if url != nil {
chain.url = *url
}

return chain, nil
}

// sortRouter Sort router instance by priority with stable algorithm
func sortRouter(routers []router.Router) {
func sortRouter(routers []router.PriorityRouter) {
sort.Stable(byPriority(routers))
}

// byPriority Sort by priority
type byPriority []router.Router
type byPriority []router.PriorityRouter

func (a byPriority) Len() int { return len(a) }
func (a byPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
Expand Down
4 changes: 2 additions & 2 deletions cluster/router/chain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ conditions:
url := getConditionRouteUrl("test-condition")
assert.NotNil(t, url)
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewRouter(url)
r, err := factory.NewPriorityRouter(url)
assert.Nil(t, err)
assert.NotNil(t, r)

routers := make([]router.Router, 0)
routers := make([]router.PriorityRouter, 0)
routers = append(routers, r)
chain.AddRouters(routers)
assert.Equal(t, 3, len(chain.routers))
Expand Down
25 changes: 25 additions & 0 deletions cluster/router/chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package router

// Chain
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more annotation please. help user to use it

type Chain interface {
router
// AddRouters Add routers
AddRouters([]PriorityRouter)
}
14 changes: 7 additions & 7 deletions cluster/router/condition/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,28 @@ func init() {
// ConditionRouterFactory Condition router factory
type ConditionRouterFactory struct{}

func newConditionRouterFactory() router.RouterFactory {
func newConditionRouterFactory() router.PriorityRouterFactory {
return &ConditionRouterFactory{}
}

// NewRouter Create ConditionRouterFactory by URL
func (c *ConditionRouterFactory) NewRouter(url *common.URL) (router.Router, error) {
// NewPriorityRouter creates ConditionRouterFactory by URL
func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewConditionRouter(url)
}

// NewRouter Create FileRouterFactory by Content
func (c *ConditionRouterFactory) NewFileRouter(content []byte) (router.Router, error) {
func (c *ConditionRouterFactory) NewFileRouter(content []byte) (router.PriorityRouter, error) {
return NewFileConditionRouter(content)
}

// AppRouterFactory Application router factory
type AppRouterFactory struct{}

func newAppRouterFactory() router.RouterFactory {
func newAppRouterFactory() router.PriorityRouterFactory {
return &AppRouterFactory{}
}

// NewRouter Create AppRouterFactory by URL
func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) {
// NewPriorityRouter creates AppRouterFactory by URL
func (c *AppRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewAppRouter(url)
}
50 changes: 25 additions & 25 deletions cluster/router/condition/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,32 +121,32 @@ func (bi *MockInvoker) Destroy() {
func TestRouteMatchWhen(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4"))
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
cUrl, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService")
matchWhen := router.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen)
rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router1, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule1))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1))
matchWhen1 := router1.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen1)
rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
matchWhen2 := router2.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, false, matchWhen2)
rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
matchWhen3 := router3.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen3)
rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router4, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule4))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4))
matchWhen4 := router4.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen4)
rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4"))
router5, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule5))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
matchWhen5 := router5.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, false, matchWhen5)
rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4"))
router6, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule6))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
matchWhen6 := router6.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen6)
}
Expand All @@ -164,12 +164,12 @@ func TestRouteMatchFilter(t *testing.T) {
rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4"))
rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3"))
rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson"))
router1, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule1))
router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2))
router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3))
router4, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule4))
router5, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule5))
router6, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule6))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
cUrl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{})
Expand All @@ -189,7 +189,7 @@ func TestRouteMatchFilter(t *testing.T) {
func TestRouteMethodRoute(t *testing.T) {
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{}))
rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo")
matchWhen := router.(*ConditionRouter).MatchWhen(&url, inv)
assert.Equal(t, true, matchWhen)
Expand All @@ -198,12 +198,12 @@ func TestRouteMethodRoute(t *testing.T) {
assert.Equal(t, true, matchWhen)
url2, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo")
rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
matchWhen = router2.(*ConditionRouter).MatchWhen(&url2, inv)
assert.Equal(t, false, matchWhen)
url3, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo")
rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
matchWhen = router3.(*ConditionRouter).MatchWhen(&url3, inv)
assert.Equal(t, true, matchWhen)

Expand All @@ -216,7 +216,7 @@ func TestRouteReturnFalse(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
}
Expand All @@ -228,7 +228,7 @@ func TestRouteReturnEmpty(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => "))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
}
Expand All @@ -244,7 +244,7 @@ func TestRouteReturnAll(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, invokers, fileredInvokers)
}
Expand All @@ -261,7 +261,7 @@ func TestRouteHostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
Expand All @@ -280,7 +280,7 @@ func TestRouteEmptyHostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
Expand All @@ -299,7 +299,7 @@ func TestRouteFalseHostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
Expand All @@ -318,7 +318,7 @@ func TestRoutePlaceholder(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host"))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
Expand All @@ -337,7 +337,7 @@ func TestRouteNoForce(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4"))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrlWithNoForce(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, invokers, fileredInvokers)
}
Expand All @@ -354,7 +354,7 @@ func TestRouteForce(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4"))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrlWithForce(rule, "true"))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
}
Expand Down
6 changes: 3 additions & 3 deletions cluster/router/healthcheck/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ type HealthCheckRouteFactory struct {
}

// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory
func newHealthCheckRouteFactory() router.RouterFactory {
func newHealthCheckRouteFactory() router.PriorityRouterFactory {
return &HealthCheckRouteFactory{}
}

// NewRouter construct a new NewHealthCheckRouter via url
func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) {
// NewPriorityRouter construct a new NewHealthCheckRouter via url
func (f *HealthCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewHealthCheckRouter(url)
}
2 changes: 1 addition & 1 deletion cluster/router/healthcheck/health_check_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type HealthCheckRouter struct {
}

// NewHealthCheckRouter construct an HealthCheckRouter via url
func NewHealthCheckRouter(url *common.URL) (router.Router, error) {
func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) {
r := &HealthCheckRouter{
url: url,
enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false),
Expand Down
Loading