diff --git a/xbusemitter/emission_queue.go b/xbusemitter/emission_queue.go new file mode 100644 index 0000000000000000000000000000000000000000..2f6044b42451d1e6786f7f847043cf99b02f260d_eGJ1c2VtaXR0ZXIvZW1pc3Npb25fcXVldWUuZ28= --- /dev/null +++ b/xbusemitter/emission_queue.go @@ -0,0 +1,90 @@ +package xbusemitter + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "time" + + "orus.io/orus-io/go-orusapi/database" + "xbus.io/go-xbus/v4/envelope" +) + +const EmissionQueueTableName = "xbus_emission_queue" + +type EmissionQueue interface { + AddMessage(db *database.SQLHelper, msg Message) error + Add(db *database.SQLHelper, relatedTo string, msgType string, payload interface{}) error +} + +func MustMessageFromXbusMessage(msg envelope.Message) Message { + m, err := MessageFromXbusMessage(msg) + if err != nil { + panic(err) + } + + return m +} + +func MessageFromXbusMessage(msg envelope.Message) (Message, error) { + var chunks [][]byte + reader := msg.ChunkReader() + for { + b, err := reader.ReadBytes() + if errors.Is(err, io.EOF) { + break + } + chunks = append(chunks, b) + } + + m := NewChunkedMessage(msg.MsgType(), chunks) + + return m, nil +} + +// NewEmissionQueue creates a EmissionQueue. +func NewEmissionQueue() EmissionQueue { + return &emissionQueue{} +} + +// emissionQueue queues messages for emission to xbus. +type emissionQueue struct{} + +// AddMessage pushes a message in the queue. +func (eq *emissionQueue) AddMessage(db *database.SQLHelper, msg Message) error { + query := database.SQ.Insert(EmissionQueueTableName). + Columns("created", "related_to", "msgtype", "content", "chunks"). + Values(time.Now().UTC(), msg.RelatedTo, msg.MsgType, msg.Content, &msg.Chunks) + + if _, err := db.Exec(query); err != nil { + return fmt.Errorf("emissionqueue: cannot insert the message: %w", err) + } + + return nil +} + +// Add marshals the given payload and pushes a message in the queue. +func (eq *emissionQueue) Add( + db *database.SQLHelper, relatedTo string, msgType string, payload interface{}, +) error { + if eq == nil { + return nil + } + if bArr, ok := payload.([][]byte); ok { + msg := NewChunkedMessage(msgType, bArr) + + return eq.AddMessage(db, msg) + } + var ( + b []byte + err error + ) + b, err = json.Marshal(payload) + if err != nil { + return fmt.Errorf( + "xbus.EmissionQueue: failed to marshal. err was: %w. payload is %#v", err, payload) + } + + return eq.AddMessage(db, NewMessage(relatedTo, msgType, b)) +} diff --git a/xbusemitter/emission_queue_test.go b/xbusemitter/emission_queue_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2f6044b42451d1e6786f7f847043cf99b02f260d_eGJ1c2VtaXR0ZXIvZW1pc3Npb25fcXVldWVfdGVzdC5nbw== --- /dev/null +++ b/xbusemitter/emission_queue_test.go @@ -0,0 +1,51 @@ +package xbusemitter_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "orus.io/orus-io/go-orusapi/database" + "orus.io/orus-io/go-orusapi/testutils" + + "orus.io/orus-io/go-orusapi/xbusemitter" + "orus.io/orus-io/go-orusapi/xbusemitter/migration" +) + +func TestEmissionQueue(t *testing.T) { + log := testutils.GetLogger(t) + db := database.GetTestDB(context.TODO(), t, migration.Source) + defer db.Close() + + queue := xbusemitter.NewEmissionQueue() + + assert.NotNil(t, queue) + + sqlHelper := database.NewSQLHelper(context.Background(), db, log) + + require.NoError(t, queue.AddMessage(&sqlHelper, xbusemitter.NewMessage("AddMessage", "test", []byte("abcde")))) + require.NoError(t, queue.Add(&sqlHelper, "Add", "test", "fghij")) + + var messages []xbusemitter.Message + require.NoError(t, + database.Select(db, &messages, + database.SQ. + Select("id", "related_to", "msgtype", "content"). + From(xbusemitter.EmissionQueueTableName). + OrderBy("id"), + &log)) + + var relatedTo []string + require.Len(t, messages, 2) + if assert.NoError(t, messages[0].RelatedTo.AssignTo(&relatedTo)) { + assert.Equal(t, []string{"AddMessage"}, relatedTo) + } + assert.Equal(t, "test", messages[0].MsgType) + assert.Equal(t, "abcde", string(messages[0].Content)) + if assert.NoError(t, messages[1].RelatedTo.AssignTo(&relatedTo)) { + assert.Equal(t, []string{"Add"}, relatedTo) + } + assert.Equal(t, "test", messages[1].MsgType) + assert.Equal(t, `"fghij"`, string(messages[1].Content)) +} diff --git a/xbusemitter/emitter.go b/xbusemitter/emitter.go new file mode 100644 index 0000000000000000000000000000000000000000..2f6044b42451d1e6786f7f847043cf99b02f260d_eGJ1c2VtaXR0ZXIvZW1pdHRlci5nbw== --- /dev/null +++ b/xbusemitter/emitter.go @@ -0,0 +1,441 @@ +package xbusemitter + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/Masterminds/squirrel" + "github.com/jackc/pgtype" + "github.com/jmoiron/sqlx" + "github.com/m4rw3r/uuid" + "github.com/nats-io/nats.go" + nrpc "github.com/nats-rpc/nrpc" + "github.com/prometheus/client_golang/prometheus" + "github.com/rs/zerolog" + "orus.io/orus-io/go-orusapi/database" + "xbus.io/go-xbus/v4" + "xbus.io/go-xbus/v4/api" + "xbus.io/go-xbus/v4/envelope" +) + +var ErrIntOverflow = errors.New("integer overflow") + +type EmissionStatus string + +const ( + EmissionPending EmissionStatus = "pending" + EmissionSent EmissionStatus = "sent" + EmissionRunning EmissionStatus = "running" + EmissionDone EmissionStatus = "done" + EmissionError EmissionStatus = "error" +) + +// NewEmitter instanciates a Emitter. +func NewEmitter( + ctx context.Context, + db *sqlx.DB, + actor *xbus.Actor, + settings xbus.Settings, + log zerolog.Logger, +) *Emitter { + return &Emitter{ + actor: actor, + db: db, + ctx: ctx, + + bufferSize: settings.MustIntD("buffer-size", 1000), + delay: settings.MustIntD("wait-delay", 10), + + log: log, + } +} + +// Emitter consumes the emission queue and sends the messages to xbus. +type Emitter struct { + actor *xbus.Actor + db *sqlx.DB + + ctx context.Context + runCtx context.Context + runStop func() + wg sync.WaitGroup + + bufferSize int + delay int + + log zerolog.Logger +} + +// Startup starts taking messages from the queue and emit them. +func (e *Emitter) Startup() error { + e.runCtx, e.runStop = context.WithCancel(e.ctx) + + // Start listening to process change of state that concerns us + stateChan, sub, err := envelopeStatesSubscribeChan( + e.actor.Client.API.ProcessState, + e.actor.Client.GetConn(), + e.actor.ID.String(), "*", + e.log, + ) + if err != nil { + return err + } + + sentMessages, err := e.listSent() + if err != nil { + if err := sub.Unsubscribe(); err != nil { + e.log.Err(err).Msg("error unsubscribing") + } + + return err + } + for _, msg := range sentMessages { + processState, err := e.actor.Client.API.ProcessState.GetEnvelopeState( + e.actor.ID.String(), &api.GetEnvelopeStateRequest{ID: api.UUID(msg.EnvelopeID).ToBytes()}, + ) + if err != nil { + e.log.Err(err).Msg("cannot load a sent envelope process state. Skip to the next one") + + continue + } + if err := e.setEmissionStatus(api.UUID(msg.EnvelopeID), processState); err != nil { + if err := sub.Unsubscribe(); err != nil { + e.log.Err(err).Msg("error unsubscribing") + } + + return err + } + } + + e.wg.Add(1) + go e.observeProcessStates(stateChan, sub) + + e.wg.Add(1) + go e.run() + + return nil +} + +// Shutdown stops the routine from emitting more messages. +func (e *Emitter) Shutdown() error { + e.runStop() + e.wg.Wait() + + return nil +} + +func (e *Emitter) load() ([]Message, error) { + if e.bufferSize < 0 { + // no upper boundary since uint64 max overflows int + return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) + } + query := database.SQ. + Select("id", "msgtype", "content", "chunks"). + From(EmissionQueueTableName). + Where(squirrel.Eq{"status": "pending"}). + OrderBy("id ASC"). + Limit(uint64(e.bufferSize)) + + tx, err := database.Begin(e.ctx, e.db) + if err != nil { + return nil, err + } + defer tx.RollbackIfOpened(e.log) + + sqh := database.NewSQLHelper(e.ctx, tx, e.log) + + var messages []Message + if err := sqh.Select(&messages, query); err != nil { + return nil, err + } + + return messages, nil +} + +func (e *Emitter) listSent() ([]Message, error) { + if e.bufferSize < 0 { + // no upper boundary since uint64 max overflows int + return nil, fmt.Errorf("%w: Quantity %d", ErrIntOverflow, e.bufferSize) + } + query := database.SQ. + Select("id", "envelope_id", "related_to", "msgtype", "content", "chunks"). + From(EmissionQueueTableName). + Where(squirrel.Eq{"status": []string{"sent", "running"}}). + OrderBy("id ASC"). + Limit(uint64(e.bufferSize)) + + tx, err := database.Begin(e.ctx, e.db) + if err != nil { + return nil, err + } + defer tx.RollbackIfOpened(e.log) + + sqh := database.NewSQLHelper(e.ctx, tx, e.log) + + var messages []Message + if err := sqh.Select(&messages, query); err != nil { + return nil, err + } + + return messages, nil +} + +func (e *Emitter) setEmissionStatus(envelopeID api.UUID, state *api.EmitterEnvelopeState) error { + var status EmissionStatus + + switch state.GetStatus() { + case api.Process_NOSTATUS, api.Process_INITIAL: + case api.Process_RUNNING, api.Process_PAUSED: + status = EmissionRunning + case api.Process_DONE: + status = EmissionDone + emissionCounter.WithLabelValues(string(EmissionDone)).Add(1) + case api.Process_ERROR: + status = EmissionError + emissionCounter.WithLabelValues(string(EmissionError)).Add(1) + } + values := map[string]interface{}{ + "status": status, + "process_id": uuid.UUID(state.GetProcessID()), + } + if status != "" { + values["date_"+string(status)] = time.Now().UTC() + } + logs := make([]string, 0, len(state.GetErrors())) + for _, log := range state.GetErrors() { + logs = append(logs, log.GetText()) + } + if len(logs) != 0 { + values["log"] = strings.Join(logs, "\n") + } + + query := database.SQ.Update(EmissionQueueTableName).Where( + squirrel.Eq{"envelope_id": uuid.UUID(envelopeID)}, + ).SetMap(values). + Suffix("RETURNING related_to") + + tx, err := database.Begin(e.ctx, e.db) + if err != nil { + return err + } + defer tx.RollbackIfOpened(e.log) + sqh := database.NewSQLHelper(e.ctx, tx, e.log) + + var relatedTo pgtype.TextArray + + if err := sqh.Get(&relatedTo, query); err != nil { + return err + } + + if len(relatedTo.Elements) != 0 && status == EmissionError { + errlogs := zerolog.Arr() + for _, log := range state.GetErrors() { + errlogs = errlogs.Str(log.GetText()) + } + relatedToArr := zerolog.Arr() + for _, element := range relatedTo.Elements { + relatedToArr = relatedToArr.Str(element.String) + } + e.log.Error(). + Array("related_to", relatedToArr). + Str("process_id", state.GetProcessIDAsUUID().String()). + Str("envelope_id", envelopeID.String()). + Array("logs", errlogs). + Msg("xbus error while processing envelope") + } + + return tx.Commit() +} + +func (e *Emitter) observeProcessStates( + stateChan <-chan *EnvelopeStateMsg, sub *nats.Subscription, +) { + defer e.wg.Done() + defer func() { + if err := sub.Unsubscribe(); err != nil { + e.log.Err(err).Msg("error unsubscribing") + } + }() + for { + select { + case <-e.runCtx.Done(): + return + case state := <-stateChan: + envelopeID, err := api.UUIDFromString(state.EnvelopeID) + if err != nil { + e.log.Err(err). + Str("envelope_id", state.EnvelopeID). + Msg("received invalid envelope_id") + } + if err := e.setEmissionStatus( + envelopeID, state.State, + ); err != nil { + e.log.Err(err). + Str("envelope_id", state.EnvelopeID). + Msg("error updating process status") + } + } + } +} + +func (e *Emitter) attemptEmit(msg *Message) error { + tx, err := database.Begin(e.ctx, e.db) + if err != nil { + return err + } + defer tx.RollbackIfOpened(e.log) + db := database.NewSQLHelper(e.ctx, tx, e.log) + + // we need to save the envelope id before emitting because the 'running' + // state is very fast to update. + id, err := uuid.V4() + if err != nil { + return err + } + + msg.EnvelopeID = id + + query := squirrel.Update(EmissionQueueTableName). + Where(squirrel.Eq{"id": msg.ID}). + Set("envelope_id", id) + + if _, err := db.Exec(query); err != nil { + return err + } + + var env envelope.Envelope + if msg.Chunks.Status != pgtype.Null { + m, w := envelope.MustNewMessageChunkWriter(msg.MsgType) + env = envelope.MustNewEnvelope(m) + for _, item := range msg.Chunks.Elements { + if err := w.Write(item.Bytes); err != nil { + return err + } + } + if err := w.Close(); err != nil { + return err + } + } else { + env = envelope.NewEnvelopeWithID( + api.UUID(msg.EnvelopeID), + envelope.MustNewBytesMessage(msg.MsgType, msg.Content), + ) + } + + if _, err := e.actor.Emit(context.Background(), env); err != nil { + return err + } + + query = squirrel.Update(EmissionQueueTableName). + Where(squirrel.Eq{"id": msg.ID}). + Set("envelope_id", uuid.UUID(env.ID())). + Set("status", "sent"). + Set("date_sent", time.Now().UTC()) + + if _, err := db.Exec(query); err != nil { + return err + } + + return tx.Commit() +} + +func (e *Emitter) emit(msg *Message) error { + for { + if err := e.attemptEmit(msg); err != nil { + e.log.Err(err).Msg("failed to emit to xbus (will reattempt later)") + } else { + return nil + } + select { + case <-time.After(time.Duration(e.delay) * time.Second): + case <-e.runCtx.Done(): + return e.runCtx.Err() + } + } +} + +func (e *Emitter) run() { + defer e.wg.Done() + var wait bool + for { + if wait { + select { + case <-time.After(time.Duration(e.delay) * time.Second): + case <-e.runCtx.Done(): + return + } + } + + buffer, err := e.load() + if err != nil { + e.log.Err(err).Msg("failed to load messages from the queue") + wait = true + + continue + } + + if len(buffer) == 0 { + wait = true + + continue + } + + for i := range buffer { + message := buffer[i] + + if err := e.emit(&message); err != nil { + // an error on emit() is always blocking and is already logged + return + } + } + } +} + +// EnvelopeStateMsg ... +type EnvelopeStateMsg struct { + EmitterID string + EnvelopeID string + State *api.EmitterEnvelopeState +} + +func envelopeStatesSubscribeChan( + client *api.ProcessStateClient, nc *nats.Conn, emitterID, envelopeID string, + log zerolog.Logger, +) (<-chan *EnvelopeStateMsg, *nats.Subscription, error) { + ch := make(chan *EnvelopeStateMsg) + subject := client.EnvelopeStatesSubject(emitterID, envelopeID) + sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { + splitted := strings.Split(msg.Subject, ".") + stateMsg := EnvelopeStateMsg{ + EmitterID: splitted[4], + EnvelopeID: splitted[5], + State: &api.EmitterEnvelopeState{}, + } + err := nrpc.Unmarshal(client.Encoding, msg.Data, stateMsg.State) + if err != nil { + log.Err(err).Msg("ProcessStateClient.EnvelopeStatesSubscribe: Error decoding") + + return + } + ch <- &stateMsg + }) + if err != nil { + return nil, nil, err + } + + return ch, sub, nil +} + +var emissionCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "xbus_emission_count", + Help: "Number of xbus emission so far", + }, []string{"status"}) + +func init() { + prometheus.MustRegister(emissionCounter) +} diff --git a/xbusemitter/message.go b/xbusemitter/message.go new file mode 100644 index 0000000000000000000000000000000000000000..2f6044b42451d1e6786f7f847043cf99b02f260d_eGJ1c2VtaXR0ZXIvbWVzc2FnZS5nbw== --- /dev/null +++ b/xbusemitter/message.go @@ -0,0 +1,68 @@ +package xbusemitter + +import ( + "encoding/json" + + "github.com/jackc/pgtype" + "github.com/m4rw3r/uuid" +) + +// Message ... +type Message struct { + ID uint `db:"id"` + EnvelopeID uuid.UUID `db:"envelope_id"` + ProcessID uuid.UUID `db:"process_id"` + RelatedTo pgtype.TextArray `db:"related_to"` + MsgType string `db:"msgtype"` + Content []byte `db:"content"` + Chunks pgtype.ByteaArray `db:"chunks"` +} + +// NewMessage creates a message. +func NewMessage(relatedTo string, msgType string, content []byte) Message { + m := Message{ + MsgType: msgType, + Content: content, + } + if err := m.RelatedTo.Set([]string{relatedTo}); err != nil { + panic(err) + } + if err := m.Chunks.Set(nil); err != nil { + panic(err) + } + + return m +} + +func NewChunkedMessage(msgType string, chunks [][]byte) Message { + m := Message{ + MsgType: msgType, + } + if err := m.RelatedTo.Set([]string{}); err != nil { + panic(err) + } + if err := m.Chunks.Set(chunks); err != nil { + panic(err) + } + + return m +} + +func NewMessageJSON(relatedTo string, msgType string, payload interface{}) (*Message, error) { + b, err := json.Marshal(payload) + if err != nil { + return nil, err + } + m := NewMessage(relatedTo, msgType, b) + + return &m, nil +} + +func MustNewMessageJSON(relatedTo string, msgType string, payload interface{}) Message { + m, err := NewMessageJSON(relatedTo, msgType, payload) + if err != nil { + panic(err) + } + + return *m +} diff --git a/xbusemitter/migration/0001_emissionqueue.up.sql b/xbusemitter/migration/0001_emissionqueue.up.sql new file mode 100644 index 0000000000000000000000000000000000000000..2f6044b42451d1e6786f7f847043cf99b02f260d_eGJ1c2VtaXR0ZXIvbWlncmF0aW9uLzAwMDFfZW1pc3Npb25xdWV1ZS51cC5zcWw= --- /dev/null +++ b/xbusemitter/migration/0001_emissionqueue.up.sql @@ -0,0 +1,30 @@ +CREATE TYPE xbus_emission_status AS ENUM ( + 'pending', + 'sent', + 'running', + 'done', + 'error' +); + +CREATE TABLE xbus_emission_queue ( + id SERIAL PRIMARY KEY, + created TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW(), + msgtype TEXT NOT NULL, + content bytea NULL, + chunks bytea[] NULL, + related_to character varying NOT NULL DEFAULT '{}', + envelope_id uuid NULL, + process_id uuid NULL, + status xbus_emission_status DEFAULT 'pending'::xbus_emission_status NOT NULL, + log text DEFAULT ''::text NOT NULL, + date_sent TIMESTAMP WITHOUT TIME ZONE NULL, + date_running TIMESTAMP WITHOUT TIME ZONE NULL, + date_done TIMESTAMP WITHOUT TIME ZONE NULL, + date_error TIMESTAMP WITHOUT TIME ZONE NULL +); + +CREATE UNIQUE INDEX xbus_emission_queue_envelope_id_idx + ON xbus_emission_queue (envelope_id); + +CREATE INDEX xbus_emission_queue_status_idx + ON xbus_emission_queue (status); diff --git a/xbusemitter/migration/migration.go b/xbusemitter/migration/migration.go new file mode 100644 index 0000000000000000000000000000000000000000..2f6044b42451d1e6786f7f847043cf99b02f260d_eGJ1c2VtaXR0ZXIvbWlncmF0aW9uL21pZ3JhdGlvbi5nbw== --- /dev/null +++ b/xbusemitter/migration/migration.go @@ -0,0 +1,22 @@ +package migration + +import ( + "embed" + + "github.com/golang-migrate/migrate/v4/source" + "github.com/golang-migrate/migrate/v4/source/iofs" +) + +//go:embed *.sql +var migrationScripts embed.FS + +func initSource() source.Driver { + src, err := iofs.New(migrationScripts, ".") + if err != nil { + panic(err) + } + + return src +} + +var Source = initSource() diff --git a/xbusemitter/option.go b/xbusemitter/option.go new file mode 100644 index 0000000000000000000000000000000000000000..2f6044b42451d1e6786f7f847043cf99b02f260d_eGJ1c2VtaXR0ZXIvb3B0aW9uLmdv --- /dev/null +++ b/xbusemitter/option.go @@ -0,0 +1,27 @@ +package xbusemitter + +import ( + "context" + + "orus.io/orus-io/go-orusapi/cmd" + "orus.io/orus-io/go-orusapi/xbusemitter/migration" + "xbus.io/go-xbus/v4" +) + +func WithXbusEmitter[E any]() cmd.Option[E] { + return cmd.CombineOptions( + cmd.WithSchema[E]("xbusemitter", migration.Source), + cmd.WithXbusActors(func(program *cmd.Program[E]) []cmd.XbusActorFactory { + return []cmd.XbusActorFactory{{ + Name: program.Name + ".emitter", + Factory: xbus.NewActorServiceFunc(func( + actor *xbus.Actor, settings xbus.Settings, + ) (xbus.ActorService, error) { + return NewEmitter( + context.TODO(), program.DB, actor, settings, program.Logger, + ), nil + }), + }} + }), + ) +}