##############################################################################
#
#    Xbus emitter for Odoo
#    Copyright (C) 2015, 2022, 2023 XCG Consulting <https://xcg-consulting.fr/>
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

from unittest import mock

from odoo.tests import TransactionCase


class Test(TransactionCase):
    """Tests for job creation through the ``xbus.emitter`` model."""

    def test_create_xbus_emitter_job(self):
        """Create a job with ``send_item`` and check the resulting record.

        The cursor's ``execute`` method is wrapped by a spy that still
        forwards every call to the real implementation, so that the SQL
        statements issued while the job is created can be inspected.
        """
        cursor = self.env.cr
        execute_patch = mock.patch.object(
            cursor,
            "execute",
            autospec=True,
            # Captured before the patch is applied: this is the real method,
            # so queries keep reaching the database.
            side_effect=cursor.execute,
        )

        with execute_patch as execute_spy:
            # Use the default emitter loaded by this module.
            emitter = self.env["xbus.emitter"].search([], limit=1)
            self.assertTrue(emitter)

            event_type = "test_event_type"
            payload = {"foo": "bar"}
            job = emitter.send_item(event_type, payload)

            self.assertTrue(job)
            self.assertEqual(job.event_type, event_type)
            # Not processed yet.
            self.assertFalse(job.log)
            # The payload is stored in its JSON-ified form.
            expected_items = '{"foo": "bar"}'
            self.assertEqual(job.items, expected_items)
            self.assertEqual(job.state, "to_send")

            # Ensure a "NOTIFY" postgresql command has been sent. This has to
            # be checked while the patch is still active.
            expected_notify = "NOTIFY xbus_emitter_job, '%s'" % emitter.id
            execute_spy.assert_any_call(expected_notify)