Merge "Add policies for API access control to watcher project."

This commit is contained in:
Jenkins
2016-07-06 16:17:43 +00:00
committed by Gerrit Code Review
22 changed files with 693 additions and 104 deletions

View File

@@ -21,14 +21,17 @@
# https://bugs.launchpad.net/watcher/+bug/1255115.
# NOTE(deva): import auth_token so we can override a config option
from keystonemiddleware import auth_token # noqa
# import mock
import copy
import mock
from oslo_config import cfg
import pecan
import pecan.testing
from six.moves.urllib import parse as urlparse
from watcher.api import hooks
from watcher.common import context as watcher_context
from watcher.tests.db import base
PATH_PREFIX = '/v1'
@@ -243,3 +246,41 @@ class FunctionalTest(base.DbTestCase):
return True
except Exception:
return False
class AdminRoleTest(base.DbTestCase):
def setUp(self):
super(AdminRoleTest, self).setUp()
token_info = {
'token': {
'project': {
'id': 'admin'
},
'user': {
'id': 'admin'
}
}
}
self.context = watcher_context.RequestContext(
auth_token_info=token_info,
project_id='admin',
user_id='admin')
def make_context(*args, **kwargs):
# If context hasn't been constructed with token_info
if not kwargs.get('auth_token_info'):
kwargs['auth_token_info'] = copy.deepcopy(token_info)
if not kwargs.get('project_id'):
kwargs['project_id'] = 'admin'
if not kwargs.get('user_id'):
kwargs['user_id'] = 'admin'
if not kwargs.get('roles'):
kwargs['roles'] = ['admin']
context = watcher_context.RequestContext(*args, **kwargs)
return watcher_context.RequestContext.from_dict(context.to_dict())
p = mock.patch.object(watcher_context, 'make_context',
side_effect=make_context)
self.mock_make_context = p.start()
self.addCleanup(p.stop)

View File

@@ -11,8 +11,9 @@
# limitations under the License.
import datetime
import json
import mock
from oslo_config import cfg
from wsme import types as wtypes
@@ -482,3 +483,49 @@ class TestDelete(api_base.FunctionalTest):
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
class TestActionPolicyEnforcement(api_base.FunctionalTest):
def _common_policy_check(self, rule, func, *arg, **kwarg):
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
rule: "rule:defaut"})
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
json.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
"action:get_all", self.get_json, '/actions',
expect_errors=True)
def test_policy_disallow_get_one(self):
action = obj_utils.create_test_action(self.context)
self._common_policy_check(
"action:get", self.get_json,
'/actions/%s' % action.uuid,
expect_errors=True)
def test_policy_disallow_detail(self):
self._common_policy_check(
"action:detail", self.get_json,
'/actions/detail',
expect_errors=True)
class TestActionPolicyEnforcementWithAdminContext(TestListAction,
api_base.AdminRoleTest):
def setUp(self):
super(TestActionPolicyEnforcementWithAdminContext, self).setUp()
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
"action:detail": "rule:default",
"action:get": "rule:default",
"action:get_all": "rule:default"})

View File

@@ -12,6 +12,7 @@
import datetime
import itertools
import json
import mock
import pecan
@@ -575,3 +576,65 @@ class TestPatchStateTransitionOk(api_base.FunctionalTest):
self.assertEqual(self.new_state, updated_ap['state'])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
class TestActionPlanPolicyEnforcement(api_base.FunctionalTest):
def _common_policy_check(self, rule, func, *arg, **kwarg):
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
rule: "rule:defaut"})
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
json.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
"action_plan:get_all", self.get_json, '/action_plans',
expect_errors=True)
def test_policy_disallow_get_one(self):
action_plan = obj_utils.create_test_action_plan(self.context)
self._common_policy_check(
"action_plan:get", self.get_json,
'/action_plans/%s' % action_plan.uuid,
expect_errors=True)
def test_policy_disallow_detail(self):
self._common_policy_check(
"action_plan:detail", self.get_json,
'/action_plans/detail',
expect_errors=True)
def test_policy_disallow_update(self):
action_plan = obj_utils.create_test_action_plan(self.context)
self._common_policy_check(
"action_plan:update", self.patch_json,
'/action_plans/%s' % action_plan.uuid,
[{'path': '/state', 'value': 'DELETED', 'op': 'replace'}],
expect_errors=True)
def test_policy_disallow_delete(self):
action_plan = obj_utils.create_test_action_plan(self.context)
self._common_policy_check(
"action_plan:delete", self.delete,
'/action_plans/%s' % action_plan.uuid, expect_errors=True)
class TestActionPlanPolicyEnforcementWithAdminContext(TestListActionPlan,
api_base.AdminRoleTest):
def setUp(self):
super(TestActionPlanPolicyEnforcementWithAdminContext, self).setUp()
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
"action_plan:delete": "rule:default",
"action_plan:detail": "rule:default",
"action_plan:get": "rule:default",
"action_plan:get_all": "rule:default",
"action_plan:update": "rule:default"})

