Extend decision engine to support threading mode

With the events of eventlet removal, Watcher will need
to be adapted to support both modes, eventlet and threading, for
a couple of releases before removing all eventlet code.
This patch adds methods and classes that allow decision engine
modules to create futurist thread pools instead of green thread pools,
based on a environment variable that can be enabled by service.
It moves continuous audit handler instance to decison engine service,
so it can be started together with the main decision engine service.
Adds an environment variable that allows the user to disable
eventlet monkey patching and to use oslo.service threading backend.

Change-Id: I8a8be0a7cebdc44005fd77ec960543828c7da318
Signed-off-by: Douglas Viroel <viroel@gmail.com>
This commit is contained in:
Douglas Viroel
2025-06-10 10:39:17 -03:00
parent 081cd5fae9
commit f879b10b05
16 changed files with 270 additions and 58 deletions

View File

@@ -16,12 +16,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import futurist
from oslo_config import cfg
from oslo_log import log
from watcher.decision_engine.audit import continuous as c_handler
from watcher.common import executor
from watcher.decision_engine.audit import event as e_handler
from watcher.decision_engine.audit import oneshot as o_handler
@@ -35,10 +33,10 @@ class AuditEndpoint(object):
def __init__(self, messaging):
self._messaging = messaging
self._executor = futurist.GreenThreadPoolExecutor(
max_workers=CONF.watcher_decision_engine.max_audit_workers)
self.amount_workers = CONF.watcher_decision_engine.max_audit_workers
self._executor = (
executor.get_futurist_pool_executor(self.amount_workers))
self._oneshot_handler = o_handler.OneShotAuditHandler()
self._continuous_handler = c_handler.ContinuousAuditHandler().start()
self._event_handler = e_handler.EventAuditHandler()
@property

View File

@@ -13,6 +13,7 @@
# under the License.
from watcher.common import service as watcher_service
from watcher.decision_engine.audit import continuous as c_handler
from watcher.decision_engine import manager
from watcher.decision_engine import scheduling
@@ -31,6 +32,7 @@ class DecisionEngineService(watcher_service.Service):
# task, an one shot task to cancel ongoing audits and a periodic
# check for expired action plans
self._bg_scheduler = None
self._continuous_handler = None
@property
def bg_scheduler(self):
@@ -38,10 +40,17 @@ class DecisionEngineService(watcher_service.Service):
self._bg_scheduler = scheduling.DecisionEngineSchedulingService()
return self._bg_scheduler
@property
def continuous_handler(self):
if self._continuous_handler is None:
self._continuous_handler = c_handler.ContinuousAuditHandler()
return self._continuous_handler
def start(self):
"""Start service."""
super().start()
self.bg_scheduler.start()
self.continuous_handler.start()
def stop(self):
"""Stop service."""

View File

@@ -17,13 +17,14 @@
# limitations under the License.
import copy
import futurist
from futurist import waiters
from oslo_config import cfg
from oslo_log import log
from oslo_service import service
from watcher.common import executor
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@@ -33,8 +34,8 @@ class DecisionEngineThreadPool(object, metaclass=service.Singleton):
def __init__(self):
self.amount_workers = CONF.watcher_decision_engine.max_general_workers
self._threadpool = futurist.GreenThreadPoolExecutor(
max_workers=self.amount_workers)
self._threadpool = (
executor.get_futurist_pool_executor(self.amount_workers))
def submit(self, fn, *args, **kwargs):
"""Will submit the job to the underlying threadpool