// Package fullenv drives the xbus-fullenv binary to coordinate tests.
package fullenv

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
	"testing"
	"time"

	"github.com/rs/zerolog"

	"xbus.io/go-xbus/v4"
	"xbus.io/go-xbus/v4/api"
	"xbus.io/go-xbus/v4/testutils"
)

// Fullenv uses the xbus-fullenv binary to coordinate tests.
//
// It wraps a single 'xbus-fullenv run' child process: commands are written to
// the process stdin and replies are read back from its stdout.
type Fullenv struct {
	// configPath is passed as the positional argument of 'xbus-fullenv run'.
	configPath    string
	// targetDirPath holds the first "< OK: " reply received by Start.
	targetDirPath string

	// logger receives the process stderr (info level) and the stdout lines
	// that are not command replies (debug level).
	logger zerolog.Logger

	// vars holds the substitution variables, each passed to the process as
	// '--var key=value'.
	vars map[string]string

	// cmd is the child process and stdin the pipe used to send it commands.
	cmd   *exec.Cmd
	stdin io.WriteCloser
	// err is the result of cmd.Run(), set just before cmdQuit is closed.
	err   error

	// cmdQuit is created by Start and closed once cmd.Run() has returned.
	cmdQuit chan struct{}

	// outputChan carries the payload of "< OK: " stdout lines, errorChan the
	// payload of "< ERR: " stdout lines.
	outputChan chan string
	errorChan  chan string
}

// NewFullenv instantiates a Fullenv for the given configuration file.
//
// The returned value has an empty variable set and unbuffered reply channels;
// the underlying process is not created until Start is called.
func NewFullenv(configPath string, logger zerolog.Logger) *Fullenv {
	fe := new(Fullenv)
	fe.configPath = configPath
	fe.logger = logger
	fe.vars = map[string]string{}
	fe.outputChan = make(chan string)
	fe.errorChan = make(chan string)
	return fe
}

// NewFullenvForTest returns a Fullenv associated to the given test: it logs
// through the logger that testutils.GetTestLogger builds for t.
//
// The name argument is currently not used.
func NewFullenvForTest(t testing.TB, name string, configPath string) *Fullenv {
	return NewFullenv(configPath, testutils.GetTestLogger(t).Logger)
}

// SetVar sets a substitution variable, replacing any previous value stored
// under the same key.
//
// Variables are turned into '--var key=value' arguments when the command line
// is built by Start, so they have to be set before Start is called.
func (fe *Fullenv) SetVar(key, value string) {
	fe.vars[key] = value
}

// SetVars sets several substitution variables at once. Entries of values
// override variables already set under the same key; other variables are
// left untouched.
func (fe *Fullenv) SetVars(values map[string]string) {
	for k, v := range values {
		fe.SetVar(k, v)
	}
}

