############################################################################## # # Xbus Common, for Odoo # Copyright © 2023, 2024 XCG Consulting <https://xcg-consulting.fr> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # ############################################################################## import uuid from odoo import api, fields, models, tools # type: ignore[import-untyped] class XbusEnvelope(models.Model): """Represents an xbus envelope.""" _name = "xbus.envelope" _description = "Xbus Envelope" _auto = False # No automatic column for this table, it is used with an external system _log_access = False _order = "create_date DESC, id DESC" _rec_name = "uid" create_date = fields.Datetime(string="Creation Date", readonly=True) uid = fields.Char(string="UID", default=lambda self: uuid.uuid4()) comment = fields.Char() message_ids = fields.One2many( string="Messages", comodel_name="xbus.message", inverse_name="envelope_id" ) message_count = fields.Integer( string="Number of messages", compute="_compute_message_count", store=False ) message_type = fields.Char( string="Message Type", compute="_compute_first_record_of_messages", help="This field is linked to the type of the first message in the envelope", ) header = fields.Binary( string="Message Header", compute="_compute_first_record_of_messages", help="This field is linked to the header of the first message in the envelope", ) body = fields.Binary( string="Message Body", compute="_compute_first_record_of_messages", help="This field is linked to the body of the first message in the envelope", ) header_json = fields.Text( string="Message Header Json", compute="_compute_first_record_of_messages", help="""This field is linked to the header json of the first message in the envelope""", ) body_json = fields.Text( string="Message Body Json", compute="_compute_first_record_of_messages", help="""This field is linked to the body json of the first message in the envelope""", ) @api.depends("message_ids") def _compute_first_record_of_messages(self): for record in self: first_record = ( record.message_ids[0] if len(record.message_ids) > 0 else self.env["xbus.message"] ) record.message_type = first_record.type if first_record else False record.header = first_record.header if first_record else False record.body = first_record.body if first_record else False record.header_json = first_record.header_json if first_record else False record.body_json = first_record.body_json if first_record else False @api.depends("message_ids") def _compute_message_count(self): for record in self: record.message_count = len(record.message_ids) # Action to display only messages coming from this envelope def action_open_message(self): action = self.env["ir.actions.actions"]._for_xml_id( "xbus_common.xbus_message_action" ) action["context"] = {"default_envelope_id": self.id} # for message creations action["domain"] = [("envelope_id", "=", self.id)] return action def init(self): super().init() self.env.cr.execute( """CREATE TABLE IF NOT EXISTS xbus_envelope ( id serial primary key, create_date timestamp without time zone DEFAULT (now() at time zone 'utc'), comment character varying not null, uid uuid not null )""" ) tools.create_unique_index( self._cr, "xbus_envelope_uid_uniq", self._table, ["uid"] ) tools.create_index( self._cr, "xbus_envelope_create_date_index", self._table, ["create_date"] )