Skip to content

Commit

Permalink
Introduce Announcements
Browse files Browse the repository at this point in the history
  • Loading branch information
enykeev committed Nov 3, 2015
1 parent 1e878a7 commit eb463db
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 8 deletions.
10 changes: 10 additions & 0 deletions contrib/core/actions/announcement.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
description: Action that broadcasts the announcement to all stream consumers.
enabled: true
entry_point: ''
name: announcement
parameters:
message:
description: Message to broadcast.
type: string
runner_type: announcement
41 changes: 41 additions & 0 deletions st2actions/st2actions/runners/announcementrunner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid

from st2actions.runners import ActionRunner
from st2common import log as logging
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
from st2common.transport.announcement import AnnouncementDispatcher

LOG = logging.getLogger(__name__)


def get_runner():
return AnnouncementRunner(str(uuid.uuid4()))


class AnnouncementRunner(ActionRunner):
def __init__(self, runner_id):
super(AnnouncementRunner, self).__init__(runner_id=runner_id)
self._dispatcher = AnnouncementDispatcher(LOG)

def pre_run(self):
LOG.debug('Entering AnnouncementRunner.pre_run() for liveaction_id="%s"',
self.liveaction_id)

def run(self, action_parameters):
self._dispatcher.dispatch('general', payload=action_parameters)
return (LIVEACTION_STATUS_SUCCEEDED, {'OK': True}, None)
19 changes: 11 additions & 8 deletions st2api/st2api/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from st2common.models.api.action import LiveActionAPI
from st2common.models.api.execution import ActionExecutionAPI
from st2common.transport import liveaction, execution, publishers
from st2common.transport import announcement, liveaction, execution, publishers
from st2common.transport import utils as transport_utils
from st2common import log as logging

Expand All @@ -32,11 +32,6 @@

LOG = logging.getLogger(__name__)

QUEUE = Queue(None,
liveaction.LIVEACTION_XCHG,
routing_key=publishers.ANY_RK,
exclusive=True)

_listener = None


Expand All @@ -49,6 +44,11 @@ def __init__(self, connection):

def get_consumers(self, consumer, channel):
return [
consumer(queues=[announcement.get_queue(routing_key=publishers.ANY_RK,
exclusive=True)],
accept=['pickle'],
callbacks=[self.processor()]),

consumer(queues=[execution.get_queue(routing_key=publishers.ANY_RK,
exclusive=True)],
accept=['pickle'],
Expand All @@ -62,14 +62,17 @@ def get_consumers(self, consumer, channel):
callbacks=[self.processor(LiveActionAPI)])
]

def processor(self, model):
def processor(self, model=None):
def process(body, message):
from_model_kwargs = {'mask_secrets': cfg.CONF.api.mask_secrets}
meta = message.delivery_info
event_name = '%s__%s' % (meta.get('exchange'), meta.get('routing_key'))

try:
self.emit(event_name, model.from_model(body, **from_model_kwargs))
if model:
body = model.from_model(body, **from_model_kwargs)

self.emit(event_name, body)
finally:
message.ack()

Expand Down
9 changes: 9 additions & 0 deletions st2common/st2common/bootstrap/runnersregistrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@
},

# Experimental runners below
{
'name': 'announcement',
'aliases': [],
'description': 'A runner for emitting an announcement event on the stream.',
'enabled': True,
'runner_parameters': {
},
'runner_module': 'st2actions.runners.announcementrunner'
},
{
'name': 'windows-cmd',
'aliases': [],
Expand Down
74 changes: 74 additions & 0 deletions st2common/st2common/transport/announcement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kombu import Exchange, Queue

from st2common import log as logging
from st2common.constants.trace import TRACE_CONTEXT
from st2common.models.api.trace import TraceContext
from st2common.transport import publishers
from st2common.transport import utils as transport_utils

LOG = logging.getLogger(__name__)

# Exchange for Announcements
ANNOUNCEMENT_XCHG = Exchange('st2.announcement', type='topic')


class AnnouncementPublisher(object):
def __init__(self, urls):
self._publisher = publishers.PoolPublisher(urls=urls)

def publish(self, payload=None, routing_key=None):
self._publisher.publish(payload, ANNOUNCEMENT_XCHG, routing_key)


class AnnouncementDispatcher(object):
"""
This trigger dispatcher dispatches trigger instances to a message queue (RabbitMQ).
"""

def __init__(self, logger=LOG):
self._publisher = AnnouncementPublisher(urls=transport_utils.get_messaging_urls())
self._logger = logger

def dispatch(self, routing_key, payload=None, trace_context=None):
"""
Method which dispatches the announcement.
:param trigger: Full name / reference of the announcement.
:type trigger: ``str`` or ``object``
:param payload: Announcement payload.
:type payload: ``dict``
:param trace_context: Trace context to associate with Announcement.
:type trace_context: ``TraceContext``
"""
assert isinstance(payload, (type(None), dict))
assert isinstance(trace_context, (type(None), TraceContext))

payload = {
'payload': payload,
TRACE_CONTEXT: trace_context
}

self._logger.debug('Dispatching announcement (routing_key=%s,payload=%s)',
routing_key, payload)
self._publisher.publish(payload=payload, routing_key=routing_key)


def get_queue(name=None, routing_key=None, exclusive=False):
return Queue(name, ANNOUNCEMENT_XCHG, routing_key=routing_key, exclusive=exclusive)

0 comments on commit eb463db

Please sign in to comment.