Newer
Older
package fullenv
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"strings"
"time"
"xbus.io/go-xbus/v4"
"xbus.io/go-xbus/v4/api"
"xbus.io/go-xbus/v4/testutils"
)
// Fullenv uses the xbus-fullenv binary to coordinate tests
type Fullenv struct {
configPath string
targetDirPath string
cmd *exec.Cmd
stdin io.WriteCloser
err error
cmdQuit chan struct{}
outputChan chan string
errorChan chan string
}
// NewFullenv instanciate a Fullenv
func NewFullenv(configPath string, logger zerolog.Logger) *Fullenv {
return &Fullenv{
configPath: configPath,
logger: logger,
vars: make(map[string]string),
outputChan: make(chan string),
errorChan: make(chan string),
}
}
// NewFullenvForTest returns a fullenv associated to the giveng test
func NewFullenvForTest(t testing.TB, name string, configPath string) *Fullenv {
// SetVar sets a substitution variable
func (fe *Fullenv) SetVar(key, value string) {
fe.vars[key] = value
}
// SetVars sets substitution variables
func (fe *Fullenv) SetVars(values map[string]string) {
for key, value := range values {
fe.vars[key] = value
}
}
func (fe *Fullenv) initCmd() error {
args := []string{
"run", "--no-prompt",
fe.configPath,
}
for key, value := range fe.vars {
args = append(args, "--var", key+"="+value)
}
cmd := exec.Command("xbus-fullenv", args...)
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return err
}
fe.cmd = cmd
fe.stdin = stdin
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
s := scanner.Text()
if strings.HasPrefix(s, "< OK: ") {
fe.outputChan <- s[6:]
} else if strings.HasPrefix(s, "< ERR: ") {
fe.errorChan <- s[6:]
} else {
}
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading stdout:", err)
}
}()
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
s := scanner.Text()
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading stderr:", err)
}
}()
return nil
}
func (fe *Fullenv) nextOutput() (string, error) {
select {
case s := <-fe.outputChan:
return s, nil
case e := <-fe.errorChan:
return "", fmt.Errorf(e)
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
case <-fe.cmdQuit:
if fe.err != nil {
return "", fe.err
}
return "", fmt.Errorf("fullenv is closed")
}
}
func (fe *Fullenv) sendCommand(cmd ...string) (string, error) {
_, err := fe.stdin.Write([]byte(strings.Join(cmd, " ") + "\n"))
if err != nil {
return "", err
}
return fe.nextOutput()
}
// Start runs 'xbus-fullenv run' and keep it opened for further interaction
func (fe *Fullenv) Start() error {
if err := fe.initCmd(); err != nil {
return err
}
fe.cmdQuit = make(chan struct{})
go func() {
fe.err = fe.cmd.Run()
close(fe.cmdQuit)
}()
output, err := fe.nextOutput()
if err != nil {
return err
}
fe.targetDirPath = output
return nil
}
// Stop stops the fullenv
func (fe *Fullenv) Stop() error {
_, err := fe.stdin.Write([]byte("quit\n"))
if err != nil {
}
select {
case <-fe.cmdQuit:
return fe.Error()
case <-time.After(2 * time.Second):
if err := fe.cmd.Process.Kill(); err != nil {
fe.logger.Err(err).Msg("Error killing process")
return fmt.Errorf("Had to kill xbus-fullenv")
}
}
func (fe *Fullenv) Error() error {
return fe.err
}
// StartClients sends the 'startup' command to xbus-fullenv
func (fe *Fullenv) StartClients() error {
_, err := fe.sendCommand("startup")
return err
}
// ClientConfig returns the configuration path of a given client
func (fe *Fullenv) ClientConfig(name string) (string, error) {
return fe.sendCommand("client-config", name)
}
// LoadClientConfig load the xbus.COptions of a client
func (fe *Fullenv) LoadClientConfig(name string) (xbus.COptions, error) {
path, err := fe.ClientConfig(name)
if err != nil {
return xbus.COptions{}, err
}
return xbus.COptionsFromFile(path, api.Account_ACTOR, fe.logger)
}
// LoadClient loads a client instance
func (fe *Fullenv) LoadClient(name string) (*xbus.Client, error) {
options, err := fe.LoadClientConfig(name)
if err != nil {
return nil, err
}
client := xbus.NewClient(options, fe.logger, nil)
return client, nil
}