Skip to content
Snippets Groups Projects
Commit a61599c26216 authored by Christophe de Vienne's avatar Christophe de Vienne
Browse files

Split xbusemitter into xbus/emitter & xbus/emissionqueue

parent f63320edc50f
No related branches found
No related tags found
No related merge requests found
Pipeline #119675 failed
...@@ -304,6 +304,10 @@ ...@@ -304,6 +304,10 @@
} }
} }
func (program *Program[E]) GetSchema() database.SchemaSet {
return program.dbSchema
}
func (program *Program[E]) ParseArgs(args []string) int { func (program *Program[E]) ParseArgs(args []string) int {
if _, err := program.BootstrapParser.ParseArgs(args); err != nil { if _, err := program.BootstrapParser.ParseArgs(args); err != nil {
program.Logger.Err(err).Msg("could not parse command line") program.Logger.Err(err).Msg("could not parse command line")
......
package xbusemitter package emissionqueue
import ( import (
"encoding/json" "encoding/json"
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
"time" "time"
"orus.io/orus-io/go-orusapi/database" "orus.io/orus-io/go-orusapi/database"
"orus.io/orus-io/go-orusapi/xbus/emissionqueue/migration"
"xbus.io/go-xbus/v4/envelope" "xbus.io/go-xbus/v4/envelope"
) )
...@@ -11,7 +12,9 @@ ...@@ -11,7 +12,9 @@
"xbus.io/go-xbus/v4/envelope" "xbus.io/go-xbus/v4/envelope"
) )
const EmissionQueueTableName = "xbus_emission_queue" const TableName = "xbus_emission_queue"
var Schema = database.Schema{Name: "emissionqueue", Source: migration.Source}
type EmissionQueue interface { type EmissionQueue interface {
AddMessage(db *database.SQLHelper, msg Message) error AddMessage(db *database.SQLHelper, msg Message) error
...@@ -53,7 +56,7 @@ ...@@ -53,7 +56,7 @@
// AddMessage pushes a message in the queue. // AddMessage pushes a message in the queue.
func (eq *emissionQueue) AddMessage(db *database.SQLHelper, msg Message) error { func (eq *emissionQueue) AddMessage(db *database.SQLHelper, msg Message) error {
query := database.SQ.Insert(EmissionQueueTableName). query := database.SQ.Insert(TableName).
Columns("created", "related_to", "msgtype", "content", "chunks"). Columns("created", "related_to", "msgtype", "content", "chunks").
Values(time.Now().UTC(), msg.RelatedTo, msg.MsgType, msg.Content, &msg.Chunks) Values(time.Now().UTC(), msg.RelatedTo, msg.MsgType, msg.Content, &msg.Chunks)
......
package xbusemitter_test package emissionqueue_test
import ( import (
"context" "context"
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
"orus.io/orus-io/go-orusapi/database" "orus.io/orus-io/go-orusapi/database"
"orus.io/orus-io/go-orusapi/testutils" "orus.io/orus-io/go-orusapi/testutils"
"orus.io/orus-io/go-orusapi/xbusemitter" "orus.io/orus-io/go-orusapi/xbus/emissionqueue"
"orus.io/orus-io/go-orusapi/xbusemitter/migration" "orus.io/orus-io/go-orusapi/xbus/emissionqueue/migration"
) )
func TestEmissionQueue(t *testing.T) { func TestEmissionQueue(t *testing.T) {
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
db := database.GetTestDB(context.TODO(), t, migration.Source) db := database.GetTestDB(context.TODO(), t, migration.Source)
defer db.Close() defer db.Close()
queue := xbusemitter.NewEmissionQueue() queue := emissionqueue.NewEmissionQueue()
assert.NotNil(t, queue) assert.NotNil(t, queue)
sqlHelper := database.NewSQLHelper(context.Background(), db, log) sqlHelper := database.NewSQLHelper(context.Background(), db, log)
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
assert.NotNil(t, queue) assert.NotNil(t, queue)
sqlHelper := database.NewSQLHelper(context.Background(), db, log) sqlHelper := database.NewSQLHelper(context.Background(), db, log)
require.NoError(t, queue.AddMessage(&sqlHelper, xbusemitter.NewMessage("AddMessage", "test", []byte("abcde")))) require.NoError(t, queue.AddMessage(&sqlHelper, emissionqueue.NewMessage("AddMessage", "test", []byte("abcde"))))
require.NoError(t, queue.Add(&sqlHelper, "Add", "test", "fghij")) require.NoError(t, queue.Add(&sqlHelper, "Add", "test", "fghij"))
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
require.NoError(t, queue.Add(&sqlHelper, "Add", "test", "fghij")) require.NoError(t, queue.Add(&sqlHelper, "Add", "test", "fghij"))
var messages []xbusemitter.Message var messages []emissionqueue.Message
require.NoError(t, require.NoError(t,
database.Select(db, &messages, database.Select(db, &messages,
database.SQ. database.SQ.
Select("id", "related_to", "msgtype", "content"). Select("id", "related_to", "msgtype", "content").
...@@ -31,8 +31,8 @@ ...@@ -31,8 +31,8 @@
require.NoError(t, require.NoError(t,
database.Select(db, &messages, database.Select(db, &messages,
database.SQ. database.SQ.
Select("id", "related_to", "msgtype", "content"). Select("id", "related_to", "msgtype", "content").
From(xbusemitter.EmissionQueueTableName). From(emissionqueue.TableName).
OrderBy("id"), OrderBy("id"),
&log)) &log))
......
package xbusemitter package emissionqueue
import ( import (
"encoding/json" "encoding/json"
......
package xbusemitter package emitter
import ( import (
"context" "context"
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
"xbus.io/go-xbus/v4" "xbus.io/go-xbus/v4"
"xbus.io/go-xbus/v4/api" "xbus.io/go-xbus/v4/api"
"xbus.io/go-xbus/v4/envelope" "xbus.io/go-xbus/v4/envelope"
"orus.io/orus-io/go-orusapi/xbus/emissionqueue"
) )
var ErrIntOverflow = errors.New("integer overflow") var ErrIntOverflow = errors.New("integer overflow")
...@@ -128,10 +130,10 @@ ...@@ -128,10 +130,10 @@
return nil return nil
} }
func (e *Emitter) load() ([]Message, error) { func (e *Emitter) load() ([]emissionqueue.Message, error) {
if e.bufferSize < 0 { if e.bufferSize < 0 {
// no upper boundary since uint64 max overflows int // no upper boundary since uint64 max overflows int
return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize)
} }
query := database.SQ. query := database.SQ.
Select("id", "msgtype", "content", "chunks"). Select("id", "msgtype", "content", "chunks").
...@@ -132,10 +134,10 @@ ...@@ -132,10 +134,10 @@
if e.bufferSize < 0 { if e.bufferSize < 0 {
// no upper boundary since uint64 max overflows int // no upper boundary since uint64 max overflows int
return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize)
} }
query := database.SQ. query := database.SQ.
Select("id", "msgtype", "content", "chunks"). Select("id", "msgtype", "content", "chunks").
From(EmissionQueueTableName). From(emissionqueue.TableName).
Where(squirrel.Eq{"status": "pending"}). Where(squirrel.Eq{"status": "pending"}).
OrderBy("id ASC"). OrderBy("id ASC").
Limit(uint64(e.bufferSize)) Limit(uint64(e.bufferSize))
...@@ -148,7 +150,7 @@ ...@@ -148,7 +150,7 @@
sqh := database.NewSQLHelper(e.ctx, tx, e.log) sqh := database.NewSQLHelper(e.ctx, tx, e.log)
var messages []Message var messages []emissionqueue.Message
if err := sqh.Select(&messages, query); err != nil { if err := sqh.Select(&messages, query); err != nil {
return nil, err return nil, err
} }
...@@ -156,10 +158,10 @@ ...@@ -156,10 +158,10 @@
return messages, nil return messages, nil
} }
func (e *Emitter) listSent() ([]Message, error) { func (e *Emitter) listSent() ([]emissionqueue.Message, error) {
if e.bufferSize < 0 { if e.bufferSize < 0 {
// no upper boundary since uint64 max overflows int // no upper boundary since uint64 max overflows int
return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize)
} }
query := database.SQ. query := database.SQ.
Select("id", "envelope_id", "related_to", "msgtype", "content", "chunks"). Select("id", "envelope_id", "related_to", "msgtype", "content", "chunks").
...@@ -160,10 +162,10 @@ ...@@ -160,10 +162,10 @@
if e.bufferSize < 0 { if e.bufferSize < 0 {
// no upper boundary since uint64 max overflows int // no upper boundary since uint64 max overflows int
return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize)
} }
query := database.SQ. query := database.SQ.
Select("id", "envelope_id", "related_to", "msgtype", "content", "chunks"). Select("id", "envelope_id", "related_to", "msgtype", "content", "chunks").
From(EmissionQueueTableName). From(emissionqueue.TableName).
Where(squirrel.Eq{"status": []string{"sent", "running"}}). Where(squirrel.Eq{"status": []string{"sent", "running"}}).
OrderBy("id ASC"). OrderBy("id ASC").
Limit(uint64(e.bufferSize)) Limit(uint64(e.bufferSize))
...@@ -176,7 +178,7 @@ ...@@ -176,7 +178,7 @@
sqh := database.NewSQLHelper(e.ctx, tx, e.log) sqh := database.NewSQLHelper(e.ctx, tx, e.log)
var messages []Message var messages []emissionqueue.Message
if err := sqh.Select(&messages, query); err != nil { if err := sqh.Select(&messages, query); err != nil {
return nil, err return nil, err
} }
...@@ -213,7 +215,7 @@ ...@@ -213,7 +215,7 @@
values["log"] = strings.Join(logs, "\n") values["log"] = strings.Join(logs, "\n")
} }
query := database.SQ.Update(EmissionQueueTableName).Where( query := database.SQ.Update(emissionqueue.TableName).Where(
squirrel.Eq{"envelope_id": uuid.UUID(envelopeID)}, squirrel.Eq{"envelope_id": uuid.UUID(envelopeID)},
).SetMap(values). ).SetMap(values).
Suffix("RETURNING related_to") Suffix("RETURNING related_to")
...@@ -282,7 +284,7 @@ ...@@ -282,7 +284,7 @@
} }
} }
func (e *Emitter) attemptEmit(msg *Message) error { func (e *Emitter) attemptEmit(msg *emissionqueue.Message) error {
tx, err := database.Begin(e.ctx, e.db) tx, err := database.Begin(e.ctx, e.db)
if err != nil { if err != nil {
return err return err
...@@ -299,7 +301,7 @@ ...@@ -299,7 +301,7 @@
msg.EnvelopeID = id msg.EnvelopeID = id
query := squirrel.Update(EmissionQueueTableName). query := squirrel.Update(emissionqueue.TableName).
Where(squirrel.Eq{"id": msg.ID}). Where(squirrel.Eq{"id": msg.ID}).
Set("envelope_id", id) Set("envelope_id", id)
...@@ -330,7 +332,7 @@ ...@@ -330,7 +332,7 @@
return err return err
} }
query = squirrel.Update(EmissionQueueTableName). query = squirrel.Update(emissionqueue.TableName).
Where(squirrel.Eq{"id": msg.ID}). Where(squirrel.Eq{"id": msg.ID}).
Set("envelope_id", uuid.UUID(env.ID())). Set("envelope_id", uuid.UUID(env.ID())).
Set("status", "sent"). Set("status", "sent").
...@@ -343,7 +345,7 @@ ...@@ -343,7 +345,7 @@
return tx.Commit() return tx.Commit()
} }
func (e *Emitter) emit(msg *Message) error { func (e *Emitter) emit(msg *emissionqueue.Message) error {
for { for {
if err := e.attemptEmit(msg); err != nil { if err := e.attemptEmit(msg); err != nil {
e.log.Err(err).Msg("failed to emit to xbus (will reattempt later)") e.log.Err(err).Msg("failed to emit to xbus (will reattempt later)")
......
package xbusemitter package emitter
import ( import (
"context" "context"
...@@ -2,5 +2,7 @@ ...@@ -2,5 +2,7 @@
import ( import (
"context" "context"
"xbus.io/go-xbus/v4"
"orus.io/orus-io/go-orusapi/cmd" "orus.io/orus-io/go-orusapi/cmd"
...@@ -6,7 +8,6 @@ ...@@ -6,7 +8,6 @@
"orus.io/orus-io/go-orusapi/cmd" "orus.io/orus-io/go-orusapi/cmd"
"orus.io/orus-io/go-orusapi/xbusemitter/migration" "orus.io/orus-io/go-orusapi/xbus/emissionqueue/migration"
"xbus.io/go-xbus/v4"
) )
func WithXbusEmitter[E any]() cmd.Option[E] { func WithXbusEmitter[E any]() cmd.Option[E] {
return cmd.CombineOptions( return cmd.CombineOptions(
...@@ -9,8 +10,8 @@ ...@@ -9,8 +10,8 @@
) )
func WithXbusEmitter[E any]() cmd.Option[E] { func WithXbusEmitter[E any]() cmd.Option[E] {
return cmd.CombineOptions( return cmd.CombineOptions(
cmd.WithSchema[E]("xbusemitter", migration.Source), cmd.WithSchema[E]("emissionqueue", migration.Source),
cmd.WithXbusActors(func(program *cmd.Program[E]) []cmd.XbusActorFactory { cmd.WithXbusActors(func(program *cmd.Program[E]) []cmd.XbusActorFactory {
return []cmd.XbusActorFactory{{ return []cmd.XbusActorFactory{{
Name: program.Name + ".emitter", Name: program.Name + ".emitter",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment