Skip to content
Snippets Groups Projects
Commit a5d9ce720d6d authored by Houzefa Abbasbhay's avatar Houzefa Abbasbhay :slight_smile:
Browse files

Add a "delayed" mode that uses a cron task to send items (ensures they are...

Add a "delayed" mode that uses a cron task to send items (ensures they are only sent after transaction commits)
parent d1a6e8e33b25
No related branches found
No related tags found
No related merge requests found
...@@ -43,4 +43,6 @@ ...@@ -43,4 +43,6 @@
'data': [ 'data': [
'security/ir.model.access.csv', 'security/ir.model.access.csv',
'data/job_runner_cron_task.xml',
'views/xbus_emitter.xml', 'views/xbus_emitter.xml',
...@@ -46,4 +48,5 @@ ...@@ -46,4 +48,5 @@
'views/xbus_emitter.xml', 'views/xbus_emitter.xml',
'views/xbus_emitter_job.xml',
], ],
'external_dependencies': { 'external_dependencies': {
......
<?xml version="1.0" encoding="utf-8"?>
<openerp>
<data noupdate="1">
<!-- Cron task to run jobs. -->
<record id="xbus_emitter_job_runner_cron_task" model="ir.cron">
<field name="args">()</field>
<field name="doall" eval="False" />
<field name="function">send_jobs</field>
<field name="interval_number">1</field>
<field name="interval_type">minutes</field>
<field name="model">xbus.emitter.job</field>
<field name="name">Xbus emitter sender</field>
<field name="numbercall">-1</field>
</record>
</data>
</openerp>
# flake8: noqa # flake8: noqa
import xbus_emitter import xbus_emitter
import xbus_emitter_job
import json
import logging import logging
from openerp import _ from openerp import _
from openerp import api from openerp import api
...@@ -12,7 +13,7 @@ ...@@ -12,7 +13,7 @@
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class xbus_emitter(models.Model): class XbusEmitter(models.Model):
"""Configuration profile to emit Xbus messages from Odoo. """Configuration profile to emit Xbus messages from Odoo.
""" """
...@@ -66,6 +67,6 @@ ...@@ -66,6 +67,6 @@
return self.id, '%s@%s' % (self.login, self.front_url) return self.id, '%s@%s' % (self.login, self.front_url)
@api.one @api.one
def send_items(self, event_type, items): def send_items(self, event_type, items, delayed=False):
"""Send the data to Xbus, inside one event with the specified event """Send the data to Xbus, inside one event with the specified event
type. type.
...@@ -70,4 +71,5 @@ ...@@ -70,4 +71,5 @@
"""Send the data to Xbus, inside one event with the specified event """Send the data to Xbus, inside one event with the specified event
type. type.
:param event_type: Name of an Xbus event type. :param event_type: Name of an Xbus event type.
:type event_type: String. :type event_type: String.
...@@ -72,3 +74,4 @@ ...@@ -72,3 +74,4 @@
:param event_type: Name of an Xbus event type. :param event_type: Name of an Xbus event type.
:type event_type: String. :type event_type: String.
:type items: Iterable. :type items: Iterable.
...@@ -74,4 +77,11 @@ ...@@ -74,4 +77,11 @@
:type items: Iterable. :type items: Iterable.
:return: The result of an "end_event" Xbus API call.
:param delayed: Whether the emission should only happen after the
database transaction is committed; a scheduled task will run the
emission in that case. Errors, if any, will be logged.
:type delayed: Boolean.
:return: The result of an "end_event" Xbus API call in immediate
(non-delayed) mode; the ID of a job in delayed mode.
""" """
...@@ -76,5 +86,11 @@ ...@@ -76,5 +86,11 @@
""" """
try: if delayed:
return self._send_items_impl(event_type, items) # Just add to the job queue.
self.env['xbus.emitter.job'].create({
'emitter_id': self.id,
'event_type': event_type,
'items': json.dumps(items),
})
...@@ -80,8 +96,14 @@ ...@@ -80,8 +96,14 @@
except (zmq.ZMQBaseError, ZmqRpcError) as error: else:
raise exceptions.Warning(_( # Not delayed; run now and return the result.
'Error when sending data to Xbus: %s'
) % error) try:
return self._send_items_impl(event_type, items)
except (zmq.ZMQBaseError, ZmqRpcError) as error:
raise exceptions.Warning(_(
'Error when sending data to Xbus: %s'
) % error)
def _send_items_impl(self, event_type, items): def _send_items_impl(self, event_type, items):
"""Implementation of the Xbus data sending function that can throw. """Implementation of the Xbus data sending function that can throw.
......
import json
from openerp import _
from openerp import api
from openerp import fields
from openerp import models
class XbusEmitterJob(models.Model):
"""Message to be emitted (or already emitted) to Xbus.
"""
_name = 'xbus.emitter.job'
emitter_id = fields.Many2one(
comodel_name='xbus.emitter',
string='Xbus emitter',
ondelete='cascade',
help='The Xbus emitter profile to send the message with.',
required=True,
)
event_type = fields.Char(
string='Xbus event type',
help='The event type to use when sending items to Xbus.',
required=True,
)
log = fields.Text(
string='Log',
help='Information about the emission of this message.',
readonly=True,
)
items = fields.Text(
string='Items',
help='The data to sent to Xbus, JSON encoded.',
required=True,
)
state = fields.Selection(
selection=[
('to_send', 'To send'),
('sent_success', 'Sent (success)'),
('sent_error', 'Sent (error)'),
],
string='State',
required=True,
track_visibility='onchange',
default='to_send',
)
@api.model
def send_jobs(self):
"""Run jobs that have not yet been sent."""
for job in self.search([('state', '=', 'to_send')]):
try:
job.emitter_id.send_items(
job.event_type, json.loads(job.items)
)
job.write({'log': _('Success!'), 'state': 'sent_success'})
except Exception as error:
job.write({
'log': _('Error when sending data to Xbus: %s') % error,
'state': 'sent_error',
})
return True
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
xbus_emitter_erp_manager_access,xbus_emitter_erp_manager_access,model_xbus_emitter,base.group_erp_manager,1,1,1,1 xbus_emitter_erp_manager_access,xbus_emitter_erp_manager_access,model_xbus_emitter,base.group_erp_manager,1,1,1,1
xbus_emitter_job_erp_manager_access,xbus_emitter_job_erp_manager_access,model_xbus_emitter_job,base.group_erp_manager,1,1,1,1
xbus_emitter_job_user_access,xbus_emitter_job_user_access,model_xbus_emitter_job,base.group_user,1,0,0,0
xbus_emitter_user_access,xbus_emitter_user_access,model_xbus_emitter,base.group_user,1,0,0,0 xbus_emitter_user_access,xbus_emitter_user_access,model_xbus_emitter,base.group_user,1,0,0,0
<?xml version="1.0" encoding="utf-8"?>
<openerp>
<data>
<!-- Views for the xbus.emitter.job model. -->
<record id="xbus_emitter_job_search" model="ir.ui.view">
<field name="name">xbus_emitter_job_search</field>
<field name="model">xbus.emitter.job</field>
<field name="arch" type="xml">
<search string="Xbus emitter jobs">
<field name="emitter_id" />
<field name="event_type" />
<field name="state" />
<field name="log" />
</search>
</field>
</record>
<record id="xbus_emitter_job_list" model="ir.ui.view">
<field name="name">xbus_emitter_job_list</field>
<field name="model">xbus.emitter.job</field>
<field name="arch" type="xml">
<tree string="Xbus emitter jobs">
<field name="create_date" />
<field name="emitter_id" />
<field name="event_type" />
<field name="state" />
<field name="log" />
</tree>
</field>
</record>
<record id="xbus_emitter_job_form" model="ir.ui.view">
<field name="name">xbus_emitter_job_form</field>
<field name="model">xbus.emitter.job</field>
<field name="arch" type="xml">
<form string="Xbus emitter job">
<sheet>
<group>
<field name="create_date" />
<field name="emitter_id" />
<field name="event_type" />
<field name="state" />
<field name="log" />
</group>
<group>
<field name="items" />
</group>
</sheet>
</form>
</field>
</record>
<!-- Add a menu command to access Xbus emitter jobs. -->
<record id="xbus_emitter_job_action" model="ir.actions.act_window">
<field name="name">Xbus emitter jobs</field>
<field name="res_model">xbus.emitter.job</field>
<field name="view_mode">tree,form</field>
<field name="view_type">form</field>
</record>
<menuitem id="xbus_emitter_job_menu_command" name="Xbus emitter jobs"
parent="base.menu_config" action="xbus_emitter_job_action" />
</data>
</openerp>
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment