Skip to content
Snippets Groups Projects
Commit e5335ca39b43 authored by Vincent Hatakeyama's avatar Vincent Hatakeyama
Browse files

:sparkles: Add possibility to emit request

parent 4b0be37db052
Branches branch/11.0
No related tags found
1 merge request!28✨ Add possibility to emit request
Pipeline #55378 passed
......@@ -6,6 +6,8 @@
Heavy refactor of the module to use common message and envelope models.
Added new request mode.
16.0.1.0.2
----------
......
......@@ -30,6 +30,12 @@
actually emit the data (all this module does is store it in a table and
send a signal to that external component).
The module also allows sending requests.
To handle responses, the method queue_response_handling from the model
`xbus.emitter.job` must be implemented.
Note that the requests are received in a different transaction than the one they are
sent in.
Setup
-----
......
......@@ -38,8 +38,8 @@
)
@returns("xbus.emitter.job")
def emit_envelopes(self, envelopes: models.Model):
def emit_envelopes(self, envelopes: models.Model, request: bool = False):
"""Emit the envelopes with the provided emitter. It can only be done with a
single emitter.
:param envelopes: Envelopes to send.
......@@ -42,7 +42,8 @@
"""Emit the envelopes with the provided emitter. It can only be done with a
single emitter.
:param envelopes: Envelopes to send.
:param request: True if the emission is a request.
:return: The new jobs. Odoo "xbus.emitter.job" record set.
:rtype: models.Model
......@@ -54,6 +55,7 @@
{
"emitter_id": self.id,
"envelope_id": envelope.id,
"is_request": request,
}
for envelope in envelopes
]
......@@ -70,9 +72,13 @@
return emitter_jobs
# It is best to create envelope and their messages with create rather than using
# this method. If new fields are added, this method will probably be unable to
# handle them without changes.
# Code will also avoid the potential issues in this method, making it more robust.
@returns("xbus.emitter.job")
def emit(
self,
messages_content: list[
tuple[str, bytes | None, bytes, Collection[models.Model] | None]
],
......@@ -73,9 +79,11 @@
@returns("xbus.emitter.job")
def emit(
self,
messages_content: list[
tuple[str, bytes | None, bytes, Collection[models.Model] | None]
],
request: bool = False,
comment: str = "emit",
):
"""Create an envelope and messages for each emitter and send them.
......@@ -84,6 +92,8 @@
message type. The second one is the optional header. The third one is the body.
The fourth one is an optional record set, to indicate a link between the message
and those record sets.
:param request: True if the emission is a request.
:param comment: comment for the envelope
:return: The new jobs. Odoo "xbus.emitter.job" record set.
:rtype: models.Model
......@@ -102,9 +112,10 @@
# Create one envelope by emitter
envelopes = list(
self.env["xbus.envelope"].create(
[{"comment": "emit", "message_ids": messages_values.copy()}] * len(self)
[{"comment": comment, "message_ids": messages_values.copy()}]
* len(self)
)
)
# Then send them
jobs = self.env["xbus.emitter.job"]
for emitter in self:
......@@ -106,9 +117,9 @@
)
)
# Then send them
jobs = self.env["xbus.emitter.job"]
for emitter in self:
jobs |= emitter.emit_envelopes(envelopes.pop())
jobs |= emitter.emit_envelopes(envelopes.pop(), request)
return jobs
def _send_notifications(self):
......
......@@ -94,9 +94,23 @@
date_sent = fields.Datetime(string="Envelope Sending Date", readonly=True)
date_done = fields.Datetime(string="Process Termination Date", readonly=True)
is_request = fields.Boolean(string="Request")
response_envelope_id = fields.Many2one(
string="Response Envelope",
comodel_name="xbus.envelope",
index=True,
ondelete="cascade",
readonly=True,
)
# This field is not to be updated by Odoo, it should be False then xbus-odoo updates
# it. It is required so that the field is set as not null in the database.
response_acked = fields.Boolean(readonly=True, default=False, required=True)
_sql_constraints = (
(
"envelope_uniq_per_emitter",
"unique(emitter_id, envelope_id)",
"An envelope can only be emitted by a single emitter",
),
......@@ -97,8 +111,13 @@
_sql_constraints = (
(
"envelope_uniq_per_emitter",
"unique(emitter_id, envelope_id)",
"An envelope can only be emitted by a single emitter",
),
(
"response_only_for_requests",
"check((not is_request and response_envelope_id is null) or is_request)",
"A response can only be given to a request",
),
)
......@@ -103,5 +122,20 @@
)
def notify_response(self):
"""Called to indicate a response was received"""
self.invalidate_recordset(["response_envelope_id"])
self.filtered(lambda job: job.is_request).queue_response_handling()
return True
def queue_response_handling(self):
"""When handling response, this method must be implemented."""
# This method should finish quickly and delegate the actual work for later.
# One way to do that is to use the queue_job module from OCA:
# https://github.com/OCA/queue
# Another is to define a new model, store the data in it and use a cron job to
# handle the responses.
raise NotImplementedError # pragma: no cover
@api.model
@api.autovacuum
def _gc_jobs(self):
......
......@@ -18,4 +18,5 @@
#
##############################################################################
import uuid
from datetime import datetime, timedelta
......@@ -21,6 +22,7 @@
from datetime import datetime, timedelta
from unittest import mock
from odoo import Command
from odoo.addons.xbus_common.tests.modelcase import ModelCase
......@@ -22,8 +24,10 @@
from odoo import Command
from odoo.addons.xbus_common.tests.modelcase import ModelCase
from ..models.xbus_emitter_job import XbusEmitterJob
def past_dt(time_delta):
return (datetime.now() - time_delta).replace(microsecond=0)
......@@ -123,3 +127,37 @@
klass._gc_jobs() # pylint: disable=protected-access
self.assertEqual(3, len(klass.search([])))
def test_notify_response(self):
jobs = self.env.ref("xbus_emitter.xbus_emitter_demo_1").emit_envelopes(
self.envelopes[0], True
)
# simulate external writing (xbus-odoo) of the response
uid = str(uuid.uuid4())
self.cr.execute(
"INSERT INTO xbus_envelope (comment, uid) VALUES ('response', %s) "
"RETURNING id",
(uid,),
)
envelope_id = self.cr.fetchall()[0][0]
uid = str(uuid.uuid4())
self.cr.execute(
"INSERT INTO xbus_message (envelope_id, uid, type, header, body) VALUES "
"(%s, %s, %s, %s, %s) RETURNING id",
(envelope_id, uid, "test_1", None, b"{'body': 'value'}"),
)
self.cr.execute(
"UPDATE xbus_emitter_job SET response_envelope_id=%s WHERE id IN %s",
(envelope_id, tuple(jobs.ids)),
)
# notify_response would be called by xbus-odoo
with mock.patch.object(
XbusEmitterJob,
"queue_response_handling",
autospec=True,
):
rpc_reply = jobs.notify_response()
self.assertTrue(rpc_reply)
XbusEmitterJob.queue_response_handling.assert_called_once()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment