Skip to content
Snippets Groups Projects
test_xbus_emitter.py 2.07 KiB
Newer Older
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed
##############################################################################
#
#    Xbus emitter for Odoo
#    Copyright (C) 2015, 2022, 2023 XCG Consulting <https://xcg-consulting.fr/>
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

from unittest import mock
Vincent Hatakeyama's avatar
Vincent Hatakeyama committed

class Test(TransactionCase):
    def test_create_xbus_emitter_job(self):
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed
        """Basic job creation test using public methods provided by this
        module.
        """
szeka.wong's avatar
szeka.wong committed
        with mock.patch.object(
Christophe de Vienne's avatar
Christophe de Vienne committed
            self.env.cr,
            "execute",
            autospec=True,
            side_effect=self.env.cr.execute,
szeka.wong's avatar
szeka.wong committed
        ):
            # Use the default emitter loaded by this module.
Christophe de Vienne's avatar
Christophe de Vienne committed
            emitter = self.env["xbus.emitter"].search([], limit=1)
szeka.wong's avatar
szeka.wong committed
            self.assertTrue(emitter)
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed

Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed

szeka.wong's avatar
szeka.wong committed
            self.assertTrue(job)
szeka.wong's avatar
szeka.wong committed
            self.assertFalse(job.log)  # Not processed yet.
            self.assertEqual(job.items, '{"foo": "bar"}')  # JSON-ified
Christophe de Vienne's avatar
Christophe de Vienne committed
            self.assertEqual(job.state, "to_send")
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed

szeka.wong's avatar
szeka.wong committed
            # Ensure a "NOTIFY" postgresql command has been sent.
            self.env.cr.execute.assert_any_call(
Christophe de Vienne's avatar
Christophe de Vienne committed
                "NOTIFY xbus_emitter_job, '%s'" % emitter.id
szeka.wong's avatar
szeka.wong committed
            )