Skip to content
Snippets Groups Projects
emission_queue.go 2.16 KiB
Newer Older
package emissionqueue

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"orus.io/orus-io/go-orusapi/database"
	"orus.io/orus-io/go-orusapi/xbus/emissionqueue/migration"
	"xbus.io/go-xbus/v4/envelope"
)

// TableName is the name of the SQL table the queued messages are inserted into.
const TableName = "xbus_emission_queue"

// Schema is the database schema descriptor of the emission queue: it is named
// "emissionqueue" and takes its migrations from migration.Source.
var Schema = database.Schema{Name: "emissionqueue", Source: migration.Source}

// EmissionQueue queues messages for emission to xbus.
type EmissionQueue interface {
	// AddMessage pushes an already-built message in the queue, using db to
	// run the insertion.
	AddMessage(db *database.SQLHelper, msg Message) error
	// Add builds a message of type msgType from payload and pushes it in the
	// queue, using db to run the insertion.
	Add(db *database.SQLHelper, relatedTo string, msgType string, payload interface{}) error
}

// MustMessageFromXbusMessage is like MessageFromXbusMessage but panics with
// the conversion error instead of returning it.
func MustMessageFromXbusMessage(msg envelope.Message) Message {
	converted, err := MessageFromXbusMessage(msg)
	if err == nil {
		return converted
	}

	panic(err)
}

// MessageFromXbusMessage converts an xbus envelope message into a queue
// Message by reading every chunk from the envelope's chunk reader and building
// a chunked message with the same message type.
//
// Reading stops when the reader reports io.EOF. Any other read error aborts
// the conversion and is returned wrapped; in that case the returned Message is
// the zero value and must not be used.
func MessageFromXbusMessage(msg envelope.Message) (Message, error) {
	var chunks [][]byte

	reader := msg.ChunkReader()
	for {
		b, err := reader.ReadBytes()
		if errors.Is(err, io.EOF) {
			// End of the chunk stream: whatever came with EOF is not kept.
			break
		}
		if err != nil {
			// Without this check a failing reader would be polled forever
			// and its (invalid) results appended to the chunk list.
			return Message{}, fmt.Errorf("emissionqueue: cannot read the message chunks: %w", err)
		}

		chunks = append(chunks, b)
	}

	return NewChunkedMessage(msg.MsgType(), chunks), nil
}

// NewEmissionQueue returns a ready-to-use EmissionQueue.
func NewEmissionQueue() EmissionQueue {
	queue := new(emissionQueue)

	return queue
}

// emissionQueue queues messages for emission to xbus.
// It holds no state of its own: the database handle is passed to each method.
type emissionQueue struct{}

// AddMessage inserts msg as a new row of the queue table, with its "created"
// column set to the current UTC time. A failed insertion is returned wrapped.
func (eq *emissionQueue) AddMessage(db *database.SQLHelper, msg Message) error {
	insert := database.SQ.
		Insert(TableName).
		Columns("created", "related_to", "msgtype", "content", "chunks").
		Values(time.Now().UTC(), msg.RelatedTo, msg.MsgType, msg.Content, &msg.Chunks)

	_, err := db.Exec(insert)
	if err != nil {
		return fmt.Errorf("emissionqueue: cannot insert the message: %w", err)
	}

	return nil
}

// Add builds a message of type msgType from payload and pushes it in the queue.
//
// A payload of type [][]byte is queued as a chunked message without any
// encoding; any other payload is JSON-marshaled and queued as a regular
// message related to relatedTo. Calling Add on a nil queue does nothing and
// returns nil.
func (eq *emissionQueue) Add(
	db *database.SQLHelper, relatedTo string, msgType string, payload interface{},
) error {
	if eq == nil {
		return nil
	}

	// NOTE(review): relatedTo is not passed on in the chunked branch —
	// confirm this is intended.
	if chunks, isChunked := payload.([][]byte); isChunked {
		return eq.AddMessage(db, NewChunkedMessage(msgType, chunks))
	}

	encoded, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf(
			"xbus.EmissionQueue: failed to marshal. err was: %w. payload is %#v", err, payload)
	}

	return eq.AddMessage(db, NewMessage(relatedTo, msgType, encoded))
}