Skip to content

Commit

Permalink
Merge pull request #4541 from StackStorm/rabbitmq_connection_ssl_options
Browse files Browse the repository at this point in the history
Support for SSL client cert authentication and server certificate validation when connecting to RabbitMQ
  • Loading branch information
Kami authored Feb 12, 2019
2 parents 456b3e0 + 28f6809 commit 09fc032
Show file tree
Hide file tree
Showing 62 changed files with 929 additions and 116 deletions.
30 changes: 18 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,20 @@ addons:
- mongodb-upstart
- sourceline: 'deb [arch=amd64] http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.4 multiverse'
key_url: 'https://www.mongodb.org/static/pgp/server-3.4.asc'
# NOTE: Precise repo doesn't contain Erlang 20.x, latest version is 19.x so we need to use RabbitMQ 3.7.6
#- sourceline: 'deb [arch=amd64] http://packages.erlang-solutions.com/ubuntu precise contrib'
# key_url: 'https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc'
#- sourceline: 'deb [arch=amd64] https://dl.bintray.com/rabbitmq/debian precise rabbitmq-server-v3.6.x'
# key_url: '/~https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc'
- sourceline: 'ppa:git-core/ppa'
packages:
- mongodb-org-server
- mongodb-org-shell
# NOTE: Uncomment below for Xenial
# -rabbitmq-server
- erlang
- rabbitmq-server
- git
- libffi-dev

# NOTE: Remove / comment services section below for Xenial
services:
- rabbitmq