View File

@@ -12,8 +12,9 @@
import datetime
import itertools
import json
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from six.moves.urllib import parse as urlparse
@@ -654,3 +655,81 @@ class TestDelete(api_base.FunctionalTest):
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
class TestAuaditTemplatePolicyEnforcement(api_base.FunctionalTest):
def _common_policy_check(self, rule, func, *arg, **kwarg):
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
rule: "rule:defaut"})
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
json.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
"audit_template:get_all", self.get_json, '/audit_templates',
expect_errors=True)
def test_policy_disallow_get_one(self):
audit_template = obj_utils.create_test_audit_template(self.context)
self._common_policy_check(
"audit_template:get", self.get_json,
'/audit_templates/%s' % audit_template.uuid,
expect_errors=True)
def test_policy_disallow_detail(self):
self._common_policy_check(
"audit_template:detail", self.get_json,
'/audit_templates/detail',
expect_errors=True)
def test_policy_disallow_update(self):
audit_template = obj_utils.create_test_audit_template(self.context)
self._common_policy_check(
"audit_template:update", self.patch_json,
'/audit_templates/%s' % audit_template.uuid,
[{'path': '/state', 'value': 'SUBMITTED', 'op': 'replace'}],
expect_errors=True)
def test_policy_disallow_create(self):
fake_goal1 = obj_utils.get_test_goal(
self.context, id=1, uuid=utils.generate_uuid(), name="dummy_1")
fake_goal1.create()
fake_strategy1 = obj_utils.get_test_strategy(
self.context, id=1, uuid=utils.generate_uuid(), name="strategy_1",
goal_id=fake_goal1.id)
fake_strategy1.create()
audit_template_dict = post_get_test_audit_template(
goal=fake_goal1.uuid,
strategy=fake_strategy1.uuid)
self._common_policy_check(
"audit_template:create", self.post_json, '/audit_templates',
audit_template_dict, expect_errors=True)
def test_policy_disallow_delete(self):
audit_template = obj_utils.create_test_audit_template(self.context)
self._common_policy_check(
"audit_template:delete", self.delete,
'/audit_templates/%s' % audit_template.uuid, expect_errors=True)
class TestAuditTemplatePolicyWithAdminContext(TestListAuditTemplate,
api_base.AdminRoleTest):
def setUp(self):
super(TestAuditTemplatePolicyWithAdminContext, self).setUp()
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
"audit_template:create": "rule:default",
"audit_template:delete": "rule:default",
"audit_template:detail": "rule:default",
"audit_template:get": "rule:default",
"audit_template:get_all": "rule:default",
"audit_template:update": "rule:default"})

View File

@@ -11,8 +11,9 @@
# limitations under the License.
import datetime
import json
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from wsme import types as wtypes
@@ -684,3 +685,74 @@ class TestDelete(api_base.FunctionalTest):
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
class TestAuaditPolicyEnforcement(api_base.FunctionalTest):
def _common_policy_check(self, rule, func, *arg, **kwarg):
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
rule: "rule:defaut"})
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
json.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
"audit:get_all", self.get_json, '/audits',
expect_errors=True)
def test_policy_disallow_get_one(self):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:get", self.get_json,
'/audits/%s' % audit.uuid,
expect_errors=True)
def test_policy_disallow_detail(self):
self._common_policy_check(
"audit:detail", self.get_json,
'/audits/detail',
expect_errors=True)
def test_policy_disallow_update(self):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:update", self.patch_json,
'/audits/%s' % audit.uuid,
[{'path': '/state', 'value': 'SUBMITTED', 'op': 'replace'}],
expect_errors=True)
def test_policy_disallow_create(self):
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
del audit_dict['uuid']
del audit_dict['state']
self._common_policy_check(
"audit:create", self.post_json, '/audits', audit_dict,
expect_errors=True)
def test_policy_disallow_delete(self):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:delete", self.delete,
'/audits/%s' % audit.uuid, expect_errors=True)
class TestAuditEnforcementWithAdminContext(TestListAudit,
api_base.AdminRoleTest):
def setUp(self):
super(TestAuditEnforcementWithAdminContext, self).setUp()
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
"audit:create": "rule:default",
"audit:delete": "rule:default",
"audit:detail": "rule:default",
"audit:get": "rule:default",
"audit:get_all": "rule:default",
"audit:update": "rule:default"})

View File

