-
Christophe de Vienne authoredChristophe de Vienne authored
fullenv.go 4.45 KiB
package fullenv
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"strings"
"testing"
"time"
"github.com/rs/zerolog"
"xbus.io/go-xbus/v4"
"xbus.io/go-xbus/v4/api"
"xbus.io/go-xbus/v4/testutils"
)
// Fullenv uses the xbus-fullenv binary to coordinate tests
type Fullenv struct {
configPath string
targetDirPath string
logger zerolog.Logger
vars map[string]string
cmd *exec.Cmd
stdin io.WriteCloser
err error
cmdQuit chan struct{}
outputChan chan string
errorChan chan string
}
// NewFullenv instanciate a Fullenv
func NewFullenv(configPath string, logger zerolog.Logger) *Fullenv {
return &Fullenv{
configPath: configPath,
logger: logger,
vars: make(map[string]string),
outputChan: make(chan string),
errorChan: make(chan string),
}
}
// NewFullenvForTest returns a fullenv associated to the giveng test
func NewFullenvForTest(t testing.TB, name string, configPath string) *Fullenv {
logger := testutils.GetTestLogger(t)
fe := NewFullenv(configPath, logger.Logger)
return fe
}
// SetVar sets a substitution variable
func (fe *Fullenv) SetVar(key, value string) {
fe.vars[key] = value
}
// SetVars sets substitution variables
func (fe *Fullenv) SetVars(values map[string]string) {
for key, value := range values {
fe.vars[key] = value
}
}
func (fe *Fullenv) initCmd() error {
args := []string{
"run", "--no-prompt",
fe.configPath,
}
for key, value := range fe.vars {
args = append(args, "--var", key+"="+value)
}
cmd := exec.Command("xbus-fullenv", args...)
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return err
}
fe.cmd = cmd
fe.stdin = stdin
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
s := scanner.Text()
if strings.HasPrefix(s, "< OK: ") {
fe.outputChan <- s[6:]
} else if strings.HasPrefix(s, "< ERR: ") {
fe.errorChan <- s[6:]
} else {
fe.logger.Debug().Msg(s)
}
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading stdout:", err)
}
}()
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
s := scanner.Text()
fe.logger.Info().Msg(s)
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading stderr:", err)
}
}()
return nil
}
func (fe *Fullenv) nextOutput() (string, error) {
select {
case s := <-fe.outputChan:
return s, nil
case e := <-fe.errorChan:
return "", fmt.Errorf(e)
case <-fe.cmdQuit:
if fe.err != nil {
return "", fe.err
}
return "", fmt.Errorf("fullenv is closed")
}
}
func (fe *Fullenv) sendCommand(cmd ...string) (string, error) {
_, err := fe.stdin.Write([]byte(strings.Join(cmd, " ") + "\n"))
if err != nil {
return "", err
}
return fe.nextOutput()
}
// Start runs 'xbus-fullenv run' and keep it opened for further interaction
func (fe *Fullenv) Start() error {
if err := fe.initCmd(); err != nil {
return err
}
fe.cmdQuit = make(chan struct{})
go func() {
fe.err = fe.cmd.Run()
close(fe.cmdQuit)
}()
output, err := fe.nextOutput()
if err != nil {
return err
}
fe.targetDirPath = output
return nil
}
// Stop stops the fullenv
func (fe *Fullenv) Stop() error {
_, err := fe.stdin.Write([]byte("quit\n"))
if err != nil {
fe.logger.Err(err).Send()
}
select {
case <-fe.cmdQuit:
return fe.Error()
case <-time.After(2 * time.Second):
if err := fe.cmd.Process.Kill(); err != nil {
fe.logger.Err(err).Msg("Error killing process")
}
return fmt.Errorf("Had to kill xbus-fullenv")
}
}
func (fe *Fullenv) Error() error {
return fe.err
}
// StartClients sends the 'startup' command to xbus-fullenv
func (fe *Fullenv) StartClients() error {
_, err := fe.sendCommand("startup")
return err
}
// ClientConfig returns the configuration path of a given client
func (fe *Fullenv) ClientConfig(name string) (string, error) {
return fe.sendCommand("client-config", name)
}
// LoadClientConfig load the xbus.COptions of a client
func (fe *Fullenv) LoadClientConfig(name string) (xbus.COptions, error) {
path, err := fe.ClientConfig(name)
if err != nil {
return xbus.COptions{}, err
}
return xbus.COptionsFromFile(path, api.Account_ACTOR, fe.logger)
}
// LoadClient loads a client instance
func (fe *Fullenv) LoadClient(name string) (*xbus.Client, error) {
options, err := fe.LoadClientConfig(name)
if err != nil {
return nil, err
}
client := xbus.NewClient(options, fe.logger, nil)
return client, nil
}