// initCmd builds the 'xbus-fullenv run' command without starting it.
//
// It assembles the command line from fe.configPath and fe.vars, opens the
// stdin, stdout and stderr pipes, stores the command and its stdin on fe, and
// spawns two goroutines that consume the process output line by line:
//
//   - stdout lines starting with "< OK: " are forwarded, prefix stripped, to
//     fe.outputChan; lines starting with "< ERR: " are forwarded, prefix
//     stripped, to fe.errorChan; any other line is logged at debug level;
//   - stderr lines are logged at info level.
//
// With the unbuffered channels created by NewFullenv, the stdout goroutine
// blocks on each reply until nextOutput receives it.
//
// It returns an error if one of the pipes cannot be opened.
func (fe *Fullenv) initCmd() error {
	const (
		okPrefix  = "< OK: "
		errPrefix = "< ERR: "
	)

	args := []string{
		"run", "--no-prompt",
		fe.configPath,
	}
	for key, value := range fe.vars {
		args = append(args, "--var", key+"="+value)
	}
	cmd := exec.Command("xbus-fullenv", args...)
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return fmt.Errorf("opening xbus-fullenv stdin: %w", err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("opening xbus-fullenv stdout: %w", err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("opening xbus-fullenv stderr: %w", err)
	}
	fe.cmd = cmd
	fe.stdin = stdin

	go func() {
		scanner := bufio.NewScanner(stdout)
		for scanner.Scan() {
			s := scanner.Text()
			switch {
			case strings.HasPrefix(s, okPrefix):
				fe.outputChan <- strings.TrimPrefix(s, okPrefix)
			case strings.HasPrefix(s, errPrefix):
				// Strip the whole prefix: it is one byte longer than the OK
				// one, so reusing the same offset would leave a leading space
				// in the error message.
				fe.errorChan <- strings.TrimPrefix(s, errPrefix)
			default:
				fe.logger.Debug().Msg(s)
			}
		}
		if err := scanner.Err(); err != nil {
			fmt.Fprintln(os.Stderr, "reading stdout:", err)
		}
	}()
	go func() {
		scanner := bufio.NewScanner(stderr)
		for scanner.Scan() {
			fe.logger.Info().Msg(scanner.Text())
		}
		if err := scanner.Err(); err != nil {
			fmt.Fprintln(os.Stderr, "reading stderr:", err)
		}
	}()
	return nil
}

// nextOutput blocks until the process produces its next reply or terminates.
//
// It returns:
//   - the payload and a nil error when an "< OK: " reply is received;
//   - an error carrying the payload when an "< ERR: " reply is received;
//   - fe.err, or a generic "fullenv is closed" error when fe.err is nil, once
//     fe.cmdQuit has been closed.
func (fe *Fullenv) nextOutput() (string, error) {
	select {
	case s := <-fe.outputChan:
		return s, nil
	case e := <-fe.errorChan:
		// The payload comes from the process output: never use it as a format
		// string, or any '%' it contains would be interpreted as a verb.
		return "", fmt.Errorf("%s", e)
	case <-fe.cmdQuit:
		if fe.err != nil {
			return "", fe.err
		}
		return "", fmt.Errorf("fullenv is closed")
	}
}

// sendCommand writes one command line to the process stdin, made of the given
// words joined by single spaces and terminated by a newline, then waits for
// the reply with nextOutput.
//
// If the write fails, the write error is returned and no reply is awaited.
func (fe *Fullenv) sendCommand(cmd ...string) (string, error) {
	line := strings.Join(cmd, " ") + "\n"
	if _, err := fe.stdin.Write([]byte(line)); err != nil {
		return "", err
	}
	return fe.nextOutput()
}

// Start runs 'xbus-fullenv run' and keeps it open for further interaction.
//
// The command is prepared by initCmd and executed in a background goroutine
// which records the result of cmd.Run() in fe.err and then closes fe.cmdQuit.
// Start then waits for the first reply of the process and stores it as the
// target directory path.
//
// It returns the initCmd error, or the error of the first reply, if any.
//
// NOTE(review): the os/exec documentation states that Run should not be used
// together with StdoutPipe/StderrPipe, because Wait closes the pipes as soon
// as the process exits; trailing output may be lost.
func (fe *Fullenv) Start() error {
	err := fe.initCmd()
	if err != nil {
		return err
	}

	fe.cmdQuit = make(chan struct{})
	go func() {
		fe.err = fe.cmd.Run()
		close(fe.cmdQuit)
	}()

	targetDir, err := fe.nextOutput()
	if err == nil {
		fe.targetDirPath = targetDir
	}
	return err
}

// Stop stops the fullenv.
//
// It sends the 'quit' command to the process (a write failure is only logged)
// and waits up to two seconds for the process to terminate. If it terminates
// in time, the result of Error is returned. Otherwise the process is killed
// (a kill failure is only logged) and an error is returned.
func (fe *Fullenv) Stop() error {
	if _, writeErr := fe.stdin.Write([]byte("quit\n")); writeErr != nil {
		fe.logger.Err(writeErr).Send()
	}

	timeout := time.After(2 * time.Second)
	select {
	case <-timeout:
		killErr := fe.cmd.Process.Kill()
		if killErr != nil {
			fe.logger.Err(killErr).Msg("Error killing process")
		}
		return fmt.Errorf("Had to kill xbus-fullenv")
	case <-fe.cmdQuit:
		return fe.Error()
	}
}

// Error returns the error recorded for the xbus-fullenv process, i.e. the
// result of cmd.Run() stored by the goroutine launched in Start. It is nil
// while no result has been recorded.
//
// NOTE(review): fe.err is read without synchronisation here; presumably this
// is meant to be called once the process has terminated — confirm.
func (fe *Fullenv) Error() error {
	return fe.err
}

// StartClients sends the 'startup' command to xbus-fullenv and waits for its
// reply. The reply payload is discarded; only the error, if any, is returned.
func (fe *Fullenv) StartClients() error {
	if _, err := fe.sendCommand("startup"); err != nil {
		return err
	}
	return nil
}

// ClientConfig returns the configuration path of a given client, as replied
// by xbus-fullenv to the 'client-config <name>' command.
func (fe *Fullenv) ClientConfig(name string) (string, error) {
	configPath, err := fe.sendCommand("client-config", name)
	return configPath, err
}

// LoadClientConfig loads the xbus.COptions of a client.
//
// The configuration path is obtained with ClientConfig, then read with
// xbus.COptionsFromFile for an account of type api.Account_ACTOR. When the
// path cannot be obtained, zero-valued options and the error are returned.
func (fe *Fullenv) LoadClientConfig(name string) (xbus.COptions, error) {
	configPath, err := fe.ClientConfig(name)
	if err != nil {
		var none xbus.COptions
		return none, err
	}
	return xbus.COptionsFromFile(configPath, api.Account_ACTOR, fe.logger)
}

// LoadClient loads a client instance: it builds an xbus.Client from the
// options returned by LoadClientConfig for the named client, using the
// fullenv logger. It returns a nil client when the options cannot be loaded.
func (fe *Fullenv) LoadClient(name string) (*xbus.Client, error) {
	opts, err := fe.LoadClientConfig(name)
	if err != nil {
		return nil, err
	}
	return xbus.NewClient(opts, fe.logger, nil), nil
}