@@ -10,6 +10,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_config import cfg
from six.moves.urllib import parse as urlparse
@@ -118,3 +120,49 @@ class TestListGoal(api_base.FunctionalTest):
cfg.CONF.set_override('max_limit', 3, 'api', enforce_type=True)
response = self.get_json('/goals')
self.assertEqual(3, len(response['goals']))
class TestGoalPolicyEnforcement(api_base.FunctionalTest):
def _common_policy_check(self, rule, func, *arg, **kwarg):
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
rule: "rule:defaut"})
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
json.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
"goal:get_all", self.get_json, '/goals',
expect_errors=True)
def test_policy_disallow_get_one(self):
goal = obj_utils.create_test_goal(self.context)
self._common_policy_check(
"goal:get", self.get_json,
'/goals/%s' % goal.uuid,
expect_errors=True)
def test_policy_disallow_detail(self):
self._common_policy_check(
"goal:detail", self.get_json,
'/goals/detail',
expect_errors=True)
class TestGoalPolicyEnforcementWithAdminContext(TestListGoal,
api_base.AdminRoleTest):
def setUp(self):
super(TestGoalPolicyEnforcementWithAdminContext, self).setUp()
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
"goal:detail": "rule:default",
"goal:get_all": "rule:default",
"goal:get_one": "rule:default"})

View File

@@ -10,6 +10,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_config import cfg
from six.moves.urllib import parse as urlparse
@@ -187,3 +189,49 @@ class TestListStrategy(api_base.FunctionalTest):
self.assertEqual(2, len(strategies))
for strategy in strategies:
self.assertEqual(goal1['uuid'], strategy['goal_uuid'])
class TestStrategyPolicyEnforcement(api_base.FunctionalTest):
def _common_policy_check(self, rule, func, *arg, **kwarg):
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
rule: "rule:defaut"})
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
json.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
"strategy:get_all", self.get_json, '/strategies',
expect_errors=True)
def test_policy_disallow_get_one(self):
strategy = obj_utils.create_test_strategy(self.context)
self._common_policy_check(
"strategy:get", self.get_json,
'/strategies/%s' % strategy.uuid,
expect_errors=True)
def test_policy_disallow_detail(self):
self._common_policy_check(
"strategy:detail", self.get_json,
'/strategies/detail',
expect_errors=True)
class TestStrategyEnforcementWithAdminContext(TestListStrategy,
api_base.AdminRoleTest):
def setUp(self):
super(TestStrategyEnforcementWithAdminContext, self).setUp()
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
"strategy:detail": "rule:default",
"strategy:get": "rule:default",
"strategy:get_all": "rule:default"})

View File

@@ -29,6 +29,7 @@ import testscenarios
from watcher.common import context as watcher_context
from watcher.objects import base as objects_base
from watcher.tests import conf_fixture
from watcher.tests import policy_fixture
CONF = cfg.CONF
@@ -68,6 +69,8 @@ class TestCase(BaseTestCase):
project_id='fake_project',
user_id='fake_user')
self.policy = self.useFixture(policy_fixture.PolicyFixture())
def make_context(*args, **kwargs):
# If context hasn't been constructed with token_info
if not kwargs.get('auth_token_info'):

View File

@@ -16,9 +16,40 @@
policy_data = """
{
"admin_api": "role:admin or role:administrator",
"public_api": "is_public_api:True",
"trusted_call": "rule:admin_api or rule:public_api",
"default": "rule:trusted_call",
"show_password": "!",
"default": "rule:admin_api",
"action:detail": "",
"action:get": "",
"action:get_all": "",
"action_plan:delete": "",
"action_plan:detail": "",
"action_plan:get": "",
"action_plan:get_all": "",
"action_plan:update": "",
"audit:create": "",
"audit:delete": "",
"audit:detail": "",
"audit:get": "",
"audit:get_all": "",
"audit:update": "",
"audit_template:create": "",
"audit_template:delete": "",
"audit_template:detail": "",
"audit_template:get": "",
"audit_template:get_all": "",
"audit_template:update": "",
"goal:detail": "",
"goal:get": "",
"goal:get_all": "",
"strategy:detail": "",
"strategy:get": "",
"strategy:get_all": ""
}
"""

View File

@@ -16,25 +16,29 @@ import os
import fixtures
from oslo_config import cfg
from oslo_policy import _parser
from oslo_policy import opts as policy_opts
from watcher.common import policy as w_policy
from watcher.common import policy as watcher_policy
from watcher.tests import fake_policy
CONF = cfg.CONF
class PolicyFixture(fixtures.Fixture):
def __init__(self, compat=None):
self.compat = compat
def setUp(self):
super(PolicyFixture, self).setUp()
def _setUp(self):
self.policy_dir = self.useFixture(fixtures.TempDir())
self.policy_file_name = os.path.join(self.policy_dir.path,
'policy.json')
with open(self.policy_file_name, 'w') as policy_file:
policy_file.write(fake_policy.get_policy_data(self.compat))
CONF.set_override('policy_file', self.policy_file_name,
enforce_type=True)
w_policy._ENFORCER = None
self.addCleanup(w_policy.get_enforcer().clear)
policy_file.write(fake_policy.policy_data)
policy_opts.set_defaults(CONF)
CONF.set_override('policy_file', self.policy_file_name, 'oslo_policy')
watcher_policy._ENFORCER = None
self.addCleanup(watcher_policy.init().clear)
def set_rules(self, rules):
policy = watcher_policy._ENFORCER
policy.set_rules({k: _parser.parse_rule(v)
for k, v in rules.items()})