Skip to content
Snippets Groups Projects
test_xbus_emitter.py 2.26 KiB
Newer Older
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed
##############################################################################
#
#    Xbus emitter for Odoo
#    Copyright (C) 2015 XCG Consulting <http://odoo.consulting>
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

import odoo

from .util.odoo_tests import TestBase
from .util.uuidgen import genUuid

oury.balde's avatar
oury.balde committed
import mock  # Odoo req.
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed

# Save this method so we can wrap it in a mock.
prev_sql_execute = odoo.sql_db.Cursor.execute


Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed
@odoo.tests.common.at_install(False)
@odoo.tests.common.post_install(True)
class Test(TestBase):
szeka.wong's avatar
szeka.wong committed
    def test_0000_create_xbus_emitter_job(self):
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed
        """Basic job creation test using public methods provided by this
        module.
        """
szeka.wong's avatar
szeka.wong committed
        with mock.patch.object(
Christophe de Vienne's avatar
Christophe de Vienne committed
            self.env.cr,
            "execute",
            autospec=True,
            side_effect=self.env.cr.execute,
szeka.wong's avatar
szeka.wong committed
        ):
            # Use the default emitter loaded by this module.
Christophe de Vienne's avatar
Christophe de Vienne committed
            emitter = self.env["xbus.emitter"].search([], limit=1)
szeka.wong's avatar
szeka.wong committed
            self.assertTrue(emitter)
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed

szeka.wong's avatar
szeka.wong committed
            EVENT_TYPE = genUuid()
Christophe de Vienne's avatar
Christophe de Vienne committed
            DATA = {"foo": "bar"}
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed

szeka.wong's avatar
szeka.wong committed
            job = emitter.send_items(EVENT_TYPE, DATA)
            self.assertTrue(job)
            self.assertEqual(job.event_type, EVENT_TYPE)
            self.assertFalse(job.log)  # Not processed yet.
            self.assertEqual(job.items, '{"foo": "bar"}')  # JSON-ified
Christophe de Vienne's avatar
Christophe de Vienne committed
            self.assertEqual(job.state, "to_send")
Houzefa Abbasbhay's avatar
Houzefa Abbasbhay committed

szeka.wong's avatar
szeka.wong committed
            # Ensure a "NOTIFY" postgresql command has been sent.
            self.env.cr.execute.assert_any_call(
Christophe de Vienne's avatar
Christophe de Vienne committed
                "NOTIFY xbus_emitter_job, '%s'" % emitter.id
szeka.wong's avatar
szeka.wong committed
            )