diff --git a/watcher/applier/workflow_engine/base.py b/watcher/applier/workflow_engine/base.py index 3e0c60ff4..389ca262f 100644 --- a/watcher/applier/workflow_engine/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -90,6 +90,7 @@ class BaseWorkFlowEngine(loadable.Loadable): eager=True) db_action.state = state db_action.save() + return db_action @abc.abstractmethod def execute(self, actions): @@ -149,9 +150,9 @@ class BaseTaskFlowActionContainer(flow_task.Task): self.engine.context, self._db_action.action_plan_id) if action_plan.state in CANCEL_STATE: raise exception.ActionPlanCancelled(uuid=action_plan.uuid) - self.do_pre_execute() + db_action = self.do_pre_execute() notifications.action.send_execution_notification( - self.engine.context, self._db_action, + self.engine.context, db_action, fields.NotificationAction.EXECUTION, fields.NotificationPhase.START) except exception.ActionPlanCancelled as e: @@ -159,9 +160,10 @@ class BaseTaskFlowActionContainer(flow_task.Task): raise except Exception as e: LOG.exception(e) - self.engine.notify(self._db_action, objects.action.State.FAILED) + db_action = self.engine.notify(self._db_action, + objects.action.State.FAILED) notifications.action.send_execution_notification( - self.engine.context, self._db_action, + self.engine.context, db_action, fields.NotificationAction.EXECUTION, fields.NotificationPhase.ERROR, priority=fields.NotificationPriority.ERROR) @@ -169,19 +171,19 @@ class BaseTaskFlowActionContainer(flow_task.Task): def execute(self, *args, **kwargs): def _do_execute_action(*args, **kwargs): try: - self.do_execute(*args, **kwargs) + db_action = self.do_execute(*args, **kwargs) notifications.action.send_execution_notification( - self.engine.context, self._db_action, + self.engine.context, db_action, fields.NotificationAction.EXECUTION, fields.NotificationPhase.END) except Exception as e: LOG.exception(e) LOG.error('The workflow engine has failed' 'to execute the action: %s', self.name) - self.engine.notify(self._db_action, - objects.action.State.FAILED) + db_action = self.engine.notify(self._db_action, + objects.action.State.FAILED) notifications.action.send_execution_notification( - self.engine.context, self._db_action, + self.engine.context, db_action, fields.NotificationAction.EXECUTION, fields.NotificationPhase.ERROR, priority=fields.NotificationPriority.ERROR) @@ -227,9 +229,10 @@ class BaseTaskFlowActionContainer(flow_task.Task): self.do_post_execute() except Exception as e: LOG.exception(e) - self.engine.notify(self._db_action, objects.action.State.FAILED) + db_action = self.engine.notify(self._db_action, + objects.action.State.FAILED) notifications.action.send_execution_notification( - self.engine.context, self._db_action, + self.engine.context, db_action, fields.NotificationAction.EXECUTION, fields.NotificationPhase.ERROR, priority=fields.NotificationPriority.ERROR) diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index 4080de7bf..22630f607 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -111,9 +111,11 @@ class TaskFlowActionContainer(base.BaseTaskFlowActionContainer): super(TaskFlowActionContainer, self).__init__(name, db_action, engine) def do_pre_execute(self): - self.engine.notify(self._db_action, objects.action.State.ONGOING) + db_action = self.engine.notify(self._db_action, + objects.action.State.ONGOING) LOG.debug("Pre-condition action: %s", self.name) self.action.pre_condition() + return db_action def do_execute(self, *args, **kwargs): LOG.debug("Running action: %s", self.name) @@ -121,11 +123,11 @@ class TaskFlowActionContainer(base.BaseTaskFlowActionContainer): # NOTE: For result is False, set action state fail result = self.action.execute() if result is False: - self.engine.notify(self._db_action, - objects.action.State.FAILED) + return self.engine.notify(self._db_action, + objects.action.State.FAILED) else: - self.engine.notify(self._db_action, - objects.action.State.SUCCEEDED) + return self.engine.notify(self._db_action, + objects.action.State.SUCCEEDED) def do_post_execute(self): LOG.debug("Post-condition action: %s", self.name) @@ -146,14 +148,15 @@ class TaskFlowActionContainer(base.BaseTaskFlowActionContainer): result = self.action.abort() if result: # Aborted the action. - self.engine.notify(self._db_action, - objects.action.State.CANCELLED) + return self.engine.notify(self._db_action, + objects.action.State.CANCELLED) else: - self.engine.notify(self._db_action, - objects.action.State.SUCCEEDED) + return self.engine.notify(self._db_action, + objects.action.State.SUCCEEDED) except Exception as e: - self.engine.notify(self._db_action, objects.action.State.FAILED) LOG.exception(e) + return self.engine.notify(self._db_action, + objects.action.State.FAILED) class TaskFlowNop(flow_task.Task):