Add policies for API access control to watcher project.
Change-Id: Ibdbe494c636dfaeca9cf2ef8724d0dade1f19c7f blueprint: watcher-policies
This commit is contained in:
@@ -21,14 +21,17 @@
|
||||
# https://bugs.launchpad.net/watcher/+bug/1255115.
|
||||
|
||||
# NOTE(deva): import auth_token so we can override a config option
|
||||
from keystonemiddleware import auth_token # noqa
|
||||
# import mock
|
||||
|
||||
import copy
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
import pecan
|
||||
import pecan.testing
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
from watcher.api import hooks
|
||||
from watcher.common import context as watcher_context
|
||||
from watcher.tests.db import base
|
||||
|
||||
PATH_PREFIX = '/v1'
|
||||
@@ -243,3 +246,41 @@ class FunctionalTest(base.DbTestCase):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class AdminRoleTest(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(AdminRoleTest, self).setUp()
|
||||
token_info = {
|
||||
'token': {
|
||||
'project': {
|
||||
'id': 'admin'
|
||||
},
|
||||
'user': {
|
||||
'id': 'admin'
|
||||
}
|
||||
}
|
||||
}
|
||||
self.context = watcher_context.RequestContext(
|
||||
auth_token_info=token_info,
|
||||
project_id='admin',
|
||||
user_id='admin')
|
||||
|
||||
def make_context(*args, **kwargs):
|
||||
# If context hasn't been constructed with token_info
|
||||
if not kwargs.get('auth_token_info'):
|
||||
kwargs['auth_token_info'] = copy.deepcopy(token_info)
|
||||
if not kwargs.get('project_id'):
|
||||
kwargs['project_id'] = 'admin'
|
||||
if not kwargs.get('user_id'):
|
||||
kwargs['user_id'] = 'admin'
|
||||
if not kwargs.get('roles'):
|
||||
kwargs['roles'] = ['admin']
|
||||
|
||||
context = watcher_context.RequestContext(*args, **kwargs)
|
||||
return watcher_context.RequestContext.from_dict(context.to_dict())
|
||||
|
||||
p = mock.patch.object(watcher_context, 'make_context',
|
||||
side_effect=make_context)
|
||||
self.mock_make_context = p.start()
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
@@ -11,8 +11,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
import json
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
from wsme import types as wtypes
|
||||
|
||||
@@ -482,3 +483,49 @@ class TestDelete(api_base.FunctionalTest):
|
||||
self.assertEqual(403, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(response.json['error_message'])
|
||||
|
||||
|
||||
class TestActionPolicyEnforcement(api_base.FunctionalTest):
|
||||
|
||||
def _common_policy_check(self, rule, func, *arg, **kwarg):
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
rule: "rule:defaut"})
|
||||
response = func(*arg, **kwarg)
|
||||
self.assertEqual(403, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(
|
||||
"Policy doesn't allow %s to be performed." % rule,
|
||||
json.loads(response.json['error_message'])['faultstring'])
|
||||
|
||||
def test_policy_disallow_get_all(self):
|
||||
self._common_policy_check(
|
||||
"action:get_all", self.get_json, '/actions',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_get_one(self):
|
||||
action = obj_utils.create_test_action(self.context)
|
||||
self._common_policy_check(
|
||||
"action:get", self.get_json,
|
||||
'/actions/%s' % action.uuid,
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_detail(self):
|
||||
self._common_policy_check(
|
||||
"action:detail", self.get_json,
|
||||
'/actions/detail',
|
||||
expect_errors=True)
|
||||
|
||||
|
||||
class TestActionPolicyEnforcementWithAdminContext(TestListAction,
|
||||
api_base.AdminRoleTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestActionPolicyEnforcementWithAdminContext, self).setUp()
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
"action:detail": "rule:default",
|
||||
"action:get": "rule:default",
|
||||
"action:get_all": "rule:default"})
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
import datetime
|
||||
import itertools
|
||||
import json
|
||||
import mock
|
||||
import pecan
|
||||
|
||||
@@ -575,3 +576,65 @@ class TestPatchStateTransitionOk(api_base.FunctionalTest):
|
||||
self.assertEqual(self.new_state, updated_ap['state'])
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertEqual(200, response.status_code)
|
||||
|
||||
|
||||
class TestActionPlanPolicyEnforcement(api_base.FunctionalTest):
|
||||
|
||||
def _common_policy_check(self, rule, func, *arg, **kwarg):
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
rule: "rule:defaut"})
|
||||
response = func(*arg, **kwarg)
|
||||
self.assertEqual(403, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(
|
||||
"Policy doesn't allow %s to be performed." % rule,
|
||||
json.loads(response.json['error_message'])['faultstring'])
|
||||
|
||||
def test_policy_disallow_get_all(self):
|
||||
self._common_policy_check(
|
||||
"action_plan:get_all", self.get_json, '/action_plans',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_get_one(self):
|
||||
action_plan = obj_utils.create_test_action_plan(self.context)
|
||||
self._common_policy_check(
|
||||
"action_plan:get", self.get_json,
|
||||
'/action_plans/%s' % action_plan.uuid,
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_detail(self):
|
||||
self._common_policy_check(
|
||||
"action_plan:detail", self.get_json,
|
||||
'/action_plans/detail',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_update(self):
|
||||
action_plan = obj_utils.create_test_action_plan(self.context)
|
||||
self._common_policy_check(
|
||||
"action_plan:update", self.patch_json,
|
||||
'/action_plans/%s' % action_plan.uuid,
|
||||
[{'path': '/state', 'value': 'DELETED', 'op': 'replace'}],
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_delete(self):
|
||||
action_plan = obj_utils.create_test_action_plan(self.context)
|
||||
self._common_policy_check(
|
||||
"action_plan:delete", self.delete,
|
||||
'/action_plans/%s' % action_plan.uuid, expect_errors=True)
|
||||
|
||||
|
||||
class TestActionPlanPolicyEnforcementWithAdminContext(TestListActionPlan,
|
||||
api_base.AdminRoleTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestActionPlanPolicyEnforcementWithAdminContext, self).setUp()
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
"action_plan:delete": "rule:default",
|
||||
"action_plan:detail": "rule:default",
|
||||
"action_plan:get": "rule:default",
|
||||
"action_plan:get_all": "rule:default",
|
||||
"action_plan:update": "rule:default"})
|
||||
|
||||
@@ -12,8 +12,9 @@
|
||||
|
||||
import datetime
|
||||
import itertools
|
||||
|
||||
import json
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
from six.moves.urllib import parse as urlparse
|
||||
@@ -654,3 +655,81 @@ class TestDelete(api_base.FunctionalTest):
|
||||
self.assertEqual(404, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(response.json['error_message'])
|
||||
|
||||
|
||||
class TestAuaditTemplatePolicyEnforcement(api_base.FunctionalTest):
|
||||
|
||||
def _common_policy_check(self, rule, func, *arg, **kwarg):
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
rule: "rule:defaut"})
|
||||
response = func(*arg, **kwarg)
|
||||
self.assertEqual(403, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(
|
||||
"Policy doesn't allow %s to be performed." % rule,
|
||||
json.loads(response.json['error_message'])['faultstring'])
|
||||
|
||||
def test_policy_disallow_get_all(self):
|
||||
self._common_policy_check(
|
||||
"audit_template:get_all", self.get_json, '/audit_templates',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_get_one(self):
|
||||
audit_template = obj_utils.create_test_audit_template(self.context)
|
||||
self._common_policy_check(
|
||||
"audit_template:get", self.get_json,
|
||||
'/audit_templates/%s' % audit_template.uuid,
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_detail(self):
|
||||
self._common_policy_check(
|
||||
"audit_template:detail", self.get_json,
|
||||
'/audit_templates/detail',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_update(self):
|
||||
audit_template = obj_utils.create_test_audit_template(self.context)
|
||||
self._common_policy_check(
|
||||
"audit_template:update", self.patch_json,
|
||||
'/audit_templates/%s' % audit_template.uuid,
|
||||
[{'path': '/state', 'value': 'SUBMITTED', 'op': 'replace'}],
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_create(self):
|
||||
fake_goal1 = obj_utils.get_test_goal(
|
||||
self.context, id=1, uuid=utils.generate_uuid(), name="dummy_1")
|
||||
fake_goal1.create()
|
||||
fake_strategy1 = obj_utils.get_test_strategy(
|
||||
self.context, id=1, uuid=utils.generate_uuid(), name="strategy_1",
|
||||
goal_id=fake_goal1.id)
|
||||
fake_strategy1.create()
|
||||
|
||||
audit_template_dict = post_get_test_audit_template(
|
||||
goal=fake_goal1.uuid,
|
||||
strategy=fake_strategy1.uuid)
|
||||
self._common_policy_check(
|
||||
"audit_template:create", self.post_json, '/audit_templates',
|
||||
audit_template_dict, expect_errors=True)
|
||||
|
||||
def test_policy_disallow_delete(self):
|
||||
audit_template = obj_utils.create_test_audit_template(self.context)
|
||||
self._common_policy_check(
|
||||
"audit_template:delete", self.delete,
|
||||
'/audit_templates/%s' % audit_template.uuid, expect_errors=True)
|
||||
|
||||
|
||||
class TestAuditTemplatePolicyWithAdminContext(TestListAuditTemplate,
|
||||
api_base.AdminRoleTest):
|
||||
def setUp(self):
|
||||
super(TestAuditTemplatePolicyWithAdminContext, self).setUp()
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
"audit_template:create": "rule:default",
|
||||
"audit_template:delete": "rule:default",
|
||||
"audit_template:detail": "rule:default",
|
||||
"audit_template:get": "rule:default",
|
||||
"audit_template:get_all": "rule:default",
|
||||
"audit_template:update": "rule:default"})
|
||||
|
||||
@@ -11,8 +11,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
import json
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
from wsme import types as wtypes
|
||||
@@ -606,3 +607,74 @@ class TestDelete(api_base.FunctionalTest):
|
||||
self.assertEqual(404, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(response.json['error_message'])
|
||||
|
||||
|
||||
class TestAuaditPolicyEnforcement(api_base.FunctionalTest):
|
||||
|
||||
def _common_policy_check(self, rule, func, *arg, **kwarg):
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
rule: "rule:defaut"})
|
||||
response = func(*arg, **kwarg)
|
||||
self.assertEqual(403, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(
|
||||
"Policy doesn't allow %s to be performed." % rule,
|
||||
json.loads(response.json['error_message'])['faultstring'])
|
||||
|
||||
def test_policy_disallow_get_all(self):
|
||||
self._common_policy_check(
|
||||
"audit:get_all", self.get_json, '/audits',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_get_one(self):
|
||||
audit = obj_utils.create_test_audit(self.context)
|
||||
self._common_policy_check(
|
||||
"audit:get", self.get_json,
|
||||
'/audits/%s' % audit.uuid,
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_detail(self):
|
||||
self._common_policy_check(
|
||||
"audit:detail", self.get_json,
|
||||
'/audits/detail',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_update(self):
|
||||
audit = obj_utils.create_test_audit(self.context)
|
||||
self._common_policy_check(
|
||||
"audit:update", self.patch_json,
|
||||
'/audits/%s' % audit.uuid,
|
||||
[{'path': '/state', 'value': 'SUBMITTED', 'op': 'replace'}],
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_create(self):
|
||||
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
self._common_policy_check(
|
||||
"audit:create", self.post_json, '/audits', audit_dict,
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_delete(self):
|
||||
audit = obj_utils.create_test_audit(self.context)
|
||||
self._common_policy_check(
|
||||
"audit:delete", self.delete,
|
||||
'/audits/%s' % audit.uuid, expect_errors=True)
|
||||
|
||||
|
||||
class TestAuditEnforcementWithAdminContext(TestListAudit,
|
||||
api_base.AdminRoleTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestAuditEnforcementWithAdminContext, self).setUp()
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
"audit:create": "rule:default",
|
||||
"audit:delete": "rule:default",
|
||||
"audit:detail": "rule:default",
|
||||
"audit:get": "rule:default",
|
||||
"audit:get_all": "rule:default",
|
||||
"audit:update": "rule:default"})
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
from oslo_config import cfg
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
@@ -118,3 +120,49 @@ class TestListGoal(api_base.FunctionalTest):
|
||||
cfg.CONF.set_override('max_limit', 3, 'api', enforce_type=True)
|
||||
response = self.get_json('/goals')
|
||||
self.assertEqual(3, len(response['goals']))
|
||||
|
||||
|
||||
class TestGoalPolicyEnforcement(api_base.FunctionalTest):
|
||||
|
||||
def _common_policy_check(self, rule, func, *arg, **kwarg):
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
rule: "rule:defaut"})
|
||||
response = func(*arg, **kwarg)
|
||||
self.assertEqual(403, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(
|
||||
"Policy doesn't allow %s to be performed." % rule,
|
||||
json.loads(response.json['error_message'])['faultstring'])
|
||||
|
||||
def test_policy_disallow_get_all(self):
|
||||
self._common_policy_check(
|
||||
"goal:get_all", self.get_json, '/goals',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_get_one(self):
|
||||
goal = obj_utils.create_test_goal(self.context)
|
||||
self._common_policy_check(
|
||||
"goal:get", self.get_json,
|
||||
'/goals/%s' % goal.uuid,
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_detail(self):
|
||||
self._common_policy_check(
|
||||
"goal:detail", self.get_json,
|
||||
'/goals/detail',
|
||||
expect_errors=True)
|
||||
|
||||
|
||||
class TestGoalPolicyEnforcementWithAdminContext(TestListGoal,
|
||||
api_base.AdminRoleTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestGoalPolicyEnforcementWithAdminContext, self).setUp()
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
"goal:detail": "rule:default",
|
||||
"goal:get_all": "rule:default",
|
||||
"goal:get_one": "rule:default"})
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
from oslo_config import cfg
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
@@ -187,3 +189,49 @@ class TestListStrategy(api_base.FunctionalTest):
|
||||
self.assertEqual(2, len(strategies))
|
||||
for strategy in strategies:
|
||||
self.assertEqual(goal1['uuid'], strategy['goal_uuid'])
|
||||
|
||||
|
||||
class TestStrategyPolicyEnforcement(api_base.FunctionalTest):
|
||||
|
||||
def _common_policy_check(self, rule, func, *arg, **kwarg):
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
rule: "rule:defaut"})
|
||||
response = func(*arg, **kwarg)
|
||||
self.assertEqual(403, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(
|
||||
"Policy doesn't allow %s to be performed." % rule,
|
||||
json.loads(response.json['error_message'])['faultstring'])
|
||||
|
||||
def test_policy_disallow_get_all(self):
|
||||
self._common_policy_check(
|
||||
"strategy:get_all", self.get_json, '/strategies',
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_get_one(self):
|
||||
strategy = obj_utils.create_test_strategy(self.context)
|
||||
self._common_policy_check(
|
||||
"strategy:get", self.get_json,
|
||||
'/strategies/%s' % strategy.uuid,
|
||||
expect_errors=True)
|
||||
|
||||
def test_policy_disallow_detail(self):
|
||||
self._common_policy_check(
|
||||
"strategy:detail", self.get_json,
|
||||
'/strategies/detail',
|
||||
expect_errors=True)
|
||||
|
||||
|
||||
class TestStrategyEnforcementWithAdminContext(TestListStrategy,
|
||||
api_base.AdminRoleTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestStrategyEnforcementWithAdminContext, self).setUp()
|
||||
self.policy.set_rules({
|
||||
"admin_api": "(role:admin or role:administrator)",
|
||||
"default": "rule:admin_api",
|
||||
"strategy:detail": "rule:default",
|
||||
"strategy:get": "rule:default",
|
||||
"strategy:get_all": "rule:default"})
|
||||
|
||||
@@ -29,6 +29,7 @@ import testscenarios
|
||||
from watcher.common import context as watcher_context
|
||||
from watcher.objects import base as objects_base
|
||||
from watcher.tests import conf_fixture
|
||||
from watcher.tests import policy_fixture
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@@ -68,6 +69,8 @@ class TestCase(BaseTestCase):
|
||||
project_id='fake_project',
|
||||
user_id='fake_user')
|
||||
|
||||
self.policy = self.useFixture(policy_fixture.PolicyFixture())
|
||||
|
||||
def make_context(*args, **kwargs):
|
||||
# If context hasn't been constructed with token_info
|
||||
if not kwargs.get('auth_token_info'):
|
||||
|
||||
@@ -16,9 +16,40 @@
|
||||
policy_data = """
|
||||
{
|
||||
"admin_api": "role:admin or role:administrator",
|
||||
"public_api": "is_public_api:True",
|
||||
"trusted_call": "rule:admin_api or rule:public_api",
|
||||
"default": "rule:trusted_call",
|
||||
"show_password": "!",
|
||||
"default": "rule:admin_api",
|
||||
|
||||
"action:detail": "",
|
||||
"action:get": "",
|
||||
"action:get_all": "",
|
||||
|
||||
"action_plan:delete": "",
|
||||
"action_plan:detail": "",
|
||||
"action_plan:get": "",
|
||||
"action_plan:get_all": "",
|
||||
"action_plan:update": "",
|
||||
|
||||
"audit:create": "",
|
||||
"audit:delete": "",
|
||||
"audit:detail": "",
|
||||
"audit:get": "",
|
||||
"audit:get_all": "",
|
||||
"audit:update": "",
|
||||
|
||||
"audit_template:create": "",
|
||||
"audit_template:delete": "",
|
||||
"audit_template:detail": "",
|
||||
"audit_template:get": "",
|
||||
"audit_template:get_all": "",
|
||||
"audit_template:update": "",
|
||||
|
||||
"goal:detail": "",
|
||||
"goal:get": "",
|
||||
"goal:get_all": "",
|
||||
|
||||
"strategy:detail": "",
|
||||
"strategy:get": "",
|
||||
"strategy:get_all": ""
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
@@ -16,25 +16,29 @@ import os
|
||||
|
||||
import fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_policy import _parser
|
||||
from oslo_policy import opts as policy_opts
|
||||
|
||||
from watcher.common import policy as w_policy
|
||||
from watcher.common import policy as watcher_policy
|
||||
from watcher.tests import fake_policy
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class PolicyFixture(fixtures.Fixture):
|
||||
def __init__(self, compat=None):
|
||||
self.compat = compat
|
||||
|
||||
def setUp(self):
|
||||
super(PolicyFixture, self).setUp()
|
||||
def _setUp(self):
|
||||
self.policy_dir = self.useFixture(fixtures.TempDir())
|
||||
self.policy_file_name = os.path.join(self.policy_dir.path,
|
||||
'policy.json')
|
||||
with open(self.policy_file_name, 'w') as policy_file:
|
||||
policy_file.write(fake_policy.get_policy_data(self.compat))
|
||||
CONF.set_override('policy_file', self.policy_file_name,
|
||||
enforce_type=True)
|
||||
w_policy._ENFORCER = None
|
||||
self.addCleanup(w_policy.get_enforcer().clear)
|
||||
policy_file.write(fake_policy.policy_data)
|
||||
policy_opts.set_defaults(CONF)
|
||||
CONF.set_override('policy_file', self.policy_file_name, 'oslo_policy')
|
||||
watcher_policy._ENFORCER = None
|
||||
self.addCleanup(watcher_policy.init().clear)
|
||||
|
||||
def set_rules(self, rules):
|
||||
policy = watcher_policy._ENFORCER
|
||||
policy.set_rules({k: _parser.parse_rule(v)
|
||||
for k, v in rules.items()})
|
||||
|
||||
Reference in New Issue
Block a user