package emissionqueue import ( "encoding/json" "errors" "fmt" "io" "time" "orus.io/orus-io/go-orusapi/database" "orus.io/orus-io/go-orusapi/xbus/emissionqueue/migration" "xbus.io/go-xbus/v4/envelope" ) const TableName = "xbus_emission_queue" var Schema = database.Schema{Name: "emissionqueue", Source: migration.Source} type EmissionQueue interface { AddMessage(db *database.SQLHelper, msg Message) error Add(db *database.SQLHelper, relatedTo string, msgType string, payload interface{}) error } func MustMessageFromXbusMessage(msg envelope.Message) Message { m, err := MessageFromXbusMessage(msg) if err != nil { panic(err) } return m } func MessageFromXbusMessage(msg envelope.Message) (Message, error) { var chunks [][]byte reader := msg.ChunkReader() for { b, err := reader.ReadBytes() if errors.Is(err, io.EOF) { break } chunks = append(chunks, b) } m := NewChunkedMessage(msg.MsgType(), chunks) return m, nil } // NewEmissionQueue creates a EmissionQueue. func NewEmissionQueue() EmissionQueue { return &emissionQueue{} } // emissionQueue queues messages for emission to xbus. type emissionQueue struct{} // AddMessage pushes a message in the queue. func (eq *emissionQueue) AddMessage(db *database.SQLHelper, msg Message) error { query := database.SQ.Insert(TableName). Columns("created", "related_to", "msgtype", "content", "chunks"). Values(time.Now().UTC(), msg.RelatedTo, msg.MsgType, msg.Content, &msg.Chunks) if _, err := db.Exec(query); err != nil { return fmt.Errorf("emissionqueue: cannot insert the message: %w", err) } return nil } // Add marshals the given payload and pushes a message in the queue. func (eq *emissionQueue) Add( db *database.SQLHelper, relatedTo string, msgType string, payload interface{}, ) error { if eq == nil { return nil } if bArr, ok := payload.([][]byte); ok { msg := NewChunkedMessage(msgType, bArr) return eq.AddMessage(db, msg) } var ( b []byte err error ) b, err = json.Marshal(payload) if err != nil { return fmt.Errorf( "xbus.EmissionQueue: failed to marshal. err was: %w. payload is %#v", err, payload) } return eq.AddMessage(db, NewMessage(relatedTo, msgType, b)) }