Skip to content
Snippets Groups Projects
MetaAnalytic.py 30 KiB
Newer Older
from odoo import api, fields
from odoo.osv import orm
from odoo.tools import config, frozendict

from .models.analytic_dimension import get_analytic_size


class AddMethod(object):
    """Decorator that installs the decorated function on a target.

    ``AddMethod(target)`` builds the decorator; applying it stores the
    function on ``target`` under the function's own ``__name__`` and hands
    the function back unchanged.
    """

    def __init__(self, obj):
        # Object or class that will receive the decorated functions.
        self.obj = obj

    def __call__(self, func):
        target, attribute = self.obj, func.__name__
        setattr(target, attribute, func)
        return func


class MetaAnalytic(orm.MetaModel):
    """Allow the model to use the classes of the analytic_structure module
    in a more streamlined way.

    The metaclass' behavior is specified by adding the following attributes:

    * _analytic: define the analytic structures to be used by the model.
    * _dimension: bind an analytic dimension to the model.

    A description of the syntax expected for each attribute is available in
    the README file.

    Notes:
    * This metaclass may directly modify attributes that are used by OpenERP,
    specifically _inherits and the fields of the class.
    * New superclasses are used to define or override methods, in order to
    avoid interacting with the model's own method (re)definitions.
    """

    def __new__(cls, name, bases, nmspc):
        """Create the model class, wiring in the analytic features it asks for.

        :param name: name of the class being created.
        :param bases: tuple of base classes; replaced by a generated
            superclass when ``_analytic``/``_para_analytic`` or ``_dimension``
            is declared.
        :param nmspc: class namespace. ``_defaults`` is created in it when
            missing, and the setup helpers add fields to it.
        :return: the class built by the parent metaclass.
        """

        analytic = nmspc.get("_analytic", {})
        para = nmspc.get("_para_analytic", {})
        dimension = nmspc.get("_dimension", {})

        # The setup helpers store default values by key, so the namespace
        # must always carry a real dict.
        defaults = nmspc.get("_defaults", None)
        if defaults is None:
            defaults = {}
            nmspc["_defaults"] = defaults

        # A class that only extends an existing model declares _inherit
        # instead of _name.
        orm_name = nmspc.get("_name", None)
        if orm_name is None:
            orm_name = nmspc.get("_inherit")

        # Analytic fields should be defined in the _analytic attribute.
        if analytic or para:
            bases = cls._setup_analytic_fields(
                analytic, para, defaults, orm_name, name, bases, nmspc
            )

        # The bound dimension should be defined in the _dimension attribute.
        if dimension:
            bases = cls._setup_bound_dimension(
                dimension, defaults, orm_name, name, bases, nmspc
            )

        return super(MetaAnalytic, cls).__new__(cls, name, bases, nmspc)

    def __init__(self, name, bases, nmspc):
        """Initialise the new class by delegating to the parent metaclass."""
        parent = super(MetaAnalytic, self)
        return parent.__init__(name, bases, nmspc)

    def generate_formatter(fieldname):
        """Turn a field name into a formatter class.

        A string is wrapped into a class exposing ``fields`` (the list of
        field names it depends on) and ``generate(model, vals)``, which
        returns ``vals.get(fieldname)``. Any non-string argument is returned
        untouched.
        """

        if isinstance(fieldname, str):

            class analytic_code_formatter(object):

                fields = [fieldname]

                def generate(model, vals):
                    return vals.get(fieldname)

            return analytic_code_formatter

        return fieldname

    @classmethod
    def _setup_analytic_fields(
        cls, analytic, para, defaults, orm_name, name, bases, nmspc
    ):
        """Generate analytic and para-analytic fields on the model."""

        # If _analytic uses a shortcut, convert it into a prefix-model mapping.
        if analytic is True:
            analytic = {"a": orm_name.replace(".", "_")}
            # NOTE(review): as shown, the next line re-wraps the mapping that
            # was just built. A guard for the string shortcut (presumably an
            # `elif isinstance(analytic, str):` at the outer level) seems to
            # be missing from this copy -- confirm against the original file.
            analytic = {"a": analytic}

        # Create a field that will be used for replacement in the view
        if analytic:
            nmspc["analytic_dimensions"] = fields.Char(
                compute=api.one(
                    lambda self: (setattr(self, "analytic_dimensions", ""))
                ),
        # NOTE(review): the fields.Char( call above is never closed in this
        # copy; its remaining arguments and closing parenthesis are missing.
        # Column names are built as <prefix><n>_<suffix>.
        col_pattern = "{pre}{n}_{suf}"
        size = get_analytic_size()

        # Generate the fields directly into the nmspc.
        all_analytic = []

        for prefix, model_name in list(analytic.items()):
            # Analytic fields
            all_analytic.append((model_name, prefix, "id"))
            # NOTE(review): the lines below are indented deeper than the
            # statement above and use `n` without a visible loop; presumably
            # a `for n in range(1, size + 1):` header and the start of a
            # field definition (with its domain list) were lost -- confirm.
                col_name = col_pattern.format(pre=prefix, n=n, suf="id")
                domain_field = "nd_id.ns{n}_id.model_name".format(n=n)
                        (domain_field, "=", model_name),
                        ("view_type", "=", False),
                        ("disabled_per_company", "=", False),
                    ondelete="restrict",
                    track_visibility="onchange",
                    index=True,
                )

        for key, value in list(para.items()):
            # Para-analytic fields
            # Keys are (prefix, suffix) pairs.
            prefix, suffix = key
            model_name = value["model"]
            all_analytic.append((model_name, prefix, suffix))
            # NOTE(review): the raise below has no visible condition;
            # judging by its message, an `if suffix == "id":` guard is
            # presumably missing -- confirm.
                raise ValueError("Para-analytic suffix cannot be 'id'")

            field_type = value["type"]
            args = value["args"]
            kwargs = value["kwargs"]
            # One column per analytic slot, all built from the same field
            # type and arguments.
            for n in range(1, size + 1):
                col_name = col_pattern.format(pre=prefix, n=n, suf=suffix)
                nmspc[col_name] = field_type(*args, **kwargs)
                if "default" in value:
                    defaults[col_name] = value["default"]

        # In order to preserve inheritance and possible overrides, work on a
        # new class that inherits the given bases, then make our model class
        # inherit from this class.
        superclass_name = "_{name}_SuperAnalytic".format(name=name)
        # Set _register to False in order to prevent its instantiation.
        superclass = type(superclass_name, bases, {"_register": False})
        def get_analytic_field_names(self):
            """List the analytic column names currently in use on the model.

            For each (model name, prefix, suffix) entry registered by the
            metaclass, one name of the form ``<prefix><ordering>_<suffix>``
            is produced per ordering returned by
            ``analytic.structure.get_dimensions_names``.
            """
            # NOTE(review): no AddMethod decorator is visible above this
            # function, so as shown it is not attached to the generated
            # superclass -- confirm against the original file.
            structures = self.env["analytic.structure"]
            collected = []

            for model_name, prefix, suffix in all_analytic:
                collected.extend(
                    "%s%s_%s" % (prefix, ordering, suffix)
                    for ordering in structures.get_dimensions_names(model_name)
                )

            return collected

        @AddMethod(superclass)
        @api.model
        def fields_get(self, allfields=None, attributes=None):
            """Override this method to rename analytic fields.

            The standard result is passed through
            ``analytic.structure.analytic_fields_get`` once per registered
            (model name, prefix, suffix) entry.

            :param allfields: forwarded unchanged to the parent
                implementation.
            :param attributes: forwarded unchanged to the parent
                implementation.
            :return: the field description mapping after post-processing.
            """

            # The closing parenthesis of this call was missing, which made
            # the statement (and the whole module) unparsable.
            res = super(superclass, self).fields_get(
                allfields=allfields, attributes=attributes
            )
            analytic_osv = self.env["analytic.structure"]

            for model_name, prefix, suffix in all_analytic:
                res = analytic_osv.analytic_fields_get(
                    model_name, res, prefix, suffix
                )

            return res

        @AddMethod(superclass)
        @api.model
        def fields_view_get(
            self, view_id=None, view_type="form", toolbar=False, submenu=False
        ):
            """Override this method to hide unused analytic fields.

            The standard view description is passed through
            ``analytic.structure.analytic_fields_view_get`` once per
            registered (model name, prefix, suffix) entry.

            :param view_id: forwarded unchanged to the parent implementation.
            :param view_type: forwarded unchanged to the parent
                implementation.
            :param toolbar: forwarded unchanged to the parent implementation.
            :param submenu: forwarded unchanged to the parent implementation.
            :return: the view description after post-processing.
            """

            # The call was left unclosed and did not forward `submenu`; all
            # four parameters are now passed through.
            res = super(superclass, self).fields_view_get(
                view_id=view_id,
                view_type=view_type,
                toolbar=toolbar,
                submenu=submenu,
            )
            analytic_osv = self.env["analytic.structure"]

            for model_name, prefix, suffix in all_analytic:
                res = analytic_osv.analytic_fields_view_get(
                    model_name, res, prefix, suffix
                )

            return res

        @AddMethod(superclass)
        @api.model
        @api.returns(orm_name, lambda value: value.id)
        def create(self, vals, **kwargs):
            """Create the record, then run the analytic validation hook.

            ``_validate_analytic_fields`` receives the ``_analytic`` mapping
            as a frozendict and is expected to raise when validation fails.
            """
            record = super(superclass, self).create(vals, **kwargs)
            analytic_params = frozendict(analytic)
            # Throws in case of error
            record._validate_analytic_fields(analytic_params)
            return record

        @AddMethod(superclass)
        @api.multi
        def write(self, vals, **kwargs):
            """Write the values, then run the analytic validation hook.

            ``_validate_analytic_fields`` receives the ``_analytic`` mapping
            as a frozendict and is expected to raise when validation fails.
            """
            outcome = super(superclass, self).write(vals, **kwargs)
            analytic_params = frozendict(analytic)
            # Throws in case of error
            self._validate_analytic_fields(analytic_params)
            return outcome

        @AddMethod(superclass)
        @api.multi
        def _validate_analytic_fields(self, analytic):
            """Hook for validating analytic fields; a no-op by default.

            Called after ``create`` and ``write``. Override it to add checks.

            :param analytic: frozendict, analytic field parameters as they
                   appear in '_analytic' in its expanded form, i.e. a
                   prefix => model_name mapping.
            :raises: Validation error when applicable.
            """

        @AddMethod(superclass)
        @api.model
        def _required_analytic_fields(self):
            """Return the orderings of the required analytic structures.

            Structures are looked up by model name: the ``_analytic``
            attribute when it is a string, otherwise the model's ``_name``
            with dots replaced by underscores.
            """
            declared = getattr(self, "_analytic", None)
            if isinstance(declared, str):
                structure_model_name = declared
            else:
                structure_model_name = self._name.replace(".", "_")

            required_structures = self.env["analytic.structure"].search(
                [
                    ("model_name", "=", structure_model_name),
                    ("required", "=", True),
                ]
            )
            return required_structures.mapped("ordering")

        return (superclass,)

    @classmethod
    def _setup_bound_dimension(
        cls, dimension, defaults, orm_name, name, bases, nmspc
    ):
        """Bind a dimension to the model, creating a code for each record."""

        # Normalise the shortcuts: True means "all defaults", a string is
        # the dimension name.
        if dimension is True:
            dimension = {}
        elif isinstance(dimension, str):
            dimension = {"name": dimension}
        automatic_generation = dimension.get("automatic", True)

        dimension_name = dimension.get("name", None)
        # NOTE(review): the fallback below is indented without a visible
        # condition; presumably `if dimension_name is None:` is missing
        # from this copy -- confirm.
            dimension_name = nmspc.get("_description", False) or orm_name
        column = dimension.get("column", "analytic_id")
        ref_module = dimension.get("ref_module", "")
        ref_id = dimension.get("ref_id", None)
        # NOTE(review): same pattern; presumably `if ref_id is None:` is
        # missing here -- confirm.
            ref_id = orm_name.replace(".", "_") + "_analytic_dimension_id"
        # Plain field names are wrapped into formatter classes; other values
        # are used as given (see generate_formatter).
        code_name = cls.generate_formatter(dimension.get("code_name", "name"))
        code_description = cls.generate_formatter(
            dimension.get("code_description", "description")
        )

        # To use an inherited, renamed parent field, you have to give its name.
        sync_parent = dimension.get("sync_parent", False)
        # NOTE(review): the next assignment has no visible condition;
        # presumably it only applies when sync_parent is True -- confirm.
            sync_parent = nmspc.get("_parent_name", "parent_id")
        parent_column = dimension.get("parent_column", column)
        bound_code_is_required = dimension.get(
            "required", automatic_generation
        )
        # Each rel_* option is normalised to either an empty tuple (option
        # not used) or a (label, model column) pair.
        rel_name = dimension.get("rel_name", tuple())
        if rel_name is True:
            rel_name = "Name"
        if isinstance(rel_name, str):
            rel_name = (rel_name, code_name)

        rel_description = dimension.get("rel_description", tuple())
        if rel_description is True:
            rel_description = "Description"
        if isinstance(rel_description, str):
            rel_description = (rel_description, code_description)

        rel_active = dimension.get("rel_active", tuple())
        if rel_active is True:
            rel_active = "Active"
        if isinstance(rel_active, str):
            rel_active = (rel_active, "active")
        rel_view_type = dimension.get("rel_view_type", tuple())
        if rel_view_type is True:
            rel_view_type = "View type"
        if isinstance(rel_view_type, str):
            rel_view_type = (rel_view_type, "view_type")

        rel_disabled_per_company = dimension.get(
            "rel_disabled_per_company", tuple()
        )
        if rel_disabled_per_company is True:
            rel_disabled_per_company = "Disabled in my company"
        if isinstance(rel_disabled_per_company, str):
            rel_disabled_per_company = (
                rel_disabled_per_company,
                "disabled_per_company",
            )

        # By default, only use inherits if we can be sure there is no conflict
        # on the required fields 'name' and 'nd_id'.
        # There can still be conflicts on analytic_code's optional fields.
        use_inherits = dimension.get("use_inherits", None)
        if use_inherits is None:
            use_inherits = not (
                any(field in nmspc for field in ("name", "nd_id"))
                or nmspc.get("_inherits", False)
                or nmspc.get("_inherit", False)
        # NOTE(review): the `not (` expression above is never closed in this
        # copy; its closing parenthesis is missing.
        use_code_name_methods = dimension.get("use_code_name_methods", False)
        code_ref_ids = dimension.get("code_ref_ids", False)
        if code_ref_ids is True:
            code_ref_ids = ref_id

        code_ref_module = dimension.get("code_ref_module", "")
        # NOTE(review): the three lines below are indented without a visible
        # condition; presumably `if use_inherits:` is missing -- confirm.
            inherits = nmspc.get("_inherits", {})
            inherits["analytic.code"] = column
            nmspc["_inherits"] = inherits

        # Default column for the underlying analytic code.
        if column not in nmspc:
            nmspc[column] = fields.Many2one(
                required=bound_code_is_required,
            # NOTE(review): the Many2one( call is not closed and the opening
            # of the comprehension that builds `rel_cols` is missing; only
            # its tail is visible below.
            cols
            for cols in [
                rel_name + ("name", "Char", True, ""),
                rel_description + ("description", "Char", False, ""),
                rel_active + ("active", "Boolean", False, True),
                rel_view_type + ("view_type", "Boolean", False, False),
            ]
            # Only options that were set carry the two leading elements, so
            # only those reach the full six.
            if len(cols) == 6
        ]

        if rel_cols:
            # NOT a method nor a class member. 'self' is the analytic_code OSV.
            def _record_from_code_id(self):
                """Get the entries to update from the modified codes."""
                obj = self.env.get(orm_name)
                domain = [(column, "in", self.ids)]
                return obj.search(domain)

            # Each entry unpacks as: label, model column, code column,
            # field type name, required flag, default value.
            for string, model_col, code_col, dtype, req, default in rel_cols:
                nmspc[model_col] = getattr(fields, dtype)(
                    string=string,
                    related=".".join([column, code_col]),
                    relation="analytic.code",
                    required=req,
                    ondelete="restrict",
                    store=True,
                )
                if model_col not in defaults:
                    defaults[model_col] = default

        # In order to preserve inheritance and possible overrides, work on a
        # new class that inherits the given bases, then make our model class
        # inherit from this class.
        superclass_name = "_{name}_SuperDimension".format(name=name)
        # Set _register to False in order to prevent its instantiation.
        superclass = type(superclass_name, bases, {"_register": False})

        # This hook must stay on the old-style API (no ``api`` decorator).
        # With the new API the method is called through a wrapper, and 'self'
        # is then a temporary object of the same type as __cls__. That is not
        # wanted here because _bound_dimension_id is set on 'self'.
        # Keep the old API until the whole module is reworked.
        @AddMethod(superclass)
        def _setup_complete(self):
            """Load or create the analytic dimension bound to the model.

            The dimension is looked up through its XML ID
            (``ref_module``/``ref_id``). When the lookup raises ValueError,
            the dimension is created through ``ir.model.data._update`` with
            ``noupdate=True``. Either way the result is stored in
            ``self._bound_dimension_id``.
            """

            super(superclass, self)._setup_complete()

            data_obj = self.env["ir.model.data"].sudo()
            try:
                self._bound_dimension_id = data_obj.get_object_reference(
                    ref_module, ref_id
                )[1]
            except ValueError:
                vals = {"name": dimension_name}
                # The closing parenthesis of this call was missing.
                # NOTE(review): assumes _update() returns the record ID, as
                # the lookup branch above stores an ID -- confirm.
                self._bound_dimension_id = data_obj._update(
                    "analytic.dimension",
                    ref_module,
                    vals,
                    xml_id=ref_id,
                    noupdate=True,
                )
            prefix = config.get_misc("analytic", "code_ref_prefix", False)

            # This function is called as a method and can be overridden.
            @AddMethod(superclass)
            def _generate_code_ref_id(self, context=None):
                """Create an ``ir.model.data`` entry for each record's code.

                The entry name joins, with underscores: the configured
                prefix (when set), the record's company code (when the record
                has a company), ``"ANC"``, ``code_ref_ids`` and the code
                name. The ``context`` argument is not used by this
                implementation.
                """
                xmlid_model = self.env["ir.model.data"]

                for record in self:
                    code = record[column]
                    parts = [prefix] if prefix else []
                    if "company_id" in record and record.company_id:
                        parts.append(record.company_id.code)
                    parts.extend(["ANC", code_ref_ids, code.name])

                    xmlid_model.create(
                        {
                            "name": "_".join(parts),
                            "module": code_ref_module,
                            "model": "analytic.code",
                            "res_id": code.id,
                        }
                    )

        @AddMethod(superclass)
        @api.model
        def _get_bound_dimension_id(self):
            """Return the ID of the dimension bound to this model.

            The dimension is resolved from its XML ID
            (``ref_module``/``ref_id``) with superuser rights.
            """
            xmlid_model = self.env["ir.model.data"].sudo()
            reference = xmlid_model.get_object_reference(ref_module, ref_id)
            # Only the second item of the reference is returned.
            return reference[1]

        @AddMethod(superclass)
        @api.model
        def _create_analytic_code(self, vals, code_vals):
            """Create the analytic code backing a record, when it has a name.

            :param vals: values of the record; on success the bound column is
                set in it to the new code's ID.
            :param code_vals: values for the code; completed in place.
            :return: ``(code, vals)`` where ``code`` is the created code, or
                ``False`` when no name was available and nothing was created.
            """

            if use_inherits:
                code_vals.update(vals)
            else:
                # A formatter only runs when all the fields it depends on
                # are part of the submitted values.
                for target, formatter in (
                    ("name", code_name),
                    ("description", code_description),
                ):
                    if set(formatter.fields) <= set(vals):
                        code_vals[target] = formatter.generate(self, vals)

            if not code_vals.get("name"):
                return False, vals

            # OpenERP bug: related fields do not work properly on
            # creation.
            for rel in rel_cols:
                _label, model_col, code_col = rel[:3]
                if model_col in vals:
                    code_vals[code_col] = vals[model_col]
                elif model_col in self._defaults:
                    code_vals[code_col] = self._defaults[model_col]

            # We have to create the code separately, even with
            # inherits.
            code_model = self.env["analytic.code"]
            code_vals["nd_id"] = self._get_bound_dimension_id()
            # These are behind-the-scenes links so bypass security rules.
            created_code = code_model.sudo().create(code_vals)
            vals[column] = created_code.id

            return created_code, vals

            # NOTE(review): this definition is indented one level deeper than
            # its neighbours without a visible enclosing statement; the
            # controlling `if` appears to be missing from this copy.
            @AddMethod(superclass)
            @api.model
            @api.returns(orm_name, lambda value: value.id)
            def create(self, vals, **kwargs):
                """Create the analytic code."""

                # NOTE(review): `code_vals` and `context` are used below but
                # never assigned in the visible body; the sibling write()
                # starts with `context = self.env.context` and
                # `code_vals = {}`, so the same lines are presumably missing
                # here -- confirm.
                if sync_parent:
                    cp = self._get_code_parent(vals)
                    if cp is not None and cp:
                        code_vals["code_parent_id"] = cp.id
                # Direct changes to the 'bound analytic code' field are ignored
                # unless the 'force_code_id' context key is passed as True.
                force_code_id = vals.pop(column, False)
                # Will be set if a new code is created
                new_code = False
                if context and context.get("force_code_id", False):
                    self._force_code(force_code_id, code_vals)
                    vals[column] = force_code_id
                else:
                    new_code, vals = self._create_analytic_code(
                        vals, code_vals
                    )
                res = super(superclass, self).create(vals, **kwargs)
                # Second chance: the record was created without a code.
                if not getattr(res, column):

                    if sync_parent:
                        cp = self._get_code_parent(vals)
                        # NOTE(review): the assignment below has no visible
                        # condition; presumably the same
                        # `if cp is not None and cp:` guard as above is
                        # missing -- confirm.
                            code_vals["code_parent_id"] = cp.id
                    new_code, vals = self._create_analytic_code(
                        {
                            field: extract(
                                getattr(res, field), field_data["type"]
                            )
                            for field, field_data in list(
                                res.fields_get().items()
                            )
                            if field
                            in (
                                list(vals.keys())
                    # NOTE(review): the comprehension and the call above are
                    # never closed; the rest of the field filter and the
                    # second argument are missing from this copy.
                    if new_code:
                        super(superclass, res).write({column: new_code.id})
                if code_ref_ids:
                    # NOTE(review): the visible _generate_code_ref_id takes
                    # this argument as `context` and iterates over `self`,
                    # not over `res` -- confirm this is intended.
                    self._generate_code_ref_id(res)
                if new_code:
                    # These are behind-the-scenes links so bypass security
                    # rules.
                    new_code.sudo().write(
                        {"origin_id": "{},{}".format(self._name, res.id)}
                    )

                return res
            @AddMethod(superclass)
            @api.multi
            def write(self, vals, **kwargs):
                """Update the analytic code's name if it is not inherited,
                and its parent code if parent-child relations are synchronized.
                """
                context = self.env.context
                # Values to push to the bound codes, shared by all records.
                code_vals = {}
                # IDs of the records that get a brand new code in this call.
                news = []
                standard_process = False
                if sync_parent:
                    cp = self._get_code_parent(vals)
                    if cp is not None and cp:
                        code_vals["code_parent_id"] = cp.id
                    else:
                        # Fall back to the code of the parent read from self.
                        parent = getattr(self, sync_parent)
                        if parent:
                            cp = getattr(parent, parent_column)
                            if cp is not None and cp:
                                code_vals["code_parent_id"] = cp.id
                # Direct changes to the 'bound analytic code' field are ignored
                # unless the 'force_code_id' context key is passed as True.
                force_code_id = vals.pop(column, False)
                if context and context.get("force_code_id", False):
                    self._force_code(force_code_id, code_vals)
                    vals[column] = force_code_id
                else:
                    name_col = rel_name[1] if rel_name else code_name
                    description_col = (
                        rel_description[1]
                        if rel_description
                        else code_description
                    )
                    if set(name_col.fields).issubset(set(vals.keys())):
                        code_vals["name"] = name_col.generate(self, vals)

                    if set(description_col.fields).issubset(set(vals.keys())):
                        code_vals["description"] = description_col.generate(
                            self, vals
                    # NOTE(review): the generate( call above is never closed
                    # in this copy; its closing parenthesis is missing.
                    standard_process = True

                res = super(superclass, self).write(vals, **kwargs)

                # If updating records with no code, create these.
                if standard_process:
                    code_obj = self.env["analytic.code"]

                    for rec in self:

                        code = getattr(rec, column)
                        rec_code_vals = code_vals.copy()
                        rec_vals = dict().copy()
                        # NOTE(review): the assignment below has no visible
                        # condition; by symmetry with the description branch,
                        # `if not rec_code_vals.get("name"):` is presumably
                        # missing -- confirm.
                            rec_code_vals["name"] = name_col.generate(
                                rec, rec.read(name_col.fields)[0]
                            )
                        if not rec_code_vals.get("description"):
                            rec_code_vals[
                                "description"
                            ] = description_col.generate(
                                rec, rec.read(description_col.fields)[0]
                            )
                        if not code and rec_code_vals.get("name"):
                            news.append(rec.id)
                            rec_code_vals[
                                "nd_id"
                            ] = rec._get_bound_dimension_id()  # noqa: E501
                            rec_code_vals["origin_id"] = "{},{}".format(
                                self._name, rec.id
                            )
                            # These are behind-the-scenes links so bypass
                            # security rules.
                            rec_vals[column] = (
                                code_obj.sudo().create(rec_code_vals).id
                            )
                            super(superclass, rec).write(rec_vals, **kwargs)
                        elif rec_code_vals:
                            # These are behind-the-scenes links so bypass
                            # security rules.
                            code.sudo().write(rec_code_vals)
                # NOTE(review): `news` is always a list here, so the
                # `is not False` test never fails. Also, the visible
                # _generate_code_ref_id takes its argument as `context` and
                # iterates over `self` -- confirm the intent of passing an ID.
                if code_ref_ids and news is not False:
                    for new in news:
                        self._generate_code_ref_id(new)

                return res

        @AddMethod(superclass)
        def unlink(self, **kwargs):
            """Delete the records, then the analytic codes bound to them.

            Note: the deletion fails if a code is still referenced by any
            other object, because of the RESTRICT constraint. That is the
            intended behavior.
            """

            # Collect the bound codes before the records are deleted.
            bound_codes = self.env["analytic.code"]
            for rec in self:
                bound_codes = bound_codes | getattr(rec, column)

            outcome = super(superclass, self).unlink()

            # These are behind-the-scenes links so bypass security rules.
            bound_codes.sudo().unlink(**kwargs)

            return outcome

        @AddMethod(superclass)
        def _force_code(self, force_code_id, code_vals):
            """Check a caller-supplied analytic code and update it.

            :param force_code_id: ID of the code to bind.
            :param code_vals: values written on that code, when not empty.
            :raises ValueError: when no ID is given, or when the code does
                not belong to the dimension bound to this model.
            """

            code_model = self.env["analytic.code"]

            if not force_code_id:
                raise ValueError(
                    "An analytic code ID MUST be specified if the "
                    "force_code_id key is enabled in the context"
                )

            forced = code_model.browse(force_code_id)
            if forced.nd_id.id != self._get_bound_dimension_id():
                raise ValueError(
                    "If specified, codes must belong to the bound "
                    "analytic dimension {}".format(dimension_name)
                )

            if not code_vals:
                return
            # These are behind-the-scenes links so bypass security rules.
            forced.sudo().write(code_vals)

        if sync_parent:
            # This function is called as a method and can be overridden.
            @AddMethod(superclass)
            def _get_code_parent(self, vals):
                """If parent_id is in the submitted values, return the analytic
                code of this parent, to be used as the child's code's parent.

                :param vals: values submitted to create/write.
                :return: ``None`` when the parent field is not in ``vals``;
                    ``False`` when the parent is cleared, when the field has
                    no relation model, or when the parent has no code;
                    otherwise the parent's analytic code.
                """
                parent_id = vals.get(sync_parent, None)
                if parent_id is None:
                    return None
                if not parent_id:
                    return False

                parent_model = self.fields_get([sync_parent])[
                    sync_parent
                ].get("relation")
                # Without a relation model there is nothing to browse; the
                # result used to be left unbound in that case.
                if not parent_model:
                    return False

                parent = self.env[parent_model].browse(parent_id)
                # The attribute name was missing from this getattr() call;
                # the parent's code lives in `parent_column`, as in write().
                return getattr(parent, parent_column) or False

        if use_code_name_methods:

            @AddMethod(superclass)
            def name_get(self):
                """Return the analytic code's name.

                Records without a bound code are left out of the result.
                """

                # Code IDs to model IDs
                model_id_by_code = {}
                for row in self.read([column]):
                    if row[column] is not False:
                        model_id_by_code[row[column][0]] = row["id"]

                codes = self.env["analytic.code"].browse(
                    list(model_id_by_code)
                )
                return [
                    (model_id_by_code[code_id], label)
                    for code_id, label in codes.name_get()
                    if code_id in model_id_by_code
                ]

            @AddMethod(superclass)
            def name_search(
                self, name, args=None, operator="ilike", limit=100
            ):
                """Return the records whose analytic code matches the name.

                :param name: text to look for in the code names.
                :param args: optional extra domain applied to the code
                    search; it is not modified.
                :param operator: comparison operator passed to the code
                    search.
                :param limit: maximum number of codes to match.
                :return: list of ``(record ID, code name)`` pairs.
                """

                code_obj = self.env["analytic.code"]
                # Work on a copy: `args` defaults to None, and the caller's
                # list must not be altered.
                code_domain = list(args or [])
                code_domain.append(
                    ("nd_id", "=", self._get_bound_dimension_id())
                )
                names = code_obj.name_search(
                    name, code_domain, operator, limit
                )
                if not names:
                    return []

                # zip() objects cannot be indexed on Python 3, so the IDs are
                # collected explicitly.
                code_ids = [cid for cid, _cname in names]
                records = self.search([(column, "in", code_ids)])
                code_reads = records.read([column])
                c2m = {  # Code IDs to model IDs
                    code_read[column][0]: code_read["id"]
                    for code_read in code_reads
                    if code_read[column] is not False
                }
                return [
                    (c2m[cid], cname) for cid, cname in names if cid in c2m
                ]

        return (superclass,)


def extract(value, typ):
    """Returns the ID, if the value is a browse record.
    Returns the IDs, if the value is a browse record list.
    Otherwise, returns the value.
    """

    result = value
    if typ in ["many2many"]:
        res = value.ids
        res.sort()
        result = [(6, 0, list(set(res)))]
    elif typ == "many2one":
    elif typ == "one2many":