Use taskflow library for building and executing action plans
The aim of this patchset is to integrate taskflow in the Watcher Applier. Taskflow will help us a lot to make Action Plan execution easy, consistent, scalable and reliable. DocImpact Partially implements: blueprint use-taskflow Change-Id: I903d6509d74a61ad64e1506b8a7156e6e91abcfb Closes-Bug: #1535326 Closes-Bug: #1531912
This commit is contained in:
@@ -18,51 +18,51 @@
|
||||
#
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.applier.action_plan.base import BaseActionPlanHandler
|
||||
from watcher.applier.default import DefaultApplier
|
||||
from watcher.applier.messaging.events import Events
|
||||
from watcher.common.messaging.events.event import Event
|
||||
from watcher.objects.action_plan import ActionPlan
|
||||
from watcher.objects.action_plan import Status
|
||||
from watcher.applier.action_plan import base
|
||||
from watcher.applier import default
|
||||
from watcher.applier.messaging import event_types
|
||||
from watcher.common.messaging.events import event
|
||||
from watcher import objects
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultActionPlanHandler(BaseActionPlanHandler):
|
||||
def __init__(self, context, manager_applier, action_plan_uuid):
|
||||
class DefaultActionPlanHandler(base.BaseActionPlanHandler):
|
||||
def __init__(self, context, applier_manager, action_plan_uuid):
|
||||
super(DefaultActionPlanHandler, self).__init__()
|
||||
self.ctx = context
|
||||
self.action_plan_uuid = action_plan_uuid
|
||||
self.manager_applier = manager_applier
|
||||
self.applier_manager = applier_manager
|
||||
|
||||
def notify(self, uuid, event_type, state):
|
||||
action_plan = ActionPlan.get_by_uuid(self.ctx, uuid)
|
||||
action_plan = objects.ActionPlan.get_by_uuid(self.ctx, uuid)
|
||||
action_plan.state = state
|
||||
action_plan.save()
|
||||
event = Event()
|
||||
event.type = event_type
|
||||
event.data = {}
|
||||
ev = event.Event()
|
||||
ev.type = event_type
|
||||
ev.data = {}
|
||||
payload = {'action_plan__uuid': uuid,
|
||||
'action_plan_state': state}
|
||||
self.manager_applier.topic_status.publish_event(event.type.name,
|
||||
self.applier_manager.topic_status.publish_event(ev.type.name,
|
||||
payload)
|
||||
|
||||
def execute(self):
|
||||
try:
|
||||
# update state
|
||||
self.notify(self.action_plan_uuid,
|
||||
Events.LAUNCH_ACTION_PLAN,
|
||||
Status.ONGOING)
|
||||
applier = DefaultApplier(self.manager_applier, self.ctx)
|
||||
event_types.EventTypes.LAUNCH_ACTION_PLAN,
|
||||
objects.action_plan.Status.ONGOING)
|
||||
applier = default.DefaultApplier(self.applier_manager, self.ctx)
|
||||
result = applier.execute(self.action_plan_uuid)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
result = False
|
||||
LOG.error("Launch Action Plan " + unicode(e))
|
||||
finally:
|
||||
if result is True:
|
||||
status = Status.SUCCEEDED
|
||||
status = objects.action_plan.Status.SUCCEEDED
|
||||
else:
|
||||
status = Status.FAILED
|
||||
status = objects.action_plan.Status.FAILED
|
||||
# update state
|
||||
self.notify(self.action_plan_uuid, Events.LAUNCH_ACTION_PLAN,
|
||||
self.notify(self.action_plan_uuid,
|
||||
event_types.EventTypes.LAUNCH_ACTION_PLAN,
|
||||
status)
|
||||
|
||||
@@ -32,16 +32,15 @@ the appropriate commands to Nova for this type of
|
||||
"""
|
||||
|
||||
import abc
|
||||
import six
|
||||
|
||||
from watcher.applier import promise
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BasePrimitive(object):
|
||||
class BaseAction(object):
|
||||
def __init__(self):
|
||||
self._input_parameters = None
|
||||
self._applies_to = None
|
||||
self._input_parameters = {}
|
||||
self._applies_to = ""
|
||||
|
||||
@property
|
||||
def input_parameters(self):
|
||||
@@ -59,12 +58,18 @@ class BasePrimitive(object):
|
||||
def applies_to(self, a):
|
||||
self._applies_to = a
|
||||
|
||||
@promise.Promise
|
||||
@abc.abstractmethod
|
||||
def execute(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
@promise.Promise
|
||||
@abc.abstractmethod
|
||||
def undo(self):
|
||||
def revert(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def precondition(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def postcondition(self):
|
||||
raise NotImplementedError()
|
||||
@@ -19,30 +19,23 @@
|
||||
|
||||
|
||||
from watcher._i18n import _
|
||||
from watcher.applier.primitives import base
|
||||
from watcher.applier import promise
|
||||
from watcher.applier.actions import base
|
||||
from watcher.common import exception
|
||||
from watcher.common import keystone as kclient
|
||||
from watcher.common import nova as nclient
|
||||
from watcher.decision_engine.model import hypervisor_state as hstate
|
||||
|
||||
|
||||
class ChangeNovaServiceState(base.BasePrimitive):
|
||||
def __init__(self):
|
||||
"""This class allows us to change the state of nova-compute service."""
|
||||
super(ChangeNovaServiceState, self).__init__()
|
||||
self._host = self.applies_to
|
||||
self._state = self.input_parameters.get('state')
|
||||
class ChangeNovaServiceState(base.BaseAction):
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
return self._host
|
||||
return self.applies_to
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
return self._state
|
||||
return self.input_parameters.get('state')
|
||||
|
||||
@promise.Promise
|
||||
def execute(self):
|
||||
target_state = None
|
||||
if self.state == hstate.HypervisorState.OFFLINE.value:
|
||||
@@ -51,8 +44,7 @@ class ChangeNovaServiceState(base.BasePrimitive):
|
||||
target_state = True
|
||||
return self.nova_manage_service(target_state)
|
||||
|
||||
@promise.Promise
|
||||
def undo(self):
|
||||
def revert(self):
|
||||
target_state = None
|
||||
if self.state == hstate.HypervisorState.OFFLINE.value:
|
||||
target_state = True
|
||||
@@ -72,3 +64,9 @@ class ChangeNovaServiceState(base.BasePrimitive):
|
||||
return wrapper.enable_service_nova_compute(self.host)
|
||||
else:
|
||||
return wrapper.disable_service_nova_compute(self.host)
|
||||
|
||||
def precondition(self):
|
||||
pass
|
||||
|
||||
def postcondition(self):
|
||||
pass
|
||||
@@ -19,7 +19,7 @@ from __future__ import unicode_literals
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.applier.primitives.loading import default
|
||||
from watcher.applier.actions.loading import default
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@@ -19,11 +19,11 @@ from __future__ import unicode_literals
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.common.loader.default import DefaultLoader
|
||||
from watcher.common.loader import default
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultActionLoader(DefaultLoader):
|
||||
class DefaultActionLoader(default.DefaultLoader):
|
||||
def __init__(self):
|
||||
super(DefaultActionLoader, self).__init__(namespace='watcher_actions')
|
||||
@@ -17,27 +17,42 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.applier.primitives import base
|
||||
from watcher.applier import promise
|
||||
from watcher.applier.actions import base
|
||||
from watcher.common import exception
|
||||
from watcher.common import keystone as kclient
|
||||
from watcher.common import nova as nclient
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
class Migrate(base.BasePrimitive):
|
||||
def __init__(self):
|
||||
super(Migrate, self).__init__()
|
||||
self.instance_uuid = self.applies_to
|
||||
self.migration_type = self.input_parameters.get('migration_type')
|
||||
|
||||
class Migrate(base.BaseAction):
|
||||
@property
|
||||
def instance_uuid(self):
|
||||
return self.applies_to
|
||||
|
||||
@property
|
||||
def migration_type(self):
|
||||
return self.input_parameters.get('migration_type')
|
||||
|
||||
@property
|
||||
def dst_hypervisor(self):
|
||||
return self.input_parameters.get('dst_hypervisor')
|
||||
|
||||
@property
|
||||
def src_hypervisor(self):
|
||||
return self.input_parameters.get('src_hypervisor')
|
||||
|
||||
def migrate(self, destination):
|
||||
keystone = kclient.KeystoneClient()
|
||||
wrapper = nclient.NovaClient(keystone.get_credentials(),
|
||||
session=keystone.get_session())
|
||||
LOG.debug("Migrate instance %s to %s ", self.instance_uuid,
|
||||
destination)
|
||||
instance = wrapper.find_instance(self.instance_uuid)
|
||||
if instance:
|
||||
if self.migration_type is 'live':
|
||||
if self.migration_type == 'live':
|
||||
return wrapper.live_migrate_instance(
|
||||
instance_id=self.instance_uuid, dest_hostname=destination)
|
||||
else:
|
||||
@@ -45,10 +60,17 @@ class Migrate(base.BasePrimitive):
|
||||
else:
|
||||
raise exception.InstanceNotFound(name=self.instance_uuid)
|
||||
|
||||
@promise.Promise
|
||||
def execute(self):
|
||||
return self.migrate(self.input_parameters.get('dst_hypervisor_uuid'))
|
||||
return self.migrate(destination=self.dst_hypervisor)
|
||||
|
||||
@promise.Promise
|
||||
def undo(self):
|
||||
return self.migrate(self.input_parameters.get('src_hypervisor_uuid'))
|
||||
def revert(self):
|
||||
return self.migrate(destination=self.src_hypervisor)
|
||||
|
||||
def precondition(self):
|
||||
# todo(jed) check if the instance exist/ check if the instance is on
|
||||
# the src_hypervisor
|
||||
pass
|
||||
|
||||
def postcondition(self):
|
||||
# todo(jed) we can image to check extra parameters (nework reponse,ect)
|
||||
pass
|
||||
@@ -19,23 +19,28 @@
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
|
||||
from watcher.applier.primitives import base
|
||||
from watcher.applier import promise
|
||||
from watcher.applier.actions import base
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Nop(base.BasePrimitive):
|
||||
class Nop(base.BaseAction):
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
return self.input_parameters.get('message')
|
||||
|
||||
@promise.Promise
|
||||
def execute(self):
|
||||
LOG.debug("executing action NOP message:%s ",
|
||||
self.input_parameters.get('message'))
|
||||
LOG.debug("executing action NOP message:%s ", self.message)
|
||||
return True
|
||||
|
||||
@promise.Promise
|
||||
def undo(self):
|
||||
LOG.debug("undo action NOP")
|
||||
def revert(self):
|
||||
LOG.debug("revert action NOP")
|
||||
return True
|
||||
|
||||
def precondition(self):
|
||||
pass
|
||||
|
||||
def postcondition(self):
|
||||
pass
|
||||
48
watcher/applier/actions/sleep.py
Normal file
48
watcher/applier/actions/sleep.py
Normal file
@@ -0,0 +1,48 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import time
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.applier.actions import base
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Sleep(base.BaseAction):
|
||||
|
||||
@property
|
||||
def duration(self):
|
||||
return int(self.input_parameters.get('duration'))
|
||||
|
||||
def execute(self):
|
||||
LOG.debug("Starting action Sleep duration:%s ", self.duration)
|
||||
time.sleep(self.duration)
|
||||
return True
|
||||
|
||||
def revert(self):
|
||||
LOG.debug("revert action Sleep")
|
||||
return True
|
||||
|
||||
def precondition(self):
|
||||
pass
|
||||
|
||||
def postcondition(self):
|
||||
pass
|
||||
@@ -16,24 +16,48 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.applier import base
|
||||
from watcher.applier.execution import default
|
||||
from watcher.applier.workflow_engine.loading import default
|
||||
from watcher import objects
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class DefaultApplier(base.BaseApplier):
|
||||
def __init__(self, manager_applier, context):
|
||||
def __init__(self, applier_manager, context):
|
||||
super(DefaultApplier, self).__init__()
|
||||
self.manager_applier = manager_applier
|
||||
self.context = context
|
||||
self.executor = default.DefaultActionPlanExecutor(manager_applier,
|
||||
context)
|
||||
self._applier_manager = applier_manager
|
||||
self._loader = default.DefaultWorkFlowEngineLoader()
|
||||
self._engine = None
|
||||
self._context = context
|
||||
|
||||
@property
|
||||
def context(self):
|
||||
return self._context
|
||||
|
||||
@property
|
||||
def applier_manager(self):
|
||||
return self._applier_manager
|
||||
|
||||
@property
|
||||
def engine(self):
|
||||
if self._engine is None:
|
||||
selected_workflow_engine = CONF.watcher_applier.workflow_engine
|
||||
LOG.debug("Loading workflow engine %s ", selected_workflow_engine)
|
||||
self._engine = self._loader.load(name=selected_workflow_engine)
|
||||
self._engine.context = self.context
|
||||
self._engine.applier_manager = self.applier_manager
|
||||
return self._engine
|
||||
|
||||
def execute(self, action_plan_uuid):
|
||||
LOG.debug("Executing action plan %s ", action_plan_uuid)
|
||||
action_plan = objects.ActionPlan.get_by_uuid(self.context,
|
||||
action_plan_uuid)
|
||||
# todo(jed) remove direct access to dbapi need filter in object
|
||||
filters = {'action_plan_id': action_plan.id}
|
||||
actions = objects.Action.dbapi.get_action_list(self.context, filters)
|
||||
return self.executor.execute(actions)
|
||||
return self.engine.execute(actions)
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from oslo_log import log
|
||||
|
||||
from watcher._i18n import _LE
|
||||
from watcher.applier.execution import base
|
||||
from watcher.applier.execution import deploy_phase
|
||||
from watcher.objects import action_plan
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultActionPlanExecutor(base.BaseActionPlanExecutor):
|
||||
def __init__(self, manager_applier, context):
|
||||
super(DefaultActionPlanExecutor, self).__init__(manager_applier,
|
||||
context)
|
||||
self.deploy = deploy_phase.DeployPhase(self)
|
||||
|
||||
def execute(self, actions):
|
||||
for action in actions:
|
||||
try:
|
||||
self.notify(action, action_plan.Status.ONGOING)
|
||||
loaded_action = self.action_factory.make_action(action)
|
||||
result = self.deploy.execute_primitive(loaded_action)
|
||||
if result is False:
|
||||
self.notify(action, action_plan.Status.FAILED)
|
||||
self.deploy.rollback()
|
||||
return False
|
||||
else:
|
||||
self.deploy.populate(loaded_action)
|
||||
self.notify(action, action_plan.Status.SUCCEEDED)
|
||||
except Exception as e:
|
||||
LOG.expection(e)
|
||||
LOG.debug('The ActionPlanExecutor failed to execute the action'
|
||||
' %s ', action)
|
||||
|
||||
LOG.error(_LE("Trigger a rollback"))
|
||||
self.notify(action, action_plan.Status.FAILED)
|
||||
self.deploy.rollback()
|
||||
return False
|
||||
return True
|
||||
@@ -1,56 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from oslo_log import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DeployPhase(object):
|
||||
def __init__(self, executor):
|
||||
# todo(jed) oslo_conf 10 secondes
|
||||
self._max_timeout = 100000
|
||||
self._actions = []
|
||||
self._executor = executor
|
||||
|
||||
@property
|
||||
def actions(self):
|
||||
return self._actions
|
||||
|
||||
@property
|
||||
def max_timeout(self):
|
||||
return self._max_timeout
|
||||
|
||||
@max_timeout.setter
|
||||
def max_timeout(self, m):
|
||||
self._max_timeout = m
|
||||
|
||||
def populate(self, action):
|
||||
self._actions.append(action)
|
||||
|
||||
def execute_primitive(self, primitive):
|
||||
future = primitive.execute(primitive)
|
||||
return future.result(self.max_timeout)
|
||||
|
||||
def rollback(self):
|
||||
reverted = sorted(self.actions, reverse=True)
|
||||
for primitive in reverted:
|
||||
try:
|
||||
self.execute_primitive(primitive)
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
@@ -16,20 +16,24 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.applier.messaging.trigger import TriggerActionPlan
|
||||
from watcher.common.messaging.messaging_core import MessagingCore
|
||||
from watcher.applier.messaging import trigger
|
||||
from watcher.common.messaging import messaging_core
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
# Register options
|
||||
APPLIER_MANAGER_OPTS = [
|
||||
cfg.IntOpt('applier_worker', default='1', help='The number of worker'),
|
||||
cfg.IntOpt('workers',
|
||||
default='1',
|
||||
min=1,
|
||||
required=True,
|
||||
help='Number of workers for applier, default value is 1.'),
|
||||
cfg.StrOpt('topic_control',
|
||||
default='watcher.applier.control',
|
||||
help='The topic name used for'
|
||||
@@ -45,7 +49,11 @@ APPLIER_MANAGER_OPTS = [
|
||||
cfg.StrOpt('publisher_id',
|
||||
default='watcher.applier.api',
|
||||
help='The identifier used by watcher '
|
||||
'module on the message broker')
|
||||
'module on the message broker'),
|
||||
cfg.StrOpt('workflow_engine',
|
||||
default='taskflow',
|
||||
required=True,
|
||||
help='Select the engine to use to execute the workflow')
|
||||
]
|
||||
|
||||
opt_group = cfg.OptGroup(name='watcher_applier',
|
||||
@@ -55,7 +63,7 @@ CONF.register_group(opt_group)
|
||||
CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group)
|
||||
|
||||
|
||||
class ApplierManager(MessagingCore):
|
||||
class ApplierManager(messaging_core.MessagingCore):
|
||||
def __init__(self):
|
||||
super(ApplierManager, self).__init__(
|
||||
CONF.watcher_applier.publisher_id,
|
||||
@@ -63,10 +71,7 @@ class ApplierManager(MessagingCore):
|
||||
CONF.watcher_applier.topic_status,
|
||||
api_version=self.API_VERSION,
|
||||
)
|
||||
# shared executor of the workflow
|
||||
self.executor = ThreadPoolExecutor(max_workers=1)
|
||||
# trigger action_plan
|
||||
self.topic_control.add_endpoint(TriggerActionPlan(self))
|
||||
self.topic_control.add_endpoint(trigger.TriggerActionPlan(self))
|
||||
|
||||
def join(self):
|
||||
self.topic_control.join()
|
||||
|
||||
@@ -20,6 +20,6 @@
|
||||
import enum
|
||||
|
||||
|
||||
class Events(enum.Enum):
|
||||
class EventTypes(enum.Enum):
|
||||
LAUNCH_ACTION_PLAN = "launch_action_plan"
|
||||
LAUNCH_ACTION = "launch_action"
|
||||
@@ -16,30 +16,35 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from concurrent import futures
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.applier.action_plan.default import DefaultActionPlanHandler
|
||||
from watcher.applier.action_plan import default
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class TriggerActionPlan(object):
|
||||
def __init__(self, manager_applier):
|
||||
self.manager_applier = manager_applier
|
||||
def __init__(self, applier_manager):
|
||||
self.applier_manager = applier_manager
|
||||
workers = CONF.watcher_applier.workers
|
||||
self.executor = futures.ThreadPoolExecutor(max_workers=workers)
|
||||
|
||||
def do_launch_action_plan(self, context, action_plan_uuid):
|
||||
try:
|
||||
cmd = DefaultActionPlanHandler(context,
|
||||
self.manager_applier,
|
||||
action_plan_uuid)
|
||||
cmd = default.DefaultActionPlanHandler(context,
|
||||
self.applier_manager,
|
||||
action_plan_uuid)
|
||||
cmd.execute()
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
|
||||
def launch_action_plan(self, context, action_plan_uuid):
|
||||
LOG.debug("Trigger ActionPlan %s" % action_plan_uuid)
|
||||
LOG.debug("Trigger ActionPlan %s", action_plan_uuid)
|
||||
# submit
|
||||
self.manager_applier.executor.submit(self.do_launch_action_plan,
|
||||
context,
|
||||
action_plan_uuid)
|
||||
self.executor.submit(self.do_launch_action_plan, context,
|
||||
action_plan_uuid)
|
||||
return action_plan_uuid
|
||||
|
||||
@@ -1,50 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from concurrent.futures import Future
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
|
||||
class Promise(object):
|
||||
executor = ThreadPoolExecutor(
|
||||
max_workers=10)
|
||||
|
||||
def __init__(self, func):
|
||||
self.func = func
|
||||
|
||||
def resolve(self, *args, **kwargs):
|
||||
resolved_args = []
|
||||
resolved_kwargs = {}
|
||||
|
||||
for i, arg in enumerate(args):
|
||||
if isinstance(arg, Future):
|
||||
resolved_args.append(arg.result())
|
||||
else:
|
||||
resolved_args.append(arg)
|
||||
|
||||
for kw, arg in kwargs.items():
|
||||
if isinstance(arg, Future):
|
||||
resolved_kwargs[kw] = arg.result()
|
||||
else:
|
||||
resolved_kwargs[kw] = arg
|
||||
|
||||
return self.func(*resolved_args, **resolved_kwargs)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.executor.submit(self.resolve, *args, **kwargs)
|
||||
@@ -23,8 +23,8 @@ import oslo_messaging as om
|
||||
from watcher.applier.manager import APPLIER_MANAGER_OPTS
|
||||
from watcher.applier.manager import opt_group
|
||||
from watcher.common import exception
|
||||
from watcher.common.messaging.messaging_core import MessagingCore
|
||||
from watcher.common.messaging.notification_handler import NotificationHandler
|
||||
from watcher.common.messaging import messaging_core
|
||||
from watcher.common.messaging import notification_handler as notification
|
||||
from watcher.common import utils
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ CONF.register_group(opt_group)
|
||||
CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group)
|
||||
|
||||
|
||||
class ApplierAPI(MessagingCore):
|
||||
class ApplierAPI(messaging_core.MessagingCore):
|
||||
|
||||
def __init__(self):
|
||||
super(ApplierAPI, self).__init__(
|
||||
@@ -43,7 +43,7 @@ class ApplierAPI(MessagingCore):
|
||||
CONF.watcher_applier.topic_status,
|
||||
api_version=self.API_VERSION,
|
||||
)
|
||||
self.handler = NotificationHandler(self.publisher_id)
|
||||
self.handler = notification.NotificationHandler(self.publisher_id)
|
||||
self.handler.register_observer(self)
|
||||
self.topic_status.add_endpoint(self.handler)
|
||||
transport = om.get_transport(CONF)
|
||||
|
||||
@@ -20,26 +20,34 @@ import abc
|
||||
|
||||
import six
|
||||
|
||||
from watcher.applier.messaging import events
|
||||
from watcher.applier.primitives import factory
|
||||
from watcher.applier.actions import factory
|
||||
from watcher.applier.messaging import event_types
|
||||
from watcher.common.messaging.events import event
|
||||
from watcher import objects
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseActionPlanExecutor(object):
|
||||
def __init__(self, manager_applier, context):
|
||||
self._manager_applier = manager_applier
|
||||
self._context = context
|
||||
class BaseWorkFlowEngine(object):
|
||||
def __init__(self):
|
||||
self._applier_manager = None
|
||||
self._context = None
|
||||
self._action_factory = factory.ActionFactory()
|
||||
|
||||
@property
|
||||
def context(self):
|
||||
return self._context
|
||||
|
||||
@context.setter
|
||||
def context(self, c):
|
||||
self._context = c
|
||||
|
||||
@property
|
||||
def manager_applier(self):
|
||||
return self._manager_applier
|
||||
def applier_manager(self):
|
||||
return self._applier_manager
|
||||
|
||||
@applier_manager.setter
|
||||
def applier_manager(self, a):
|
||||
self._applier_manager = a
|
||||
|
||||
@property
|
||||
def action_factory(self):
|
||||
@@ -50,11 +58,11 @@ class BaseActionPlanExecutor(object):
|
||||
db_action.state = state
|
||||
db_action.save()
|
||||
ev = event.Event()
|
||||
ev.type = events.Events.LAUNCH_ACTION
|
||||
ev.type = event_types.EventTypes.LAUNCH_ACTION
|
||||
ev.data = {}
|
||||
payload = {'action_uuid': action.uuid,
|
||||
'action_state': state}
|
||||
self.manager_applier.topic_status.publish_event(ev.type.name,
|
||||
self.applier_manager.topic_status.publish_event(ev.type.name,
|
||||
payload)
|
||||
|
||||
@abc.abstractmethod
|
||||
159
watcher/applier/workflow_engine/default.py
Normal file
159
watcher/applier/workflow_engine/default.py
Normal file
@@ -0,0 +1,159 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2016 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from oslo_log import log
|
||||
from taskflow import engines
|
||||
from taskflow.patterns import graph_flow as gf
|
||||
from taskflow import task
|
||||
|
||||
from watcher._i18n import _LE, _LW, _LC
|
||||
from watcher.applier.workflow_engine import base
|
||||
from watcher.objects import action as obj_action
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
|
||||
def decider(self, history):
|
||||
# FIXME(jed) not possible with the current Watcher Planner
|
||||
#
|
||||
# decider – A callback function that will be expected to
|
||||
# decide at runtime whether v should be allowed to execute
|
||||
# (or whether the execution of v should be ignored,
|
||||
# and therefore not executed). It is expected to take as single
|
||||
# keyword argument history which will be the execution results of
|
||||
# all u decideable links that have v as a target. It is expected
|
||||
# to return a single boolean
|
||||
# (True to allow v execution or False to not).
|
||||
return True
|
||||
|
||||
def execute(self, actions):
|
||||
try:
|
||||
# NOTE(jed) We want to have a strong separation of concern
|
||||
# between the Watcher planner and the Watcher Applier in order
|
||||
# to us the possibility to support several workflow engine.
|
||||
# We want to provide the 'taskflow' engine by
|
||||
# default although we still want to leave the possibility for
|
||||
# the users to change it.
|
||||
# todo(jed) we need to change the way the actions are stored.
|
||||
# The current implementation only use a linked list of actions.
|
||||
# todo(jed) add olso conf for retry and name
|
||||
flow = gf.Flow("watcher_flow")
|
||||
previous = None
|
||||
for a in actions:
|
||||
task = TaskFlowActionContainer(a, self)
|
||||
flow.add(task)
|
||||
if previous is None:
|
||||
previous = task
|
||||
# we have only one Action in the Action Plan
|
||||
if len(actions) == 1:
|
||||
nop = TaskFlowNop()
|
||||
flow.add(nop)
|
||||
flow.link(previous, nop)
|
||||
else:
|
||||
# decider == guard (UML)
|
||||
flow.link(previous, task, decider=self.decider)
|
||||
previous = task
|
||||
|
||||
e = engines.load(flow)
|
||||
e.run()
|
||||
return True
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
return False
|
||||
|
||||
|
||||
class TaskFlowActionContainer(task.Task):
|
||||
def __init__(self, db_action, engine):
|
||||
name = "action_type:{0} uuid:{1}".format(db_action.action_type,
|
||||
db_action.uuid)
|
||||
super(TaskFlowActionContainer, self).__init__(name=name)
|
||||
self._db_action = db_action
|
||||
self._engine = engine
|
||||
self.loaded_action = None
|
||||
|
||||
@property
|
||||
def action(self):
|
||||
if self.loaded_action is None:
|
||||
action = self.engine.action_factory.make_action(self._db_action)
|
||||
self.loaded_action = action
|
||||
return self.loaded_action
|
||||
|
||||
@property
|
||||
def engine(self):
|
||||
return self._engine
|
||||
|
||||
def pre_execute(self):
|
||||
try:
|
||||
self.engine.notify(self._db_action,
|
||||
obj_action.Status.ONGOING)
|
||||
LOG.debug("Precondition action %s", self.name)
|
||||
self.action.precondition()
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
self.engine.notify(self._db_action,
|
||||
obj_action.Status.FAILED)
|
||||
raise
|
||||
|
||||
def execute(self, *args, **kwargs):
|
||||
try:
|
||||
LOG.debug("Running action %s", self.name)
|
||||
|
||||
# todo(jed) remove return (true or false) raise an Exception
|
||||
result = self.action.execute()
|
||||
if result is not True:
|
||||
self.engine.notify(self._db_action,
|
||||
obj_action.Status.FAILED)
|
||||
else:
|
||||
self.engine.notify(self._db_action,
|
||||
obj_action.Status.SUCCEEDED)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
LOG.error(_LE('The WorkFlow Engine has failed '
|
||||
'to execute the action %s'), self.name)
|
||||
|
||||
self.engine.notify(self._db_action,
|
||||
obj_action.Status.FAILED)
|
||||
raise
|
||||
|
||||
def post_execute(self):
|
||||
try:
|
||||
LOG.debug("postcondition action %s", self.name)
|
||||
self.action.postcondition()
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
self.engine.notify(self._db_action,
|
||||
obj_action.Status.FAILED)
|
||||
raise
|
||||
|
||||
def revert(self, *args, **kwargs):
|
||||
LOG.warning(_LW("Revert action %s"), self.name)
|
||||
try:
|
||||
# todo(jed) do we need to update the states in case of failure ?
|
||||
self.action.revert()
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
LOG.critical(_LC("Oops! We need disaster recover plan"))
|
||||
|
||||
|
||||
class TaskFlowNop(task.Task):
|
||||
"""This class is use in case of the workflow have only one Action.
|
||||
|
||||
We need at least two atoms to create a link
|
||||
"""
|
||||
def execute(self):
|
||||
pass
|
||||
0
watcher/applier/workflow_engine/loading/__init__.py
Normal file
0
watcher/applier/workflow_engine/loading/__init__.py
Normal file
30
watcher/applier/workflow_engine/loading/default.py
Normal file
30
watcher/applier/workflow_engine/loading/default.py
Normal file
@@ -0,0 +1,30 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.common.loader import default
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultWorkFlowEngineLoader(default.DefaultLoader):
|
||||
def __init__(self):
|
||||
super(DefaultWorkFlowEngineLoader, self).__init__(
|
||||
namespace='watcher_workflow_engines')
|
||||
Reference in New Issue
Block a user