cache:
pip: true
directories:
Expand All @@ -105,22 +106,27 @@ install:
# Let's enable rabbitmqadmin
# See /~https://github.com/messagebus/lapine/wiki/Testing-on-Travis.
before_script:
# key_url no longer works for APT addon
# Use a custom mongod.conf which uses various speed optimizations
- sudo cp scripts/travis/mongod.conf /etc/mongod.conf
# Clean up any old MongoDB 3.4 data files laying around and make sure mongodb user can write to it
- sudo rm -rf /var/lib/mongodb ; sudo mkdir /var/lib/mongodb ; sudo chown -R mongodb:mongodb /var/lib/mongodb
- sudo service mongod restart ; sleep 5
- sudo service mongod status
- sudo tail -30 /var/log/mongodb/mongod.log
- mongod --version
- git --version
- pip --version
- virtualenv --version
- sudo tail -n 30 /var/log/mongodb/mongod.log
# Use custom RabbitMQ config which enables SSL / TLS listener on port 5671 with test certs
- sudo cp scripts/travis/rabbitmq.config /etc/rabbitmq/rabbitmq.config
# Install rabbitmq_management RabbitMQ plugin
- sudo service rabbitmq-server restart ; sleep 5
- sudo rabbitmq-plugins enable rabbitmq_management
- sudo wget http://guest:guest@localhost:15672/cli/rabbitmqadmin -O /usr/local/bin/rabbitmqadmin
- sudo chmod +x /usr/local/bin/rabbitmqadmin
- sudo service rabbitmq-server restart
- sudo tail -n 30 /var/log/rabbitmq/*
# Print various binary versions
- mongod --version
- git --version
- pip --version
- virtualenv --version
# Print out various environment variables info
- make play

Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ Added

For backward compatibility reasons, if pack metadata file doesn't contain that attribute, it's
assumed it only works with Python 2. (new feature) #4474
* Add support for various new SSL / TLS related config options (``ssl_keyfile``, ``ssl_certfile``,
``ssl_ca_certs``, ``ssl_certfile``, ``authentication_mechanism``) to the ``messaging`` section in
``st2.conf`` config file.

With those config options, user can configure things such as client based certificate
authentication, client side verification of a server certificate against a specific CA bundle, etc.

NOTE: Those options are only supported when using a default and officially supported AMQP backend
with RabbitMQ server. (new feature) #4541

Changed
~~~~~~~
Expand Down
20 changes: 16 additions & 4 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,26 @@ mask_secrets_blacklist = # comma separated list allowed here.
mask_secrets = True

[messaging]
# URL of the messaging server.
url = amqp://guest:guest@127.0.0.1:5672//
# How long should we wait between connection retries.
connection_retry_wait = 10000
# Certificate file used to identify the local connection (client).
ssl_certfile = None
# How many times should we retry connection before failing.
connection_retries = 10
# Use SSL / TLS to connect to the messaging server. Same as appending "?ssl=true" at the end of the connection URL string.
ssl = False
# URL of the messaging server.
url = amqp://guest:guest@127.0.0.1:5672//
# Specifies whether a certificate is required from the other side of the connection, and whether it will be validated if provided.
ssl_cert_reqs = None
# URL of all the nodes in a messaging service cluster.
cluster_urls = # comma separated list allowed here.
# How long should we wait between connection retries.
connection_retry_wait = 10000
# Private keyfile used to identify the local connection against RabbitMQ.
ssl_keyfile = None
# ca_certs file contains a set of concatenated CA certificates, which are used to validate certificates passed from RabbitMQ.
ssl_ca_certs = None
# Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).
login_method = None

[metrics]
# Randomly sample and only send metrics for X% of metric operations to the backend. Default value of 1 means no sampling is done and all the metrics are sent to the backend. E.g. 0.1 would mean 10% of operations are sampled.
Expand Down
9 changes: 8 additions & 1 deletion conf/st2.dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,14 @@ ssh_key_file = /home/vagrant/.ssh/stanley_rsa

[messaging]
url = amqp://guest:guest@127.0.0.1:5672/
#url = redis://localhost:6379/0
# Uncomment to test SSL options
#url = amqp://guest:guest@127.0.0.1:5671/
#ssl = True
#ssl_keyfile = /data/stanley/st2tests/st2tests/fixtures/ssl_certs/client/private_key.pem
#ssl_certfile = /data/stanley/st2tests/st2tests/fixtures/ssl_certs/client/client_certificate.pem
#ssl_ca_certs = /data/stanley/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.pem
#ssl_cert_reqs = required
#ssl_cert_reqs = required

[ssh_runner]
remote_dir = /tmp
Expand Down
11 changes: 11 additions & 0 deletions scripts/travis/rabbitmq.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[
{rabbit, [
{ssl_listeners, [5671]},
{ssl_allow_poodle_attack, true},
{ssl_options, [{cacertfile, "/home/travis/build/StackStorm/st2/st2tests/st2tests/fixtures/ssl_certs/ca/ca_certificate_bundle.pem"},
{certfile, "/home/travis/build/StackStorm/st2/st2tests/st2tests/fixtures/ssl_certs/server/server_certificate.pem"},
{keyfile, "/home/travis/build/StackStorm/st2/st2tests/st2tests/fixtures/ssl_certs/server/private_key.pem"},
{verify, verify_peer},
{fail_if_no_peer_cert, false}]}
]}
].
4 changes: 2 additions & 2 deletions st2actions/st2actions/notifier/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
# limitations under the License.

from __future__ import absolute_import

from datetime import datetime
import json

from kombu import Connection
from oslo_config import cfg

from st2common import log as logging
Expand Down Expand Up @@ -268,6 +268,6 @@ def _get_runner_ref(self, action_ref):


def get_notifier():
with Connection(transport_utils.get_messaging_urls()) as conn:
with transport_utils.get_connection() as conn:
return Notifier(conn, [NOTIFIER_ACTIONUPDATE_WORK_QUEUE],
trigger_dispatcher=TriggerDispatcher(LOG))
4 changes: 2 additions & 2 deletions st2actions/st2actions/resultstracker/resultstracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
# limitations under the License.

from __future__ import absolute_import

import eventlet
import six

from collections import defaultdict
from kombu import Connection

from st2common.query.base import QueryContext
from st2common import log as logging
Expand Down Expand Up @@ -111,5 +111,5 @@ def get_querier(self, query_module_name):


def get_tracker():
with Connection(transport_utils.get_messaging_urls()) as conn:
with transport_utils.get_connection() as conn:
return ResultsTracker(conn, [RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE])
3 changes: 1 addition & 2 deletions st2actions/st2actions/scheduler/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.

from __future__ import absolute_import
from kombu import Connection

from st2common import log as logging
from st2common.util import date
Expand Down Expand Up @@ -105,5 +104,5 @@ def _create_execution_queue_item_db_from_liveaction(self, liveaction, delay=None


def get_scheduler_entrypoint():
with Connection(transport_utils.get_messaging_urls()) as conn:
with transport_utils.get_connection() as conn:
return SchedulerEntrypoint(conn, [ACTIONSCHEDULER_REQUEST_QUEUE])
4 changes: 1 addition & 3 deletions st2actions/st2actions/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import sys
import traceback

from kombu import Connection

from st2actions.container.base import RunnerContainer
from st2common import log as logging
from st2common.constants import action as action_constants
Expand Down Expand Up @@ -250,5 +248,5 @@ def _resume_action(self, liveaction_db):


def get_worker():
with Connection(transport_utils.get_messaging_urls()) as conn:
with transport_utils.get_connection() as conn:
return ActionExecutionDispatcher(conn, ACTIONRUNNER_QUEUES)
4 changes: 1 addition & 3 deletions st2actions/tests/integration/test_action_state_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import mock

from kombu import Connection

from st2common.transport.queues import RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE
from st2actions.resultstracker.resultstracker import ResultsTracker
from st2common.models.db.executionstate import ActionExecutionStateDB
Expand Down Expand Up @@ -63,7 +61,7 @@ def setUpClass(cls):

@mock.patch.object(TestQuerier, 'query', mock.MagicMock(return_value=(False, {})))
def test_process_message(self):
with Connection(transport_utils.get_messaging_urls()) as conn:
with transport_utils.get_connection() as conn:
tracker = ResultsTracker(conn, [RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE])
tracker._bootstrap()
state = ActionStateConsumerTests.get_state(
Expand Down
23 changes: 22 additions & 1 deletion st2common/st2common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,28 @@ def register_opts(ignore_errors=False):
help='How many times should we retry connection before failing.'),
cfg.IntOpt(
'connection_retry_wait', default=10000,
help='How long should we wait between connection retries.')
help='How long should we wait between connection retries.'),
cfg.BoolOpt(
'ssl', default=False,
help='Use SSL / TLS to connect to the messaging server. Same as '
'appending "?ssl=true" at the end of the connection URL string.'),
cfg.StrOpt(
'ssl_keyfile', default=None,
help='Private keyfile used to identify the local connection against RabbitMQ.'),
cfg.StrOpt(
'ssl_certfile', default=None,
help='Certificate file used to identify the local connection (client).'),
cfg.StrOpt(
'ssl_cert_reqs', default=None, choices='none, optional, required',
help='Specifies whether a certificate is required from the other side of the '
'connection, and whether it will be validated if provided.'),
cfg.StrOpt(
'ssl_ca_certs', default=None,
help='ca_certs file contains a set of concatenated CA certificates, which are '
'used to validate certificates passed from RabbitMQ.'),
cfg.StrOpt(
'login_method', default=None,
help='Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).')
]

do_register_opts(messaging_opts, 'messaging', ignore_errors)
Expand Down
6 changes: 3 additions & 3 deletions st2common/st2common/models/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,11 @@ def _get_ssl_kwargs(ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_req
ssl_kwargs['ssl'] = True
ssl_kwargs['ssl_certfile'] = ssl_certfile
if ssl_cert_reqs:
if ssl_cert_reqs is 'none':
if ssl_cert_reqs == 'none':
ssl_cert_reqs = ssl_lib.CERT_NONE
elif ssl_cert_reqs is 'optional':
elif ssl_cert_reqs == 'optional':
ssl_cert_reqs = ssl_lib.CERT_OPTIONAL
elif ssl_cert_reqs is 'required':
elif ssl_cert_reqs == 'required':
ssl_cert_reqs = ssl_lib.CERT_REQUIRED
ssl_kwargs['ssl_cert_reqs'] = ssl_cert_reqs
if ssl_ca_certs:
Expand Down
7 changes: 2 additions & 5 deletions st2common/st2common/persistence/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from st2common.models.db.execution import ActionExecutionDB
from st2common.models.db.execution import ActionExecutionOutputDB
from st2common.persistence.base import Access
from st2common.transport import utils as transport_utils

__all__ = [
'ActionExecution',
Expand All @@ -38,8 +37,7 @@ def _get_impl(cls):
@classmethod
def _get_publisher(cls):
if not cls.publisher:
cls.publisher = transport.execution.ActionExecutionPublisher(
urls=transport_utils.get_messaging_urls())
cls.publisher = transport.execution.ActionExecutionPublisher()
return cls.publisher

@classmethod
Expand All @@ -57,8 +55,7 @@ def _get_impl(cls):
@classmethod
def _get_publisher(cls):
if not cls.publisher:
cls.publisher = transport.execution.ActionExecutionOutputPublisher(
urls=transport_utils.get_messaging_urls())
cls.publisher = transport.execution.ActionExecutionOutputPublisher()
return cls.publisher

@classmethod
Expand Down
9 changes: 6 additions & 3 deletions st2common/st2common/persistence/executionstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
# limitations under the License.

from __future__ import absolute_import

from st2common import transport
from st2common.models.db.executionstate import actionexecstate_access
from st2common.persistence import base as persistence
from st2common.transport import utils as transport_utils

__all__ = [
'ActionExecutionState'
]


class ActionExecutionState(persistence.Access):
Expand All @@ -31,6 +35,5 @@ def _get_impl(cls):
@classmethod
def _get_publisher(cls):
if not cls.publisher:
cls.publisher = transport.actionexecutionstate.ActionExecutionStatePublisher(
urls=transport_utils.get_messaging_urls())
cls.publisher = transport.actionexecutionstate.ActionExecutionStatePublisher()
return cls.publisher
9 changes: 6 additions & 3 deletions st2common/st2common/persistence/liveaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
# limitations under the License.

from __future__ import absolute_import

from st2common import transport
from st2common.models.db.liveaction import liveaction_access
from st2common.persistence import base as persistence
from st2common.transport import utils as transport_utils

__all__ = [
'LiveAction'
]


class LiveAction(persistence.StatusBasedResource):
Expand All @@ -31,8 +35,7 @@ def _get_impl(cls):
@classmethod
def _get_publisher(cls):
if not cls.publisher:
cls.publisher = transport.liveaction.LiveActionPublisher(
urls=transport_utils.get_messaging_urls())
cls.publisher = transport.liveaction.LiveActionPublisher()
return cls.publisher

@classmethod
Expand Down
9 changes: 6 additions & 3 deletions st2common/st2common/persistence/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
# limitations under the License.

from __future__ import absolute_import

from st2common import transport
from st2common.models.db.sensor import sensor_type_access
from st2common.persistence.base import ContentPackResource
from st2common.transport import utils as transport_utils

__all__ = [
'SensorType'
]


class SensorType(ContentPackResource):
Expand All @@ -31,6 +35,5 @@ def _get_impl(cls):
@classmethod
def _get_publisher(cls):
if not cls.publisher:
cls.publisher = transport.reactor.SensorCUDPublisher(
urls=transport_utils.get_messaging_urls())
cls.publisher = transport.reactor.SensorCUDPublisher()
return cls.publisher
Loading

0 comments on commit 09fc032

Please sign in to comment.