diff --git a/cmd/program.go b/cmd/program.go index f63320edc50f3c5389e1605854d2ab26baf5e74a_Y21kL3Byb2dyYW0uZ28=..a61599c262161399bea891146c54be991e2772cd_Y21kL3Byb2dyYW0uZ28= 100644 --- a/cmd/program.go +++ b/cmd/program.go @@ -304,6 +304,10 @@ } } +func (program *Program[E]) GetSchema() database.SchemaSet { + return program.dbSchema +} + func (program *Program[E]) ParseArgs(args []string) int { if _, err := program.BootstrapParser.ParseArgs(args); err != nil { program.Logger.Err(err).Msg("could not parse command line") diff --git a/xbusemitter/emission_queue.go b/xbus/emissionqueue/emission_queue.go similarity index 91% rename from xbusemitter/emission_queue.go rename to xbus/emissionqueue/emission_queue.go index f63320edc50f3c5389e1605854d2ab26baf5e74a_eGJ1c2VtaXR0ZXIvZW1pc3Npb25fcXVldWUuZ28=..a61599c262161399bea891146c54be991e2772cd_eGJ1cy9lbWlzc2lvbnF1ZXVlL2VtaXNzaW9uX3F1ZXVlLmdv 100644 --- a/xbusemitter/emission_queue.go +++ b/xbus/emissionqueue/emission_queue.go @@ -1,4 +1,4 @@ -package xbusemitter +package emissionqueue import ( "encoding/json" @@ -8,6 +8,7 @@ "time" "orus.io/orus-io/go-orusapi/database" + "orus.io/orus-io/go-orusapi/xbus/emissionqueue/migration" "xbus.io/go-xbus/v4/envelope" ) @@ -11,7 +12,9 @@ "xbus.io/go-xbus/v4/envelope" ) -const EmissionQueueTableName = "xbus_emission_queue" +const TableName = "xbus_emission_queue" + +var Schema = database.Schema{Name: "emissionqueue", Source: migration.Source} type EmissionQueue interface { AddMessage(db *database.SQLHelper, msg Message) error @@ -53,7 +56,7 @@ // AddMessage pushes a message in the queue. func (eq *emissionQueue) AddMessage(db *database.SQLHelper, msg Message) error { - query := database.SQ.Insert(EmissionQueueTableName). + query := database.SQ.Insert(TableName). Columns("created", "related_to", "msgtype", "content", "chunks"). Values(time.Now().UTC(), msg.RelatedTo, msg.MsgType, msg.Content, &msg.Chunks) diff --git a/xbusemitter/emission_queue_test.go b/xbus/emissionqueue/emission_queue_test.go similarity index 76% rename from xbusemitter/emission_queue_test.go rename to xbus/emissionqueue/emission_queue_test.go index f63320edc50f3c5389e1605854d2ab26baf5e74a_eGJ1c2VtaXR0ZXIvZW1pc3Npb25fcXVldWVfdGVzdC5nbw==..a61599c262161399bea891146c54be991e2772cd_eGJ1cy9lbWlzc2lvbnF1ZXVlL2VtaXNzaW9uX3F1ZXVlX3Rlc3QuZ28= 100644 --- a/xbusemitter/emission_queue_test.go +++ b/xbus/emissionqueue/emission_queue_test.go @@ -1,4 +1,4 @@ -package xbusemitter_test +package emissionqueue_test import ( "context" @@ -9,8 +9,8 @@ "orus.io/orus-io/go-orusapi/database" "orus.io/orus-io/go-orusapi/testutils" - "orus.io/orus-io/go-orusapi/xbusemitter" - "orus.io/orus-io/go-orusapi/xbusemitter/migration" + "orus.io/orus-io/go-orusapi/xbus/emissionqueue" + "orus.io/orus-io/go-orusapi/xbus/emissionqueue/migration" ) func TestEmissionQueue(t *testing.T) { @@ -18,9 +18,9 @@ db := database.GetTestDB(context.TODO(), t, migration.Source) defer db.Close() - queue := xbusemitter.NewEmissionQueue() + queue := emissionqueue.NewEmissionQueue() assert.NotNil(t, queue) sqlHelper := database.NewSQLHelper(context.Background(), db, log) @@ -22,8 +22,8 @@ assert.NotNil(t, queue) sqlHelper := database.NewSQLHelper(context.Background(), db, log) - require.NoError(t, queue.AddMessage(&sqlHelper, xbusemitter.NewMessage("AddMessage", "test", []byte("abcde")))) + require.NoError(t, queue.AddMessage(&sqlHelper, emissionqueue.NewMessage("AddMessage", "test", []byte("abcde")))) require.NoError(t, queue.Add(&sqlHelper, "Add", "test", "fghij")) @@ -28,7 +28,7 @@ require.NoError(t, queue.Add(&sqlHelper, "Add", "test", "fghij")) - var messages []xbusemitter.Message + var messages []emissionqueue.Message require.NoError(t, database.Select(db, &messages, database.SQ. Select("id", "related_to", "msgtype", "content"). @@ -31,8 +31,8 @@ require.NoError(t, database.Select(db, &messages, database.SQ. Select("id", "related_to", "msgtype", "content"). - From(xbusemitter.EmissionQueueTableName). + From(emissionqueue.TableName). OrderBy("id"), &log)) diff --git a/xbusemitter/message.go b/xbus/emissionqueue/message.go similarity index 98% rename from xbusemitter/message.go rename to xbus/emissionqueue/message.go index f63320edc50f3c5389e1605854d2ab26baf5e74a_eGJ1c2VtaXR0ZXIvbWVzc2FnZS5nbw==..a61599c262161399bea891146c54be991e2772cd_eGJ1cy9lbWlzc2lvbnF1ZXVlL21lc3NhZ2UuZ28= 100644 --- a/xbusemitter/message.go +++ b/xbus/emissionqueue/message.go @@ -1,4 +1,4 @@ -package xbusemitter +package emissionqueue import ( "encoding/json" diff --git a/xbusemitter/migration/0001_emissionqueue.up.sql b/xbus/emissionqueue/migration/0001_emissionqueue.up.sql similarity index 100% rename from xbusemitter/migration/0001_emissionqueue.up.sql rename to xbus/emissionqueue/migration/0001_emissionqueue.up.sql diff --git a/xbusemitter/migration/migration.go b/xbus/emissionqueue/migration/migration.go similarity index 100% rename from xbusemitter/migration/migration.go rename to xbus/emissionqueue/migration/migration.go diff --git a/xbusemitter/emitter.go b/xbus/emitter/emitter.go similarity index 94% rename from xbusemitter/emitter.go rename to xbus/emitter/emitter.go index f63320edc50f3c5389e1605854d2ab26baf5e74a_eGJ1c2VtaXR0ZXIvZW1pdHRlci5nbw==..a61599c262161399bea891146c54be991e2772cd_eGJ1cy9lbWl0dGVyL2VtaXR0ZXIuZ28= 100644 --- a/xbusemitter/emitter.go +++ b/xbus/emitter/emitter.go @@ -1,4 +1,4 @@ -package xbusemitter +package emitter import ( "context" @@ -20,6 +20,8 @@ "xbus.io/go-xbus/v4" "xbus.io/go-xbus/v4/api" "xbus.io/go-xbus/v4/envelope" + + "orus.io/orus-io/go-orusapi/xbus/emissionqueue" ) var ErrIntOverflow = errors.New("integer overflow") @@ -128,10 +130,10 @@ return nil } -func (e *Emitter) load() ([]Message, error) { +func (e *Emitter) load() ([]emissionqueue.Message, error) { if e.bufferSize < 0 { // no upper boundary since uint64 max overflows int return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) } query := database.SQ. Select("id", "msgtype", "content", "chunks"). @@ -132,10 +134,10 @@ if e.bufferSize < 0 { // no upper boundary since uint64 max overflows int return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) } query := database.SQ. Select("id", "msgtype", "content", "chunks"). - From(EmissionQueueTableName). + From(emissionqueue.TableName). Where(squirrel.Eq{"status": "pending"}). OrderBy("id ASC"). Limit(uint64(e.bufferSize)) @@ -148,7 +150,7 @@ sqh := database.NewSQLHelper(e.ctx, tx, e.log) - var messages []Message + var messages []emissionqueue.Message if err := sqh.Select(&messages, query); err != nil { return nil, err } @@ -156,10 +158,10 @@ return messages, nil } -func (e *Emitter) listSent() ([]Message, error) { +func (e *Emitter) listSent() ([]emissionqueue.Message, error) { if e.bufferSize < 0 { // no upper boundary since uint64 max overflows int return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) } query := database.SQ. Select("id", "envelope_id", "related_to", "msgtype", "content", "chunks"). @@ -160,10 +162,10 @@ if e.bufferSize < 0 { // no upper boundary since uint64 max overflows int return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) } query := database.SQ. Select("id", "envelope_id", "related_to", "msgtype", "content", "chunks"). - From(EmissionQueueTableName). + From(emissionqueue.TableName). Where(squirrel.Eq{"status": []string{"sent", "running"}}). OrderBy("id ASC"). Limit(uint64(e.bufferSize)) @@ -176,7 +178,7 @@ sqh := database.NewSQLHelper(e.ctx, tx, e.log) - var messages []Message + var messages []emissionqueue.Message if err := sqh.Select(&messages, query); err != nil { return nil, err } @@ -213,7 +215,7 @@ values["log"] = strings.Join(logs, "\n") } - query := database.SQ.Update(EmissionQueueTableName).Where( + query := database.SQ.Update(emissionqueue.TableName).Where( squirrel.Eq{"envelope_id": uuid.UUID(envelopeID)}, ).SetMap(values). Suffix("RETURNING related_to") @@ -282,7 +284,7 @@ } } -func (e *Emitter) attemptEmit(msg *Message) error { +func (e *Emitter) attemptEmit(msg *emissionqueue.Message) error { tx, err := database.Begin(e.ctx, e.db) if err != nil { return err @@ -299,7 +301,7 @@ msg.EnvelopeID = id - query := squirrel.Update(EmissionQueueTableName). + query := squirrel.Update(emissionqueue.TableName). Where(squirrel.Eq{"id": msg.ID}). Set("envelope_id", id) @@ -330,7 +332,7 @@ return err } - query = squirrel.Update(EmissionQueueTableName). + query = squirrel.Update(emissionqueue.TableName). Where(squirrel.Eq{"id": msg.ID}). Set("envelope_id", uuid.UUID(env.ID())). Set("status", "sent"). @@ -343,7 +345,7 @@ return tx.Commit() } -func (e *Emitter) emit(msg *Message) error { +func (e *Emitter) emit(msg *emissionqueue.Message) error { for { if err := e.attemptEmit(msg); err != nil { e.log.Err(err).Msg("failed to emit to xbus (will reattempt later)") diff --git a/xbusemitter/option.go b/xbus/emitter/option.go similarity index 78% rename from xbusemitter/option.go rename to xbus/emitter/option.go index f63320edc50f3c5389e1605854d2ab26baf5e74a_eGJ1c2VtaXR0ZXIvb3B0aW9uLmdv..a61599c262161399bea891146c54be991e2772cd_eGJ1cy9lbWl0dGVyL29wdGlvbi5nbw== 100644 --- a/xbusemitter/option.go +++ b/xbus/emitter/option.go @@ -1,5 +1,5 @@ -package xbusemitter +package emitter import ( "context" @@ -2,5 +2,7 @@ import ( "context" + "xbus.io/go-xbus/v4" + "orus.io/orus-io/go-orusapi/cmd" @@ -6,7 +8,6 @@ "orus.io/orus-io/go-orusapi/cmd" - "orus.io/orus-io/go-orusapi/xbusemitter/migration" - "xbus.io/go-xbus/v4" + "orus.io/orus-io/go-orusapi/xbus/emissionqueue/migration" ) func WithXbusEmitter[E any]() cmd.Option[E] { return cmd.CombineOptions( @@ -9,8 +10,8 @@ ) func WithXbusEmitter[E any]() cmd.Option[E] { return cmd.CombineOptions( - cmd.WithSchema[E]("xbusemitter", migration.Source), + cmd.WithSchema[E]("emissionqueue", migration.Source), cmd.WithXbusActors(func(program *cmd.Program[E]) []cmd.XbusActorFactory { return []cmd.XbusActorFactory{{ Name: program.Name + ".emitter",