############################################################################## # # Xbus emitter for Odoo # Copyright © 2015, 2022, 2023, 2024 XCG Consulting <https://xcg-consulting.fr/> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # ############################################################################## import json from unittest import mock from odoo.addons.xbus_common.tests.modelcase import ModelCase class Test(ModelCase): def test_create_xbus_emitter_job(self): """Basic job creation test using public methods provided by this module.""" with mock.patch.object( self.env.cr, "execute", autospec=True, side_effect=self.env.cr.execute, ): emitter = self.env.ref("xbus_emitter.xbus_emitter_demo_1") self.assertTrue(emitter) message_type = "test_1" body = json.dumps({"foo": "barré"}).encode("UTF-8") job = emitter.emit([(message_type, b"test", body, None)]) self.assertTrue(job) self.assertEqual(job.message_ids[0].type, message_type) self.assertEqual(job.message_ids[0].header, b"test") self.assertEqual(job.message_ids[0].body, b'{"foo": "barr\\u00e9"}') self.assertFalse(job.log) # Not processed yet. self.assertEqual(job.state, "to_send") self.assertFalse(job.process_state) # Ensure a "NOTIFY" postgresql command has been sent. self.env.cr.execute.assert_any_call( "NOTIFY xbus_emitter_job, %s", (str(emitter.id),) ) def test_linked_document(self): """Add an Xbus message with a linked source record; ensure the message has the linked documents.""" job = self.env.ref("xbus_emitter.xbus_emitter_demo_1").emit( [ ( "test_2", None, json.dumps({"foo": "bar"}).encode("UTF-8"), [self.env.user, self.env.company], ) ] ) self.assertTrue(job) message = job.envelope_id.message_ids[0] found_user, found_company = False, False for link in message.link_ids: if link.res_model == "res.users": self.assertFalse(found_user) self.assertEqual(link.res_id, self.env.user.id) found_user = True elif link.res_model == "res.company": self.assertFalse(found_company) self.assertEqual(link.res_id, self.env.company.id) found_company = True else: # pragma: no cover self.fail() self.assertTrue(found_user and found_company)