# HG changeset patch # User Christophe de Vienne <christophe@cdevienne.info> # Date 1614365358 -3600 # Fri Feb 26 19:49:18 2021 +0100 # Node ID e43c02ba15dc895a59ad64fb5137371f5f4f243d # Parent 1aef9b530839dd3853f9a14dfe5f4efb28654736 Implement pipelining from the template diff --git a/engines/mjml/engine.go b/engines/mjml/engine.go --- a/engines/mjml/engine.go +++ b/engines/mjml/engine.go @@ -4,6 +4,7 @@ "bytes" "context" "errors" + "fmt" "io/ioutil" "orus.io/orus-io/rednerd/models" @@ -92,7 +93,7 @@ } mjmlSrc, err := ioutil.ReadAll(r) if err != nil { - return err + return fmt.Errorf("could not read input document: %s", err) } result, err := renderDukTape(e.ctx, string(mjmlSrc), Options{ @@ -108,7 +109,7 @@ w.RenderErrors(errors...) w.RenderErrors(wrapErrors(result.Errors)...) if _, err := w.Write([]byte(result.HTML)); err != nil { - return err + return fmt.Errorf("could not write: %s", err) } return nil } diff --git a/rendering/document.go b/rendering/document.go --- a/rendering/document.go +++ b/rendering/document.go @@ -79,6 +79,9 @@ p.cond.Wait() } err := p.err + if err == io.ErrClosedPipe { + err = nil + } p.l.Unlock() return err } diff --git a/rendering/registry.go b/rendering/registry.go --- a/rendering/registry.go +++ b/rendering/registry.go @@ -122,7 +122,7 @@ if err == nil { err = fmt.Errorf( "engine %s failed: %s", - stepResult.step.Engine.Info().Type, err) + stepResult.step.Engine.Info().Type, stepResult.err) cancel() } } diff --git a/rendering/template_registry.go b/rendering/template_registry.go --- a/rendering/template_registry.go +++ b/rendering/template_registry.go @@ -46,24 +46,24 @@ } // Register a TemplateEngine -func (s *TemplateEngineRegistry) Register(name string, engine TemplateEngine) error { - if _, ok := s.TemplateEngines[name]; ok { +func (r *TemplateEngineRegistry) Register(name string, engine TemplateEngine) error { + if _, ok := r.TemplateEngines[name]; ok { return ErrDuplicateName } - s.TemplateEngines[name] = engine + r.TemplateEngines[name] = engine return nil } // MustRegister a TemplateEngine -func (s *TemplateEngineRegistry) MustRegister(name string, engine TemplateEngine) { - if err := s.Register(name, engine); err != nil { +func (r *TemplateEngineRegistry) MustRegister(name string, engine TemplateEngine) { + if err := r.Register(name, engine); err != nil { panic(err) } } // Find finds a template engine by the language it supports -func (s *TemplateEngineRegistry) Find(language string) TemplateEngine { - for _, engine := range s.TemplateEngines { +func (r *TemplateEngineRegistry) Find(language string) TemplateEngine { + for _, engine := range r.TemplateEngines { if engine.Info().Language == language { return engine } @@ -72,8 +72,8 @@ } // Load a template with the proper engine -func (s *TemplateEngineRegistry) Load(ctx context.Context, template *models.Template) (Template, error) { - engine := s.Find(template.Language) +func (r *TemplateEngineRegistry) Load(ctx context.Context, template *models.Template) (Template, error) { + engine := r.Find(template.Language) if engine == nil { return nil, ErrUnknownTemplateLanguage } @@ -91,3 +91,57 @@ // DefaultTemplateEngineRegistry is the default engine registry DefaultTemplateEngineRegistry = NewTemplateEngineRegistry() ) + +func (r *TemplateEngineRegistry) NewPipeline( + ctx context.Context, + registry *EngineRegistry, template *models.Template, metadata models.Metadata, targetType string, +) (*TemplatePipeline, error) { + tmpl, err := r.Load(ctx, template) + if err != nil { + return nil, err + } + p, err := registry.GetPipeline(template.Produces, targetType) + if err != nil { + return nil, err + } + return &TemplatePipeline{ + templateProduces: template.Produces, + metadata: metadata, + template: tmpl, + pipeline: p, + }, nil +} + +type TemplatePipeline struct { + templateProduces string + metadata models.Metadata + template Template + pipeline Pipeline +} + +func (t *TemplatePipeline) Render( + ctx context.Context, w DocumentWriter, record models.Record, +) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + pipeErr := make(chan error, 1) + if len(t.pipeline) != 0 { + pw, pr := DocumentPipe() + go func(ctx context.Context, w DocumentWriter, r DocumentReader) { + pipeErr <- t.pipeline.Render(ctx, w, r) + }(ctx, w, pr) + w = pw + } + w.SetType(t.templateProduces) + if err := t.template.Render(ctx, w, record); err != nil { + _ = w.Close() + return err + } + if err := w.Close(); err != nil { + return err + } + if len(t.pipeline) != 0 { + return <-pipeErr + } + return nil +} diff --git a/restapi/handlers/render.go b/restapi/handlers/render.go --- a/restapi/handlers/render.go +++ b/restapi/handlers/render.go @@ -47,9 +47,8 @@ } var ( - log = zerolog.Ctx(ctx) - result []*models.Document - pipeline rendering.Pipeline + log = zerolog.Ctx(ctx) + result []*models.Document ) if template != nil { @@ -68,30 +67,21 @@ templatedb.FromDB() template = &templatedb.Template } - tmpl, err := h.templateRegistry.Load(ctx, template) + pipeline, err := h.templateRegistry.NewPipeline( + ctx, h.registry, template, metadata, toType, + ) if err != nil { if err == rendering.ErrUnknownTemplateLanguage { return nil, utils.HTTPBadRequest( fmt.Errorf("unknown template language: %s", template.Language)) } return nil, utils.HTTPBadRequest( - fmt.Errorf("error loading template: %s", err)) + fmt.Errorf("error loading rendering pipeline: %s", err)) } - p, err := h.registry.GetPipeline(template.Produces, toType) - if err != nil { - return nil, utils.HTTPBadRequest(err) - } - pipeline = p for _, record := range data { var doc models.Document w := rendering.NewDocumentWriter(&doc) - w.SetType(template.Produces) - if err := tmpl.Render(ctx, w, record); err != nil { - _ = w.Close() - return nil, utils.HTTPBadRequest( - fmt.Errorf("error rendering template: %s", err)) - } - if err := w.Close(); err != nil { + if err := pipeline.Render(ctx, w, record); err != nil { return nil, err } result = append(result, &doc) @@ -99,34 +89,27 @@ } if document != nil { - p, err := h.registry.GetPipeline(document.Type, toType) + pipeline, err := h.registry.GetPipeline(document.Type, toType) if err != nil { return nil, utils.HTTPBadRequest(err) } - pipeline = p - result = append(result, document) - } - var outputs = make([]*models.Document, 0, len(result)) - for _, doc := range result { - // XXX We may want to merge the metadata before template rendering - doc.UpdateMetadata(metadata) + document.UpdateMetadata(metadata) if len(pipeline) != 0 { var output models.Document - reader := rendering.NewDocumentReader(doc) + reader := rendering.NewDocumentReader(document) writer := rendering.NewDocumentWriter(&output) if err := pipeline.Render(ctx, writer, reader); err != nil { return nil, err } - outputs = append(outputs, &output) + result = append(result, &output) } else { - outputs = append(outputs, doc) + result = append(result, document) } } - result = outputs return result, nil }