Skip to content

Commit

Permalink
updated documentation, renamte control to WorkflowRun
Browse files Browse the repository at this point in the history
  • Loading branch information
adranwit committed Mar 4, 2018
1 parent 132a181 commit 2117d58
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 50 deletions.
11 changes: 5 additions & 6 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var converter = toolbox.NewColumnConverter("yyyy-MM-dd HH:ss")

var serviceManagerKey = (*manager)(nil)
var deferFunctionsKey = (*[]func())(nil)

//WorkflowKey context.State workflow key
var WorkflowKey = (*Workflow)(nil)

//Context represents a workflow session context/state
Expand All @@ -36,6 +38,7 @@ type Context struct {
closed int32
}

//Publish publishes event to listeners, it updates current run details like activity workflow name etc ...
func (c *Context) Publish(value interface{}) *Event {
var workflow = c.Workflows.Last()
var workflowName = ""
Expand Down Expand Up @@ -64,6 +67,7 @@ func (c *Context) Publish(value interface{}) *Event {
return event
}

//SetListener sets context event listener
func (c *Context) SetListener(listener EventListener) {
c.listener = listener
}
Expand All @@ -73,11 +77,6 @@ func (c *Context) IsClosed() bool {
return atomic.LoadInt32(&c.closed) == 1
}

func reportError(err error) error {
fileName, funcName, line := toolbox.CallerInfo(4)
return fmt.Errorf("%v at %v:%v -> %v", err, fileName, line, funcName)
}

//Clone clones the context.
func (c *Context) Clone() *Context {
if len(c.cloned) == 0 {
Expand Down Expand Up @@ -139,7 +138,7 @@ func (c *Context) ExpandResource(resource *url.Resource) (*url.Resource, error)
return result, nil
}

//Service returns workflow manager or error
//Manager returns workflow manager or error
func (c *Context) Manager() (Manager, error) {
var manager = &manager{}
if !c.GetInto(serviceManagerKey, &manager) {
Expand Down
3 changes: 3 additions & 0 deletions criteria.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Criteria struct {
Criteria []*Criterion
}

//IsTrue evaluates criteria with supplied context and state map . Dolar prefixed $expression will be expanded before evaluation.
func (c *Criteria) IsTrue(context *Context, state data.Map) (bool, error) {
if c.LogicalOperator == "||" {
for _, criterion := range c.Criteria {
Expand Down Expand Up @@ -53,6 +54,7 @@ func (c *Criterion) expandOperand(opperand interface{}, state data.Map) interfac
return state.Expand(opperand)
}

//IsTrue evaluates criterion with supplied context and state map . Dolar prefixed $expression will be expanded before evaluation.
func (c *Criterion) IsTrue(context *Context, state data.Map) (bool, error) {
if c.Criteria != nil {
return c.Criteria.IsTrue(context, state)
Expand Down Expand Up @@ -104,6 +106,7 @@ func (c *Criterion) IsTrue(context *Context, state data.Map) (bool, error) {
return false, err
}

//NewCriterion creates a new criterion
func NewCriterion(leftOperand interface{}, operator string, rightOperand interface{}) *Criterion {
return &Criterion{
LeftOperand: leftOperand,
Expand Down
1 change: 1 addition & 0 deletions criteria_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (p *Parser) expectOptionalWhitespaceFollowedBy(tokenizer *toolbox.Tokenizer
return token, nil
}

//Parse parses supplied expression. It returns criteria or parsing error.
func (p *Parser) Parse(expression string) (*Criteria, error) {
result := NewCriteria("")
tokenizer := toolbox.NewTokenizer(expression, illegal, eof, matchers)
Expand Down
1 change: 1 addition & 0 deletions evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type CriteriaEvalEvent struct {
Error string
}

//NewCriteriaEvalEvent creates a new evaluation event.
func NewCriteriaEvalEvent(defaultValue, evaluation bool, criteria, expendedCriteria string, err error) *CriteriaEvalEvent {
var result = &CriteriaEvalEvent{
Default: defaultValue,
Expand Down
4 changes: 3 additions & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Event struct {
Value interface{}
}

//Get returns supplied expected type or nil
func (e *Event) Get(expectedType reflect.Type) interface{} {
if e.Value == nil {
return nil
Expand All @@ -35,6 +36,7 @@ func (e *Event) Get(expectedType reflect.Type) interface{} {
return nil
}

//Type returns event type (simple package and struct name)
func (e *Event) Type() string {
if e.Value == nil {
return fmt.Sprintf("%T", e.Value)
Expand Down Expand Up @@ -113,7 +115,7 @@ func (e *Events) AsEventListener() EventListener {
}
}

//Drains removes all events from struct to publish to context
//Drain removes all events from struct to publish them to context
func (e *Events) Drain(context *Context) {
for i := 0; i < len(e.Events); i++ {
event := e.Shift()
Expand Down
3 changes: 2 additions & 1 deletion event_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (l *EventLogger) handlerError(err error) {
log.Print(err)
}

//OnEvent handles supplied event.
func (l *EventLogger) OnEvent(event *Event) {
l.mutex.Lock()
defer l.mutex.Unlock()
Expand Down Expand Up @@ -86,7 +87,7 @@ func (l *EventLogger) OnEvent(event *Event) {
_, _ = file.Write(buf)
}

//Log logs an event
//AsEventListener returns an event listener
func (l *EventLogger) AsEventListener() EventListener {
return func(event *Event) {
if l.listener != nil {
Expand Down
7 changes: 7 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package endly
import (
"bytes"
"encoding/base64"
"fmt"
"github.com/viant/toolbox"
"io/ioutil"
"strings"
"unicode"
Expand Down Expand Up @@ -49,3 +51,8 @@ func AsPayload(data []byte) string {
encoder.Close()
return "base64:" + buf.String()
}

func reportError(err error) error {
fileName, funcName, line := toolbox.CallerInfo(4)
return fmt.Errorf("%v at %v:%v -> %v", err, fileName, line, funcName)
}
37 changes: 19 additions & 18 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const AppName = "endly"
//Namespace represents endly namespace
const Namespace = "github.com/viant/endly/"

//Service represnets a workflow manager
//Manager represents a workflow manager
type Manager interface {
//Name returns an application ID
Name() string
Expand All @@ -44,51 +44,52 @@ type manager struct {
serviceByRequestType map[reflect.Type]Service
}

func (s *manager) Name() string {
return s.name
func (m *manager) Name() string {
return m.name
}

func (s *manager) Version() string {
return s.version
func (m *manager) Version() string {
return m.version
}

func (s *manager) Service(input interface{}) (Service, error) {
if serviceId, ok := input.(string); ok {
if result, found := s.serviceByID[serviceId]; found {
//Service returns service for supplied request or name.
func (m *manager) Service(input interface{}) (Service, error) {
if serviceID, ok := input.(string); ok {
if result, found := m.serviceByID[serviceID]; found {
return result, nil
}
} else if toolbox.IsStruct(input) {
if result, found := s.serviceByRequestType[reflect.TypeOf(input)]; found {
if result, found := m.serviceByRequestType[reflect.TypeOf(input)]; found {
return result, nil
}
}
var available = toolbox.MapKeysToStringSlice(s.serviceByID)
var available = toolbox.MapKeysToStringSlice(m.serviceByID)
return nil, fmt.Errorf("failed to lookup service: '%v' in [%v]", input, strings.Join(available, ","))
}

func (s *manager) Register(service Service) {
s.serviceByID[service.ID()] = service
func (m *manager) Register(service Service) {
m.serviceByID[service.ID()] = service
for _, action := range service.Actions() {
if actionRoute, err := service.ServiceActionRoute(action); err == nil {
request := actionRoute.RequestProvider()
s.serviceByRequestType[reflect.TypeOf(request)] = service
m.serviceByRequestType[reflect.TypeOf(request)] = service
}
}
}

func (s *manager) NewContext(ctx toolbox.Context) *Context {
func (m *manager) NewContext(ctx toolbox.Context) *Context {
sessionID := toolbox.AsString(time.Now().Unix())
if UUID, err := uuid.NewV1(); err == nil {
sessionID = UUID.String()
}
var workflowStack Workflows = make([]*Control, 0)
var workflowStack Workflows = make([]*WorkflowRun, 0)
var result = &Context{
SessionID: sessionID,
Context: ctx,
Wait: &sync.WaitGroup{},
Workflows: &workflowStack,
}
_ = result.Put(serviceManagerKey, s)
_ = result.Put(serviceManagerKey, m)
return result
}

Expand All @@ -110,7 +111,7 @@ func NewManager() Manager {
return result
}

//Run action for supplied request, returns service action response or error
//Run runs action for supplied request, returns service action response or error
func (m *manager) Run(context *Context, request interface{}) (interface{}, error) {
manager := NewManager()

Expand Down Expand Up @@ -138,7 +139,7 @@ func Services(mgr interface{}) map[string]Service {
return manager.serviceByID
}

//GetVersion return endly version
//GetVersion returns endly version
func GetVersion() string {
resource := url.NewResource(fmt.Sprintf("mem://%v/Version", Namespace))
version, _ := resource.DownloadText()
Expand Down
1 change: 1 addition & 0 deletions operating_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (p *SystemPath) Push(paths ...string) {
}
}

//NewSystemPath create a new system path.
func NewSystemPath(items ...string) *SystemPath {
return &SystemPath{
index: make(map[string]bool),
Expand Down
6 changes: 6 additions & 0 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ import (
//UdfRegistry represents a udf registry
var UdfRegistry = make(map[string]func(source interface{}, state data.Map) (interface{}, error))

//ServiceProvider represents a service provider
type ServiceProvider func() Service

//ServiceRegistry represents a service registry
type ServiceRegistry []ServiceProvider

//Register register service provider.
func (r *ServiceRegistry) Register(serviceProvider ServiceProvider) error {
if serviceProvider == nil {
return errors.New("provider was empty")
Expand All @@ -20,4 +24,6 @@ func (r *ServiceRegistry) Register(serviceProvider ServiceProvider) error {
}

var registry ServiceRegistry = make([]ServiceProvider, 0)

//Registry global service provider registry
var Registry = &registry
2 changes: 2 additions & 0 deletions repeatable.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ func (r *Repeatable) Get() *Repeatable {
return result
}

//DataExtractionEvent represents data extraction event
type DataExtractionEvent struct {
Output string
StructuredOutput interface{}
Extracted interface{}
}

//NewDataExtractionEvent creates a new event.
func NewDataExtractionEvent(output string, structuredOutput, extracted interface{}) *DataExtractionEvent {
return &DataExtractionEvent{
Output: output,
Expand Down
7 changes: 4 additions & 3 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type AbstractService struct {
state data.Map
}

//Mutex returns a mutex.
func (s *AbstractService) Mutex() *sync.RWMutex {
return s.RWMutex
}
Expand Down Expand Up @@ -224,7 +225,7 @@ const (
NopServiceID = "nop"
)

//Request represent no operation
//NopRequest represent no operation
type NopRequest struct{}

//NopParrotRequest represent parrot request
Expand Down Expand Up @@ -288,7 +289,7 @@ func newNopService() Service {
}

const (
//ServiceID represents log service id.
//LoggerServiceID represents log service id.
LoggerServiceID = "logger"
)

Expand Down Expand Up @@ -318,7 +319,7 @@ func (s *loggerService) registerRoutes() {
},
Handler: func(context *Context, req interface{}) (interface{}, error) {
if request, ok := req.(*PrintRequest); ok {
if ! context.CLIEnabled {
if !context.CLIEnabled {
if request.Message != "" {
fmt.Printf("%v\n", request.Message)
}
Expand Down
1 change: 1 addition & 0 deletions udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
)

//TransformWithUDF transform payload with provided UDF name.
func TransformWithUDF(context *Context, udfName, source string, payload interface{}) (interface{}, error) {
var state = context.State()
var udf, has = UdfRegistry[udfName]
Expand Down
2 changes: 2 additions & 0 deletions variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,14 @@ func (v Variables) String() string {
return result
}

//ModifiedStateEvent represent modified state event
type ModifiedStateEvent struct {
Variables Variables
In map[string]interface{}
Modified map[string]interface{}
}

//NewModifiedStateEvent creates a new modified state event.
func NewModifiedStateEvent(variables Variables, in, out data.Map) *ModifiedStateEvent {
var result = &ModifiedStateEvent{
Variables: variables,
Expand Down
Loading

0 comments on commit 2117d58

Please sign in to comment.