Skip to content

Commit

Permalink
[#33]: feature: check the status of the rabbitmq connection
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Nov 25, 2022
2 parents 1d49051 + 98842e8 commit 5927764
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 19 deletions.
6 changes: 6 additions & 0 deletions amqpjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
requeueOnFail string = "requeue_on_fail"

// new in 2.12
redialTimeout string = "redial_timeout"
exchangeDurable string = "exchange_durable"
exchangeAutoDelete string = "exchange_auto_delete"
queueAutoDelete string = "queue_auto_delete"
Expand Down Expand Up @@ -44,6 +45,7 @@ type config struct {
ExchangeDurable bool `mapstructure:"exchange_durable"`
ExchangeAutoDelete bool `mapstructure:"exchange_auto_delete"`
QueueAutoDelete bool `mapstructure:"queue_auto_delete"`
RedialTimeout int `mapstructure:"redial_timeout"`

RoutingKey string `mapstructure:"routing_key"`
ConsumeAll bool `mapstructure:"consume_all"`
Expand All @@ -68,6 +70,10 @@ func (c *config) InitDefault() {
c.Queue = "default"
}

if c.RedialTimeout == 0 {
c.RedialTimeout = 60
}

if c.Prefetch == 0 {
c.Prefetch = 10
}
Expand Down
62 changes: 45 additions & 17 deletions amqpjobs/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v3/plugins/jobs"
"github.com/roadrunner-server/sdk/v3/plugins/jobs/pipeline"
"github.com/roadrunner-server/sdk/v3/plugins/status"
priorityqueue "github.com/roadrunner-server/sdk/v3/priority_queue"
"github.com/roadrunner-server/sdk/v3/utils"
"go.uber.org/zap"
Expand All @@ -31,7 +32,7 @@ const (
)

type Consumer struct {
sync.Mutex
mu sync.Mutex
log *zap.Logger
pq priorityqueue.Queue
pipeline atomic.Pointer[pipeline.Pipeline]
Expand Down Expand Up @@ -113,9 +114,8 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg Configurer, pq prior
stopCh: make(chan struct{}, 1),
consumeAll: conf.ConsumeAll,

retryTimeout: time.Minute,
priority: conf.Priority,
delayed: utils.Int64(0),
priority: conf.Priority,
delayed: utils.Int64(0),

publishChan: make(chan *amqp.Channel, 1),
stateChan: make(chan *amqp.Channel, 1),
Expand All @@ -137,6 +137,8 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg Configurer, pq prior
multipleAck: conf.MultipleAck,
requeueOnFail: conf.RequeueOnFail,

// 2.12
retryTimeout: time.Duration(conf.RedialTimeout) * time.Second,
exchangeAutoDelete: conf.ExchangeAutoDelete,
exchangeDurable: conf.ExchangeDurable,
queueAutoDelete: conf.QueueAutoDelete,
Expand Down Expand Up @@ -205,12 +207,11 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg Configurer,
}

jb := &Consumer{
log: log,
pq: pq,
consumeID: uuid.NewString(),
stopCh: make(chan struct{}, 1),
retryTimeout: time.Minute,
delayed: utils.Int64(0),
log: log,
pq: pq,
consumeID: uuid.NewString(),
stopCh: make(chan struct{}, 1),
delayed: utils.Int64(0),

publishChan: make(chan *amqp.Channel, 1),
stateChan: make(chan *amqp.Channel, 1),
Expand All @@ -235,8 +236,9 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg Configurer,
requeueOnFail: pipeline.Bool(requeueOnFail, false),

// new in 2.12
retryTimeout: time.Duration(pipeline.Int(redialTimeout, 60)) * time.Second,
exchangeAutoDelete: pipeline.Bool(exchangeAutoDelete, false),
exchangeDurable: pipeline.Bool(exchangeAutoDelete, false),
exchangeDurable: pipeline.Bool(exchangeDurable, false),
queueAutoDelete: pipeline.Bool(queueAutoDelete, false),
}

Expand Down Expand Up @@ -315,8 +317,8 @@ func (c *Consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
}

// protect connection (redial)
c.Lock()
defer c.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

var err error
c.consumeChan, err = c.conn.Channel()
Expand Down Expand Up @@ -399,8 +401,8 @@ func (c *Consumer) Pause(_ context.Context, p string) {
atomic.AddUint32(&c.listeners, ^uint32(0))

// protect connection (redial)
c.Lock()
defer c.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

err := c.consumeChan.Cancel(c.consumeID, true)
if err != nil {
Expand All @@ -424,8 +426,8 @@ func (c *Consumer) Resume(_ context.Context, p string) {
}

// protect connection (redial)
c.Lock()
defer c.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

l := atomic.LoadUint32(&c.listeners)
// no active listeners
Expand Down Expand Up @@ -481,6 +483,32 @@ func (c *Consumer) Stop(context.Context) error {
return nil
}

func (c *Consumer) Status() (*status.Status, error) {
c.mu.Lock()
defer c.mu.Unlock()

ch, err := c.conn.Channel()
if err != nil {
return nil, err
}

defer func() {
_ = ch.Close()
}()

_, err = ch.QueueInspect(c.queue)
if err != nil {
c.log.Error("queue inspect", zap.Error(err))
return &status.Status{
Code: 500,
}, nil
}

return &status.Status{
Code: 200,
}, nil
}

// handleItem
func (c *Consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
Expand Down
4 changes: 2 additions & 2 deletions amqpjobs/redial.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ func (c *Consumer) reset() {
func (c *Consumer) redialMergeCh() {
go func() {
for err := range c.redialCh {
c.Lock()
c.mu.Lock()
c.redial(err)
c.Unlock()
c.mu.Unlock()
}
}()
}
Expand Down

0 comments on commit 5927764

Please sign in to comment.