Newer
Older
# -*- encoding: utf-8 -*-
import logging
import lasso
import openerp
from openerp.osv import osv, fields
from openerp import SUPERUSER_ID
_logger = logging.getLogger(__name__)
class res_users(osv.Model):
_inherit = 'res.users'
_columns = {
'saml_provider_id': fields.many2one(
'auth.saml.provider',
string='SAML Provider',
),
'saml_uid': fields.char(
Florent Aide
committed
'SAML User ID',
help="SAML Provider user_id",
),
}
_sql_constraints = [
(
'uniq_users_saml_provider_saml_uid',
'unique(saml_provider_id, saml_uid)',
'SAML UID must be unique per provider'
),
]
def _auth_saml_validate(self, cr, uid, provider, token, context=None):
""" return the validation data corresponding to the access token """
p = self.pool.get('auth.saml.provider')
login = p._get_lasso_for_provider(cr, uid, provider, context=context)
try:
login.processAuthnResponseMsg(token)
except (lasso.DsError, lasso.ProfileCannotVerifySignatureError):
raise Exception('Lasso Profile cannot verify signature')
except lasso.Error, e:
raise Exception(repr(e))
try:
login.acceptSso()
except lasso.Error:
raise Exception('Invalid assertion')
# TODO use a real token validation from LASSO
# TODO push into the validation result a real UPN
validation = {'user_id': login.assertion.subject.nameId.content}
"""
if p.data_endpoint:
data = self._auth_oauth_rpc(cr, uid, p.data_endpoint, access_token)
validation.update(data)
"""
return validation
def _auth_saml_signin(
self, cr, uid, provider, validation, saml_response, context=None
):
""" retrieve and sign into openerp the user corresponding to provider
and validated access token
:param provider: saml provider id (int)
:param validation: result of validation of access token (dict)
:return: user login (str)
:raise: openerp.exceptions.AccessDenied if signin failed
This method can be overridden to add alternative signin methods.
"""
token_osv = self.pool.get('auth_saml.token')
saml_uid = validation['user_id']
user_ids = self.search(
cr, uid,
[
("saml_uid", "=", saml_uid),
('saml_provider_id', '=', provider),
]
)
if not user_ids:
raise openerp.exceptions.AccessDenied()
# TODO replace assert by proper raise... asserts do not execute in
# production code...
assert len(user_ids) == 1
# browse the user because we'll need this in the response
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
user_id = user.id
# now find if a token for this user/provider already exists
token_ids = token_osv.search(
cr, uid,
[
('saml_provider_id', '=', provider),
('user_id', '=', user_id),
]
)
if token_ids:
token_osv.write(
cr, uid, token_ids,
{'saml_access_token': saml_response},
context=context
)
else:
token_osv.create(
cr, uid,
{
'saml_access_token': saml_response,
'saml_provider_id': provider,
'user_id': user_id,
},
context=context
)
return user.login
def auth_saml(self, cr, uid, provider, saml_response, context=None):
validation = self._auth_saml_validate(
cr, uid, provider, saml_response
)
# required check
if not validation.get('user_id'):
raise openerp.exceptions.AccessDenied()
# retrieve and sign in user
login = self._auth_saml_signin(
cr, uid, provider, validation, saml_response, context=context
)
if not login:
raise openerp.exceptions.AccessDenied()
# return user credentials
return cr.dbname, login, saml_response
def check_credentials(self, cr, uid, token):
"""token can be a password if the user has used the normal form...
but we are more interested in the case when they are tokens
and the interesting code is inside the except clause
"""
token_osv = self.pool.get('auth_saml.token')
super(res_users, self).check_credentials(cr, uid, token)
# since normal auth did not succeed we now try to find if the user
# has an active token attached to his uid
res = token_osv.search(
('user_id', '=', uid),
# if the user is not found we re-raise the AccessDenied
if not res:
# TODO: maybe raise a defined exception instead of the last
# exception that occurred in our execution frame