Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from odoo import api, fields
from odoo.osv import orm
from odoo.tools import config, frozendict
from .models.analytic_dimension import get_analytic_size
class AddMethod(object):
    """Decorator factory that attaches the decorated function to a target.

    ``AddMethod(target)`` returns a decorator; applying it to a function
    stores that function on ``target`` under the function's own
    ``__name__`` and hands the function back unchanged.
    """

    def __init__(self, obj):
        # Object or class that will receive the decorated functions.
        self.obj = obj

    def __call__(self, func):
        attribute_name = func.__name__
        setattr(self.obj, attribute_name, func)
        return func
class MetaAnalytic(orm.MetaModel):
    """Allow the model to use the classes of the analytic_structure module
    in a more streamlined way.

    The metaclass' behavior is specified by adding the following attributes:

    * _analytic: define the analytic structures to be used by the model.
    * _para_analytic: define extra ("para-analytic") fields generated
      alongside the analytic fields, one per analytic slot.
    * _dimension: bind an analytic dimension to the model.

    A description of the syntax expected for each attribute is available in
    the README file.

    Notes:

    * This metaclass may directly modify attributes that are used by OpenERP,
      specifically _inherits and the fields of the class.
    * New superclasses are used to define or override methods, in order to
      avoid interacting with the model's own method (re)definitions.
    """

    def __new__(cls, name, bases, nmspc):
        """Build the model class, injecting analytic fields and methods.

        :param name: name of the class being created.
        :param bases: tuple of base classes; replaced by a generated
            superclass when ``_analytic``, ``_para_analytic`` or
            ``_dimension`` is set.
        :param nmspc: class namespace; generated fields and ``_defaults``
            are written into it.
        """
        analytic = nmspc.get("_analytic", {})
        para = nmspc.get("_para_analytic", {})
        dimension = nmspc.get("_dimension", {})

        defaults = nmspc.get("_defaults", None)
        if defaults is None:
            defaults = {}
            nmspc["_defaults"] = defaults

        orm_name = nmspc.get("_name", None)
        if orm_name is None:
            orm_name = nmspc.get("_inherit")

        # Analytic fields should be defined in the _analytic attribute.
        if analytic or para:
            bases = cls._setup_analytic_fields(
                analytic, para, defaults, orm_name, name, bases, nmspc
            )

        # The bound dimension should be defined in the _dimension attribute.
        if dimension:
            bases = cls._setup_bound_dimension(
                dimension, defaults, orm_name, name, bases, nmspc
            )

        return super(MetaAnalytic, cls).__new__(cls, name, bases, nmspc)

    def __init__(self, name, bases, nmspc):
        """Delegate class initialisation to the parent metaclass."""
        return super(MetaAnalytic, self).__init__(name, bases, nmspc)

    @classmethod
    def _setup_analytic_fields(
        cls, analytic, para, defaults, orm_name, name, bases, nmspc
    ):
        """Generate analytic and para-analytic fields on the model.

        :param analytic: ``True``, a model name, or a prefix => model name
            mapping.
        :param para: mapping of ``(prefix, suffix)`` to a dict with the keys
            ``model`` and ``type`` and the optional keys ``args``,
            ``kwargs`` and ``default``.
        :param defaults: the model's ``_defaults`` dict, updated in place.
        :param orm_name: ``_name`` (or ``_inherit``) of the model.
        :param name: name of the class being created.
        :param bases: base classes of the class being created.
        :param nmspc: class namespace, updated in place with the new fields.
        :return: 1-tuple holding the generated superclass, to be used as the
            new bases of the model class.
        :raises ValueError: if a para-analytic suffix is ``'id'``.
        """
        # If _analytic uses a shortcut, convert it into a prefix-model mapping.
        if analytic is True:
            analytic = {"a": orm_name.replace(".", "_")}
        elif isinstance(analytic, str):
            analytic = {"a": analytic}
        elif not analytic:
            # Only para-analytic fields were requested.
            analytic = {}
        para = para or {}

        # Create a field that will be used for replacement in the view
        if analytic:
            nmspc["analytic_dimensions"] = fields.Char(
                string="Analytic Dimensions",
                compute=api.one(
                    lambda self: (setattr(self, "analytic_dimensions", ""))
                ),
                readonly=True,
            )

        col_pattern = "{pre}{n}_{suf}"
        size = get_analytic_size()

        # Generate the fields directly into the nmspc.
        all_analytic = []

        for prefix, model_name in analytic.items():
            # Analytic fields
            all_analytic.append((model_name, prefix, "id"))

            for n in range(1, size + 1):
                col_name = col_pattern.format(pre=prefix, n=n, suf="id")
                domain_field = "nd_id.ns{n}_id.model_name".format(n=n)
                nmspc[col_name] = fields.Many2one(
                    # The comodel must come first; the label is second.
                    "analytic.code",
                    "Generated Analytic Field",
                    domain=[
                        (domain_field, "=", model_name),
                        ("view_type", "=", False),
                        ("disabled_per_company", "=", False),
                    ],
                    ondelete="restrict",
                    track_visibility="onchange",
                    index=True,
                )

        for key, value in para.items():
            # Para-analytic fields
            prefix, suffix = key
            # The 'id' suffix is reserved for the analytic fields above.
            if suffix == "id":
                raise ValueError("Para-analytic suffix cannot be 'id'")
            model_name = value["model"]
            all_analytic.append((model_name, prefix, suffix))

            field_type = value["type"]
            args = value.get("args", ())
            kwargs = value.get("kwargs", {})
            for n in range(1, size + 1):
                col_name = col_pattern.format(pre=prefix, n=n, suf=suffix)
                nmspc[col_name] = field_type(*args, **kwargs)
                if "default" in value:
                    defaults[col_name] = value["default"]

        # In order to preserve inheritance and possible overrides, work on a
        # new class that inherits the given bases, then make our model class
        # inherit from this class.
        superclass_name = "_{name}_SuperAnalytic".format(name=name)
        # Set _register to False in order to prevent its instantiation.
        superclass = type(superclass_name, bases, {"_register": False})

        @AddMethod(superclass)
        @api.model
        def get_analytic_field_names(self):
            """Return the names of the analytic fields in use on the model."""
            field_names = []
            analytic_osv = self.env["analytic.structure"]
            for model_name, prefix, suffix in all_analytic:
                for ordering in analytic_osv.get_dimensions_names(model_name):
                    field_names.append("%s%s_%s" % (prefix, ordering, suffix))
            return field_names

        @AddMethod(superclass)
        @api.model
        def fields_get(self, allfields=None, attributes=None):
            """Override this method to rename analytic fields."""
            res = super(superclass, self).fields_get(
                allfields=allfields, attributes=attributes
            )
            analytic_osv = self.env["analytic.structure"]
            for model_name, prefix, suffix in all_analytic:
                res = analytic_osv.analytic_fields_get(
                    model_name, res, prefix, suffix
                )
            return res

        @AddMethod(superclass)
        @api.model
        def fields_view_get(
            self, view_id=None, view_type="form", toolbar=False, submenu=False
        ):
            """Override this method to hide unused analytic fields."""
            res = super(superclass, self).fields_view_get(
                view_id=view_id,
                view_type=view_type,
                toolbar=toolbar,
                submenu=submenu,
            )
            analytic_osv = self.env["analytic.structure"]
            for model_name, prefix, suffix in all_analytic:
                res = analytic_osv.analytic_fields_view_get(
                    model_name, res, prefix, suffix
                )
            return res

        @AddMethod(superclass)
        @api.model
        @api.returns(orm_name, lambda value: value.id)
        def create(self, vals, **kwargs):
            """Performs analytic field validation"""
            res = super(superclass, self).create(vals, **kwargs)
            # Throws in case of error
            res._validate_analytic_fields(frozendict(analytic))
            return res

        @AddMethod(superclass)
        @api.multi
        def write(self, vals, **kwargs):
            """Performs analytic field validation"""
            res = super(superclass, self).write(vals, **kwargs)
            # Throws in case of error
            self._validate_analytic_fields(frozendict(analytic))
            return res

        @AddMethod(superclass)
        @api.multi
        def _validate_analytic_fields(self, analytic):
            """Validation function to validate analytic fields.

            The base implementation doesn't actually do anything.

            :param analytic: frozendict, analytic field parameters, such as
                they would appear in the '_analytic' in the expanded
                form, ie. as a prefix => model_name mapping.
            :raises: Validation error when applicable.
            """
            pass

        return (superclass,)

    @classmethod
    def _setup_bound_dimension(
        cls, dimension, defaults, orm_name, name, bases, nmspc
    ):
        """Bind a dimension to the model, creating a code for each record.

        :param dimension: ``True``, a dimension name, or a dict of options
            (the keys read are visible in the ``dimension.get`` calls below).
        :param defaults: the model's ``_defaults`` dict, updated in place.
        :param orm_name: ``_name`` (or ``_inherit``) of the model.
        :param name: name of the class being created.
        :param bases: base classes of the class being created.
        :param nmspc: class namespace, updated in place.
        :return: 1-tuple holding the generated superclass, to be used as the
            new bases of the model class.
        """
        if dimension is True:
            dimension = {}
        elif isinstance(dimension, str):
            dimension = {"name": dimension}

        dimension_name = dimension.get("name", None)
        if dimension_name is None:
            dimension_name = nmspc.get("_description", False) or orm_name

        column = dimension.get("column", "analytic_id")

        ref_module = dimension.get("ref_module", "")

        ref_id = dimension.get("ref_id", None)
        if ref_id is None:
            ref_id = orm_name.replace(".", "_") + "_analytic_dimension_id"

        code_name = dimension.get("code_name", "name")
        code_description = dimension.get("code_description", "description")

        # To use an inherited, renamed parent field, you have to give its name.
        sync_parent = dimension.get("sync_parent", False)
        if sync_parent is True:
            sync_parent = nmspc.get("_parent_name", "parent_id")

        parent_column = dimension.get("parent_column", column)

        bound_code_is_required = dimension.get("required", True)

        def _relation(key, default_label, model_col):
            """Normalise a ``rel_*`` option to ``(label, model_col)`` or ()."""
            spec = dimension.get(key, tuple())
            if spec is True:
                spec = default_label
            if isinstance(spec, str):
                spec = (spec, model_col)
            return spec

        rel_name = _relation("rel_name", "Name", code_name)
        rel_description = _relation(
            "rel_description", "Description", code_description
        )
        rel_active = _relation("rel_active", "Active", "active")
        rel_view_type = _relation("rel_view_type", "View type", "view_type")
        # NOTE(review): this option is normalised but nothing below reads it;
        # confirm whether a related field was meant to be generated for it.
        rel_disabled_per_company = _relation(  # noqa: F841
            "rel_disabled_per_company",
            "Disabled in my company",
            "disabled_per_company",
        )

        # By default, only use inherits if we can be sure there is no conflict
        # on the required fields 'name' and 'nd_id'.
        # There can still be conflicts on analytic_code's optional fields.
        use_inherits = dimension.get("use_inherits", None)
        if use_inherits is None:
            use_inherits = not (
                any(field in nmspc for field in ("name", "nd_id"))
                or nmspc.get("_inherits", False)
                or nmspc.get("_inherit", False)
            )

        use_code_name_methods = dimension.get("use_code_name_methods", False)

        code_ref_ids = dimension.get("code_ref_ids", False)
        if code_ref_ids is True:
            code_ref_ids = ref_id

        code_ref_module = dimension.get("code_ref_module", "")

        if use_inherits:
            inherits = nmspc.get("_inherits", {})
            inherits["analytic.code"] = column
            nmspc["_inherits"] = inherits

        # Default column for the underlying analytic code.
        if column not in nmspc:
            nmspc[column] = fields.Many2one(
                # The comodel must come first; the label is second.
                "analytic.code",
                "Bound Analytic Code",
                required=bound_code_is_required,
                ondelete="restrict",
            )

        # Each complete entry is
        # (label, model column, code column, field type, required, default).
        # An option that was not requested yields a 4-tuple and is dropped.
        rel_cols = [
            cols
            for cols in [
                rel_name + ("name", "Char", True, ""),
                rel_description + ("description", "Char", False, ""),
                rel_active + ("active", "Boolean", False, True),
                rel_view_type + ("view_type", "Boolean", False, False),
            ]
            if len(cols) == 6
        ]

        for string, model_col, code_col, dtype, req, default in rel_cols:
            nmspc[model_col] = getattr(fields, dtype)(
                string=string,
                related=".".join([column, code_col]),
                relation="analytic.code",
                required=req,
                ondelete="restrict",
                store=True,
            )
            if model_col not in defaults:
                defaults[model_col] = default

        # In order to preserve inheritance and possible overrides, work on a
        # new class that inherits the given bases, then make our model class
        # inherit from this class.
        superclass_name = "_{name}_SuperDimension".format(name=name)
        # Set _register to False in order to prevent its instantiation.
        superclass = type(superclass_name, bases, {"_register": False})

        # We must keep the old api here !!!!!!!
        # If we switch to the new, the method is call through a wrapper
        # then, 'self' is a !#@*ing (!) object of the same type of __cls__
        # but totally temporary.
        # We don't want that cause we set _bound_dimension_id.
        # Keep the old api until we fix all this module.
        @AddMethod(superclass)
        def _setup_complete(self):
            """Load or create the analytic dimension bound to the model."""
            super(superclass, self)._setup_complete()
            data_obj = self.env["ir.model.data"].sudo()
            try:
                self._bound_dimension_id = data_obj.get_object_reference(
                    ref_module, ref_id
                )[1]
            except ValueError:
                vals = {"name": dimension_name}
                self._bound_dimension_id = data_obj._update(
                    "analytic.dimension",
                    ref_module,
                    vals,
                    xml_id=ref_id,
                    noupdate=True,
                )

        if code_ref_ids:
            prefix = config.get_misc("analytic", "code_ref_prefix", False)

            # This function is called as a method and can be overridden.
            @AddMethod(superclass)
            def _generate_code_ref_id(self, context=None):
                """Create an ir.model.data entry for each record's code.

                Operates on the records in ``self``; ``context`` is unused.
                """
                data_obj = self.env["ir.model.data"]
                for record in self:
                    code = record[column]
                    if not code:
                        # No bound code, hence nothing to reference.
                        continue
                    code_ref_id_builder = [prefix] if prefix else []
                    if "company_id" in record and record.company_id:
                        code_ref_id_builder.append(record.company_id.code)
                    code_ref_id_builder.append("ANC")
                    code_ref_id_builder.append(code_ref_ids)
                    code_ref_id_builder.append(code.name)
                    vals = {
                        "name": "_".join(code_ref_id_builder),
                        "module": code_ref_module,
                        "model": "analytic.code",
                        "res_id": code.id,
                    }
                    data_obj.create(vals)

        @AddMethod(superclass)
        @api.model
        def _get_bound_dimension_id(self):
            """Return the database ID of the bound analytic dimension."""
            return (
                self.env["ir.model.data"]
                .sudo()
                .get_object_reference(ref_module, ref_id)[1]
            )

        @AddMethod(superclass)
        @api.model
        def _create_analytic_code(self, vals, code_vals):
            """Create the analytic code matching ``vals`` when it has a name.

            :param vals: values of the record; receives the new code's ID
                under the bound column when a code is created.
            :param code_vals: values for the analytic code, updated in place.
            :return: ``(new_code, vals)`` where ``new_code`` is the created
                code, or ``False`` when none was created.
            """
            # Will be set if a new code is created
            new_code = False

            if use_inherits:
                code_vals.update(vals)
            else:
                code_vals["name"] = vals.get(code_name)
                code_vals["description"] = vals.get(code_description)

            if code_vals.get("name"):
                # OpenERP bug: related fields do not work properly on
                # creation.
                for rel in rel_cols:
                    model_col, code_col = rel[1:3]
                    if model_col in vals:
                        code_vals[code_col] = vals[model_col]
                    elif model_col in self._defaults:
                        code_vals[code_col] = self._defaults[model_col]

                # We have to create the code separately, even with
                # inherits.
                code_obj = self.env["analytic.code"]
                code_vals["nd_id"] = self._get_bound_dimension_id()
                # These are behind-the-scenes links so bypass security rules.
                code = code_obj.sudo().create(code_vals)
                vals[column] = code.id
                new_code = code

            return new_code, vals

        @AddMethod(superclass)
        @api.model
        @api.returns(orm_name, lambda value: value.id)
        def create(self, vals, **kwargs):
            """Create the analytic code."""
            context = self.env.context
            code_vals = {}

            if sync_parent:
                cp = self._get_code_parent(vals)
                if cp:
                    code_vals["code_parent_id"] = cp.id

            # Direct changes to the 'bound analytic code' field are ignored
            # unless the 'force_code_id' context key is passed as True.
            force_code_id = vals.pop(column, False)

            # Will be set if a new code is created
            new_code = False

            if context and context.get("force_code_id", False):
                self._force_code(force_code_id, code_vals)
                vals[column] = force_code_id
            else:
                new_code, vals = self._create_analytic_code(vals, code_vals)

            res = super(superclass, self).create(vals, **kwargs)

            if not getattr(res, column):
                # The name may only be known once the record exists, so try
                # again with the values read back from it.
                if sync_parent:
                    cp = self._get_code_parent(vals)
                    if cp:
                        code_vals["code_parent_id"] = cp.id

                wanted = set(vals) | {code_name, code_description}
                new_code, vals = self._create_analytic_code(
                    {
                        field: extract(getattr(res, field), field_data["type"])
                        for field, field_data in res.fields_get().items()
                        if field in wanted
                    },
                    code_vals,
                )
                # No code is created when the record still has no name.
                if new_code:
                    super(superclass, res).write({column: new_code.id})

            if code_ref_ids:
                # Must run on the created record; 'self' is an empty model.
                res._generate_code_ref_id()

            if new_code:
                # These are behind-the-scenes links so bypass security rules.
                new_code.sudo().write(
                    {"origin_id": "{},{}".format(self._name, res.id)}
                )

            return res

        @AddMethod(superclass)
        @api.multi
        def write(self, vals, **kwargs):
            """Update the analytic code's name if it is not inherited,
            and its parent code if parent-child relations are synchronized.
            """
            context = self.env.context
            code_vals = {}
            news = []
            standard_process = False

            if sync_parent:
                cp = self._get_code_parent(vals)
                if cp:
                    code_vals["code_parent_id"] = cp.id
                else:
                    parent = getattr(self, sync_parent)
                    if parent:
                        cp = getattr(parent, parent_column)
                        if cp:
                            code_vals["code_parent_id"] = cp.id

            # Direct changes to the 'bound analytic code' field are ignored
            # unless the 'force_code_id' context key is passed as True.
            force_code_id = vals.pop(column, False)

            if context and context.get("force_code_id", False):
                self._force_code(force_code_id, code_vals)
                vals[column] = force_code_id
            elif use_inherits:
                vals.update(code_vals)
            else:
                name_col = rel_name[1] if rel_name else code_name
                description_col = (
                    rel_description[1] if rel_description else code_description
                )
                if name_col in vals:
                    code_vals["name"] = vals[name_col]
                if description_col in vals:
                    code_vals["description"] = vals[description_col]
                standard_process = True

            res = super(superclass, self).write(vals, **kwargs)

            # If updating records with no code, create these.
            if standard_process:
                code_obj = self.env["analytic.code"]
                for rec in self:
                    code = getattr(rec, column)
                    rec_code_vals = code_vals.copy()
                    rec_vals = {}
                    if not rec_code_vals.get("name"):
                        rec_code_vals["name"] = rec.read([name_col])[0][
                            name_col
                        ]
                    if not rec_code_vals.get("description"):
                        # Read from 'rec', not 'self': each record keeps its
                        # own description.
                        rec_code_vals["description"] = rec.read(
                            [description_col]
                        )[0][description_col]
                    if not code and rec_code_vals.get("name"):
                        news.append(rec.id)
                        rec_code_vals["nd_id"] = rec._get_bound_dimension_id()
                        rec_code_vals["origin_id"] = "{},{}".format(
                            self._name, rec.id
                        )
                        # These are behind-the-scenes links so bypass security
                        # rules.
                        rec_vals[column] = (
                            code_obj.sudo().create(rec_code_vals).id
                        )
                        super(superclass, rec).write(rec_vals, **kwargs)
                    elif rec_code_vals:
                        # These are behind-the-scenes links so bypass security
                        # rules.
                        code.sudo().write(rec_code_vals)

            if code_ref_ids and news:
                # Only the records that just received a code need a reference.
                self.browse(news)._generate_code_ref_id()

            return res

        @AddMethod(superclass)
        def unlink(self, **kwargs):
            """When removing this object, remove all associated analytic
            codes referenced by this object.

            Note: the method will fail if the code is referenced by any other
            object due to the RESTRICT constraint. That is the intended
            behavior.
            """
            # Find all related codes
            codes = self.env["analytic.code"]
            for record in self:
                codes |= getattr(record, column)

            res = super(superclass, self).unlink()

            # These are behind-the-scenes links so bypass security rules.
            codes.sudo().unlink(**kwargs)

            return res

        @AddMethod(superclass)
        def _force_code(self, force_code_id, code_vals):
            """Check a forced analytic code and apply ``code_vals`` to it.

            :raises ValueError: if no code ID is given, or if the code does
                not belong to the bound dimension.
            """
            code_obj = self.env["analytic.code"]

            if not force_code_id:
                raise ValueError(
                    "An analytic code ID MUST be specified if the "
                    "force_code_id key is enabled in the context"
                )
            force_code = code_obj.browse(force_code_id)
            force_code_dim_id = force_code.nd_id.id
            if force_code_dim_id != self._get_bound_dimension_id():
                raise ValueError(
                    "If specified, codes must belong to the bound "
                    "analytic dimension {}".format(dimension_name)
                )
            if code_vals:
                # These are behind-the-scenes links so bypass security rules.
                force_code.sudo().write(code_vals)

        if sync_parent:
            # This function is called as a method and can be overridden.
            @AddMethod(superclass)
            def _get_code_parent(self, vals):
                """If parent_id is in the submitted values, return the analytic
                code of this parent, to be used as the child's code's parent.

                :return: the parent's code, ``False`` when the parent is
                    cleared or has no code, ``None`` when the parent is not
                    in ``vals`` (or its field has no relation).
                """
                parent_id = vals.get(sync_parent, None)
                if parent_id is not None:
                    if parent_id:
                        parent_model = self.fields_get([sync_parent])[
                            sync_parent
                        ].get("relation")
                        if parent_model:
                            res = getattr(
                                self.env[parent_model].browse(parent_id),
                                parent_column,
                            )
                            return res if res else False
                    else:
                        return False
                return None

        if use_code_name_methods:

            @AddMethod(superclass)
            def name_get(self):
                """Return the analytic code's name."""
                code_reads = self.read([column])
                c2m = {  # Code IDs to model IDs
                    code_read[column][0]: code_read["id"]
                    for code_read in code_reads
                    if code_read[column] is not False
                }
                names = (
                    self.env["analytic.code"]
                    .browse(list(c2m.keys()))
                    .name_get()
                )
                return [(c2m[cid], name) for cid, name in names if cid in c2m]

            @AddMethod(superclass)
            def name_search(
                self, name, args=None, operator="ilike", limit=100
            ):
                """Return the records whose analytic code matches the name."""
                code_obj = self.env["analytic.code"]
                # Work on a copy: 'args' may be None and the caller's domain
                # must not be modified.
                domain = list(args or [])
                domain.append(("nd_id", "=", self._get_bound_dimension_id()))
                names = code_obj.name_search(name, domain, operator, limit)
                if not names:
                    return []
                code_ids = [code_id for code_id, _label in names]
                records = self.search([(column, "in", code_ids)])
                code_reads = records.read([column])
                c2m = {  # Code IDs to model IDs
                    code_read[column][0]: code_read["id"]
                    for code_read in code_reads
                    if code_read[column] is not False
                }
                return [
                    (c2m[cid], cname) for cid, cname in names if cid in c2m
                ]

        return (superclass,)
def extract(value, typ):
    """Convert a field value read from a record into a writable value.

    :param value: value of the field, as read from a record.
    :param typ: field type name (e.g. ``"many2one"``, ``"char"``).
    :return:
        * many2many: a one-element list holding a ``(6, 0, ids)`` command,
          with the IDs deduplicated and sorted in ascending order;
        * many2one: ``value.id``;
        * one2many: ``False``;
        * any other type: ``value`` unchanged.
    """
    if typ == "many2many":
        # Deduplicate first, then sort: sorting before building a set would
        # lose the ordering. sorted() also leaves value.ids untouched.
        return [(6, 0, sorted(set(value.ids)))]
    if typ == "many2one":
        return value.id
    if typ == "one2many":
        return False
    return value