Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: multiplexing support for database plugins #14033

Merged
merged 23 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b1037ba
feat: DB plugin multiplexing (#13734)
fairclothjm Feb 3, 2022
4782d45
feat: plugin multiplexing: handle plugin client cleanup (#13896)
fairclothjm Feb 10, 2022
25620a1
feature: multiplexing: fix unit tests (#14007)
fairclothjm Feb 11, 2022
c4ec473
feat: multiplexing: handle v5 plugin compiled with new sdk
fairclothjm Feb 11, 2022
51eabad
add mux supported flag and increase test coverage
fairclothjm Feb 11, 2022
2fc2de5
set multiplexingSupport field in plugin server
fairclothjm Feb 11, 2022
774606a
remove multiplexingSupport field in sdk
fairclothjm Feb 12, 2022
82813ee
revert postgres to non-multiplexed
fairclothjm Feb 12, 2022
a8e4cba
add comments on grpc server fields
fairclothjm Feb 12, 2022
f4a11ed
use pointer receiver on grpc server methods
fairclothjm Feb 14, 2022
909f23a
add changelog
fairclothjm Feb 14, 2022
efaa9e7
use pointer for grpcserver instance
fairclothjm Feb 14, 2022
1a5a609
Use a gRPC server to determine if a plugin should be multiplexed
Feb 14, 2022
c0aaec2
Apply suggestions from code review
fairclothjm Feb 15, 2022
b18bdc6
add lock to removePluginClient
fairclothjm Feb 15, 2022
a46478c
add multiplexingSupport field to externalPlugin struct
fairclothjm Feb 15, 2022
189382d
do not send nil to grpc MultiplexingSupport
fairclothjm Feb 15, 2022
49fab35
Merge branch 'feature-multiplexing-server' into feature-multiplexing
fairclothjm Feb 15, 2022
451d9c7
check err before logging
fairclothjm Feb 15, 2022
baa01f0
handle locking scenario for cleanupFunc
fairclothjm Feb 15, 2022
d4d20c7
allow ServeConfigMultiplex to dispense v5 plugin
fairclothjm Feb 15, 2022
3c9cd0d
reposition structs, add err check and comments
fairclothjm Feb 16, 2022
c60d273
add comment on locking for cleanupExternalPlugin
fairclothjm Feb 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions builtin/logical/database/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func Backend(conf *logical.BackendConfig) *databaseBackend {
}

type databaseBackend struct {
// connections holds configured database connections by config name
connections map[string]*dbPluginInstance
logger log.Logger

Expand Down
5 changes: 5 additions & 0 deletions builtin/logical/database/path_config_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {
}
config.ConnectionDetails = initResp.Config

b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName)

b.Lock()
defer b.Unlock()

Expand Down Expand Up @@ -365,6 +367,9 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {
"Vault (or the sdk if using a custom plugin) to gain password policy support", config.PluginName))
}

if len(resp.Warnings) == 0 {
return nil, nil
}
return resp, nil
}
}
Expand Down
3 changes: 3 additions & 0 deletions changelog/14033.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
**Database plugin multiplexing**: manage multiple database connections with a single plugin process
```
4 changes: 2 additions & 2 deletions helper/builtinplugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ type registry struct {
logicalBackends map[string]logical.Factory
}

// Get returns the BuiltinFactory func for a particular backend plugin
// from the plugins map.
// Get returns the Factory func for a particular backend plugin from the
// plugins map.
func (r *registry) Get(name string, pluginType consts.PluginType) (func() (interface{}, error), bool) {
switch pluginType {
case consts.PluginTypeCredential:
Expand Down
1 change: 0 additions & 1 deletion plugins/database/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ var (
singleQuotedPhrases = regexp.MustCompile(`('.*?')`)
)

// New implements builtinplugins.BuiltinFactory
func New() (interface{}, error) {
db := new()
// Wrap the plugin with middleware to sanitize errors
Expand Down
23 changes: 19 additions & 4 deletions sdk/database/dbplugin/v5/grpc_database_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import (
// a plugin and host. If the handshake fails, a user friendly error is shown.
// This prevents users from executing bad plugins or executing a plugin
// directory. It is a UX feature, not a security feature.
var handshakeConfig = plugin.HandshakeConfig{
ProtocolVersion: 5,
var HandshakeConfig = plugin.HandshakeConfig{
MagicCookieKey: "VAULT_DATABASE_PLUGIN",
MagicCookieValue: "926a0820-aea2-be28-51d6-83cdf00e8edb",
}

// Factory is the factory function to create a dbplugin Database.
type Factory func() (interface{}, error)

type GRPCDatabasePlugin struct {
Impl Database
FactoryFunc Factory
Impl Database

// Embeding this will disable the netRPC protocol
plugin.NetRPCUnsupportedPlugin
Expand All @@ -31,7 +34,19 @@ var (
)

func (d GRPCDatabasePlugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error {
proto.RegisterDatabaseServer(s, gRPCServer{impl: d.Impl})
var server gRPCServer

if d.Impl != nil {
server = gRPCServer{singleImpl: d.Impl}
} else {
// multiplexing is supported
server = gRPCServer{
factoryFunc: d.FactoryFunc,
instances: make(map[string]Database),
}
}

proto.RegisterDatabaseServer(s, &server)
return nil
}

Expand Down
155 changes: 141 additions & 14 deletions sdk/database/dbplugin/v5/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,121 @@ package dbplugin
import (
"context"
"fmt"
"sync"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/vault/sdk/database/dbplugin/v5/proto"
"github.com/hashicorp/vault/sdk/helper/pluginutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var _ proto.DatabaseServer = gRPCServer{}
var _ proto.DatabaseServer = &gRPCServer{}

type gRPCServer struct {
proto.UnimplementedDatabaseServer

impl Database
// holds the non-multiplexed Database
// when this is set the plugin does not support multiplexing
singleImpl Database

// instances holds the multiplexed Databases
instances map[string]Database
factoryFunc func() (interface{}, error)

sync.RWMutex
}

func getMultiplexIDFromContext(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", fmt.Errorf("missing plugin multiplexing metadata")
}

multiplexIDs := md[pluginutil.MultiplexingCtxKey]
if len(multiplexIDs) != 1 {
return "", fmt.Errorf("unexpected number of IDs in metadata: (%d)", len(multiplexIDs))
}

multiplexID := multiplexIDs[0]
if multiplexID == "" {
return "", fmt.Errorf("empty multiplex ID in metadata")
}

return multiplexID, nil
}

func (g *gRPCServer) getOrCreateDatabase(ctx context.Context) (Database, error) {
g.Lock()
defer g.Unlock()

if g.singleImpl != nil {
return g.singleImpl, nil
}

id, err := getMultiplexIDFromContext(ctx)
if err != nil {
return nil, err
}

if db, ok := g.instances[id]; ok {
return db, nil
}

db, err := g.factoryFunc()
if err != nil {
return nil, err
}

database := db.(Database)
g.instances[id] = database

return database, nil
}

// getDatabaseInternal returns the database but does not hold a lock
func (g *gRPCServer) getDatabaseInternal(ctx context.Context) (Database, error) {
if g.singleImpl != nil {
return g.singleImpl, nil
}

id, err := getMultiplexIDFromContext(ctx)
if err != nil {
return nil, err
}

if db, ok := g.instances[id]; ok {
return db, nil
}

return nil, fmt.Errorf("no database instance found")
}

// getDatabase holds a read lock and returns the database
func (g *gRPCServer) getDatabase(ctx context.Context) (Database, error) {
g.RLock()
impl, err := g.getDatabaseInternal(ctx)
g.RUnlock()
return impl, err
}

// Initialize the database plugin
func (g gRPCServer) Initialize(ctx context.Context, request *proto.InitializeRequest) (*proto.InitializeResponse, error) {
func (g *gRPCServer) Initialize(ctx context.Context, request *proto.InitializeRequest) (*proto.InitializeResponse, error) {
impl, err := g.getOrCreateDatabase(ctx)
if err != nil {
return nil, err
}

rawConfig := structToMap(request.ConfigData)

dbReq := InitializeRequest{
Config: rawConfig,
VerifyConnection: request.VerifyConnection,
}

dbResp, err := g.impl.Initialize(ctx, dbReq)
dbResp, err := impl.Initialize(ctx, dbReq)
if err != nil {
return &proto.InitializeResponse{}, status.Errorf(codes.Internal, "failed to initialize: %s", err)
}
Expand All @@ -45,7 +134,7 @@ func (g gRPCServer) Initialize(ctx context.Context, request *proto.InitializeReq
return resp, nil
}

func (g gRPCServer) NewUser(ctx context.Context, req *proto.NewUserRequest) (*proto.NewUserResponse, error) {
func (g *gRPCServer) NewUser(ctx context.Context, req *proto.NewUserRequest) (*proto.NewUserResponse, error) {
if req.GetUsernameConfig() == nil {
return &proto.NewUserResponse{}, status.Errorf(codes.InvalidArgument, "missing username config")
}
Expand All @@ -60,6 +149,11 @@ func (g gRPCServer) NewUser(ctx context.Context, req *proto.NewUserRequest) (*pr
expiration = exp
}

impl, err := g.getDatabase(ctx)
if err != nil {
return nil, err
}

dbReq := NewUserRequest{
UsernameConfig: UsernameMetadata{
DisplayName: req.GetUsernameConfig().GetDisplayName(),
Expand All @@ -71,7 +165,7 @@ func (g gRPCServer) NewUser(ctx context.Context, req *proto.NewUserRequest) (*pr
RollbackStatements: getStatementsFromProto(req.GetRollbackStatements()),
}

dbResp, err := g.impl.NewUser(ctx, dbReq)
dbResp, err := impl.NewUser(ctx, dbReq)
if err != nil {
return &proto.NewUserResponse{}, status.Errorf(codes.Internal, "unable to create new user: %s", err)
}
Expand All @@ -82,7 +176,7 @@ func (g gRPCServer) NewUser(ctx context.Context, req *proto.NewUserRequest) (*pr
return resp, nil
}

func (g gRPCServer) UpdateUser(ctx context.Context, req *proto.UpdateUserRequest) (*proto.UpdateUserResponse, error) {
func (g *gRPCServer) UpdateUser(ctx context.Context, req *proto.UpdateUserRequest) (*proto.UpdateUserResponse, error) {
if req.GetUsername() == "" {
return &proto.UpdateUserResponse{}, status.Errorf(codes.InvalidArgument, "no username provided")
}
Expand All @@ -92,7 +186,12 @@ func (g gRPCServer) UpdateUser(ctx context.Context, req *proto.UpdateUserRequest
return &proto.UpdateUserResponse{}, status.Errorf(codes.InvalidArgument, err.Error())
}

_, err = g.impl.UpdateUser(ctx, dbReq)
impl, err := g.getDatabase(ctx)
if err != nil {
return nil, err
}

_, err = impl.UpdateUser(ctx, dbReq)
if err != nil {
return &proto.UpdateUserResponse{}, status.Errorf(codes.Internal, "unable to update user: %s", err)
}
Expand Down Expand Up @@ -144,7 +243,7 @@ func hasChange(dbReq UpdateUserRequest) bool {
return false
}

func (g gRPCServer) DeleteUser(ctx context.Context, req *proto.DeleteUserRequest) (*proto.DeleteUserResponse, error) {
func (g *gRPCServer) DeleteUser(ctx context.Context, req *proto.DeleteUserRequest) (*proto.DeleteUserResponse, error) {
if req.GetUsername() == "" {
return &proto.DeleteUserResponse{}, status.Errorf(codes.InvalidArgument, "no username provided")
}
Expand All @@ -153,15 +252,25 @@ func (g gRPCServer) DeleteUser(ctx context.Context, req *proto.DeleteUserRequest
Statements: getStatementsFromProto(req.GetStatements()),
}

_, err := g.impl.DeleteUser(ctx, dbReq)
impl, err := g.getDatabase(ctx)
if err != nil {
return nil, err
}

_, err = impl.DeleteUser(ctx, dbReq)
if err != nil {
return &proto.DeleteUserResponse{}, status.Errorf(codes.Internal, "unable to delete user: %s", err)
}
return &proto.DeleteUserResponse{}, nil
}

func (g gRPCServer) Type(ctx context.Context, _ *proto.Empty) (*proto.TypeResponse, error) {
t, err := g.impl.Type()
func (g *gRPCServer) Type(ctx context.Context, _ *proto.Empty) (*proto.TypeResponse, error) {
impl, err := g.getOrCreateDatabase(ctx)
if err != nil {
return nil, err
}

t, err := impl.Type()
if err != nil {
return &proto.TypeResponse{}, status.Errorf(codes.Internal, "unable to retrieve type: %s", err)
}
Expand All @@ -172,11 +281,29 @@ func (g gRPCServer) Type(ctx context.Context, _ *proto.Empty) (*proto.TypeRespon
return resp, nil
}

func (g gRPCServer) Close(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) {
err := g.impl.Close()
func (g *gRPCServer) Close(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) {
g.Lock()
defer g.Unlock()

impl, err := g.getDatabaseInternal(ctx)
if err != nil {
return nil, err
}

err = impl.Close()
if err != nil {
return &proto.Empty{}, status.Errorf(codes.Internal, "unable to close database plugin: %s", err)
}

if g.singleImpl == nil {
// only cleanup instances map when multiplexing is supported
id, err := getMultiplexIDFromContext(ctx)
if err != nil {
return nil, err
}
delete(g.instances, id)
}

return &proto.Empty{}, nil
}

Expand Down
Loading