Skip to content

Commit

Permalink
added ec2 intergation, refactor common logic into repeatable abstract…
Browse files Browse the repository at this point in the history
…ion, added workflow repeat, switch on task and action level
  • Loading branch information
adranwit committed Jan 11, 2018
1 parent 8fb1dbd commit d961a01
Show file tree
Hide file tree
Showing 39 changed files with 1,299 additions and 382 deletions.
34 changes: 25 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ The following expression are supported:
**Workflow Lifecycle**

1) New context with a new state map is created after inheriting values from a caller. (Caller will not see any state changes from downstream workflow)
2) **data** key is published to the state map with defined workflow.data
2) **data** key is published to the state map with defined workflow.data. Workflow data field would stores complex nested data structure lika a setup data.
2) **params** key is published to state map with the caller parameters
3) Workflow initialization stage executes, applying variables defined in Workflow.Pre (input: workflow state, output: workflow state)
4) Tasks Execution
Expand Down Expand Up @@ -214,10 +214,8 @@ The following expression are supported:
<a name="SystemServices"></a>
## System services


All services are running on the system referred as target and defined as [Resource](https://raw.githubusercontent.com/viant/toolbox/master/url/resource.go)


**Execution services**

The execution service is responsible for opening, managing terminal session, with ability to send command and extract data.
Expand All @@ -230,7 +228,6 @@ The execution service is responsible for opening, managing terminal session, wit
| exec | managed-command | executes commands with ability to extract data, define error or success state | [ExtractableCommandRequest](service_exec_command.go) | [SystemCommandResponse](service_system_exec_command_response.go) |



**Daemon service.**

Daemon System service is responsible for managing system daemon services.
Expand Down Expand Up @@ -331,6 +328,23 @@ Maven, tomcat use this service.
| deployment | deploy | run deployment | [DeploymentDeployRequest](service_deployment_deploy.go) | [DeploymentDeployResponse](service_deployment_deploy.go) |


### Cloud services


**AWC Ec2 Service**

Ec2 service - amazon computing service management.


| Service Id | Action | Description | Request | Response |
| --- | --- | --- | --- | --- |
| aws/ec2 | call | Run ec2 operation | [Ec2CallRequest](service_ec2_call.go) | [Ec2CallResponse](service_ec2_call.go) |

'call' action's method and input are proxied to [Ec2 client](github.com/aws/aws-sdk-go/service/ec2)




<a name="Testingservices"></a>
### Testing services

Expand All @@ -351,12 +365,10 @@ Http runner sends one or more http request to the specified endpoint, it manages
| rest/runner | send | Sends one rest request to the endpoint. | [RestSendRequest](service_rest_send.go) | [RestSendResponse](service_rest_send.go) |



**Selenium Runner**

Selenium runner open a web session to run various action on web driver or web elements.


| Service Id | Action | Description | Request | Response |
| --- | --- | --- | --- | --- |
| selenium | start | Starts standalone selenium server | [SeleniumServerStartRequest](service_selenium_start.go) | [SeleniumServerStartResponse](service_selenium_start.go) |
Expand All @@ -367,6 +379,7 @@ Selenium runner open a web session to run various action on web driver or web el
| selenium | call-element | Call a method on a web element, i.e. we.Click() | [SeleniumWebElementCallRequest](service_selenium_call_web_element.go) | [SeleniumWebElementCallResponse](service_selenium_call_web_element.go) |
| selenium | run | Run set of action on a page | [SeleniumRunRequest](service_selenium_run.go) | [SeleniumRunResponse](service_selenium_run.go) |

call-driver and call-element actions's method and parameters are proxied to stand along selenium server via [selenium client](github.com/tebeka/selenium)

**Generic validation service**

Expand Down Expand Up @@ -435,6 +448,7 @@ Take as example the following actual and expected data structure.
```


TODO: unify endly and dsunit validation with macro expression.

**Datastore services**

Expand All @@ -459,7 +473,6 @@ DsUnit uses its own predicate and macro system to perform advanced validation se
<a name="Workfowservice"></a>
## Workflow service


**Workflow Service**

Workflow service provide capability to run task, action from any defined workflow.
Expand All @@ -469,10 +482,13 @@ Workflow service provide capability to run task, action from any defined workflo
| workflow | load | Loads workflow from provided path | [WorkflowLoadRequest](service_workflow_load.go) | [WorkflowLoadRequest](service_workflow_load.go) |
| workflow | register | Register provide workflow in registry | [WorkflowLoadRequest](service_workflow_register.go) | |
| workflow | run | run workflow with specified tasks and parameters | [WorkflowRunRequest](service_workflow_run.go) | [WorkflowRunResponse]((service_workflow_run.go) |
| workflow | switch-task | Runs matched task with provided switch/case sourceKey and value | [WorkflowSwitchTaskRequest](service_workflow_repeat_task.go) | [WorkflowSwitchTaskResponse](service_workflow_repeat_task.go) |
| workflow | switch-action | Runs matched service action with provided switch/case sourceKey and value | [WorkflowSwitchActionRequest](service_workflow_repeat_task.go) | [WorkflowSwitchActionResponse](service_workflow_repeat_task.go) |
| workflow | repeat-task | Repeats specified task n times or till exit criteria is met | [WorkflowRepeatActionRequest](service_workflow_repeat_task.go) | [WorkflowRepeatActionResponse](service_workflow_repeat_task.go) |
| workflow | repeat-action | Repeats specified service action n times or till exit criteria is met| [WorkflowRepeatTaskRequest](service_workflow_repeat_task.go) | [WorkflowRepeatTaskResponse](service_workflow_repeat_task.go) |





**Predefined workflows**

| Name | Task |Description |
Expand Down
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ It comes with the following registered keys:
* tmpDir - temp directory
* uuid.next - generate unique id
* uuid.get - returns previously generated unique id, or generate new
*.end.XXX where XXX is the Id of the env variable to return
*.end.XXX where XXX is the ID of the env variable to return
* all UFD registry functions
*/
func NewDefaultState() data.Map {
Expand Down
4 changes: 2 additions & 2 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Event struct {
StartEvent *Event //starting event
Timestamp time.Time //start time
TimeTakenMs int //time taken
Workflow string //workflow Id
Workflow string //workflow ID
Task *WorkflowTask //task
Activity *WorkflowServiceActivity //activity details
Level int //logging level
Expand All @@ -73,7 +73,7 @@ type Event struct {
//Info returns basic event info
func (e *Event) Info() string {
var name = ""
if value, ok := e.Value["Id"]; ok {
if value, ok := e.Value["ID"]; ok {
name = toolbox.AsString(value)
}
return fmt.Sprintf("%v", name)
Expand Down
4 changes: 2 additions & 2 deletions example/etl/transformer/endly/regresion/regresion.csv
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ Workflow,,,Name,Description,Tasks,,,,,,,
,use_cases/${index}*,dsunit,prepare,prepare aerospike data,#req/prepare_data.json,${path}/db4,db4,,,,,
,use_cases/${index}*,rest/runner,send,send copy request,#req/rest_send.json|#copy.json,/api/copy/,,[$arg0] | $var,rest.actual,Response.Status,,
,use_cases/${index}*,validator,assert,validate register respose,#req/rest_assert.json,,,,,,ok,
,use_cases/${index}*,dsunit,expect,validate report table in datastore,#req/expect_data.json,${path}/db3,db3,,,,,
,use_cases/${index}*,dsunit,expect,validate report table in datastore,#req/expect_data.json,${path}/db4,db4,,,,,
,use_cases/${index}*,dsunit,expect,validate mysql datastore,#req/expect_data.json,${path}/db3,db3,,,,,
,use_cases/${index}*,dsunit,expect,validate aerospike datastore,#req/expect_data.json,${path}/db4,db4,,,,,
4 changes: 2 additions & 2 deletions example/etl/transformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package transformer

import (
"fmt"
"github.com/satori/go.uuid"
"github.com/viant/dsc"
"os"
"path"
"sync"
"sync/atomic"
"time"
"github.com/viant/toolbox"
)

//Service represents transformer service
Expand All @@ -27,7 +27,7 @@ type service struct {

func (s *service) registerTask(baseResponse *BaseResponse, taskInfo *TaskInfo, dataset string, request interface{}) {
var task = &Task{
ID: uuid.NewV4().String(),
ID: toolbox.AsString(time.Now().Nanosecond()),
Table: dataset,
BaseResponse: baseResponse,
TaskInfo: taskInfo,
Expand Down
7 changes: 4 additions & 3 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ const AppVersion = "0.0.1"
//Manager represnets a workflow manager
type Manager interface {

//Name returns an application Id
//Name returns an application ID
Name() string

//Version returns an application version
Version() string

//Service return a workflow service for provided Id or error
//Service return a workflow service for provided ID or error
Service(name string) (Service, error)

//Register register service in this manager
Expand Down Expand Up @@ -57,7 +57,7 @@ func (s *manager) Service(name string) (Service, error) {
}

func (s *manager) Register(service Service) {
s.services[service.Id()] = service
s.services[service.ID()] = service
}

func (s *manager) NewContext(ctx toolbox.Context) *Context {
Expand Down Expand Up @@ -110,6 +110,7 @@ func NewManager() Manager {
result.Register(NewEventReporterService())
result.Register(NewNetworkService())
result.Register(NewSeleniumService())
result.Register(NewEc2Service())
return result
}

Expand Down
134 changes: 134 additions & 0 deletions repeatable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package endly

import (
"fmt"
"github.com/viant/toolbox"
"github.com/viant/toolbox/data"
"time"
)

//Repeatable represent repetable execution
type Repeatable struct {
Extraction DataExtractions //data extraction
Variables Variables // input JSON body map, output state.httpPrevious
Repeat int //how many time send this request
SleepInMs int //Sleep time after request send, this only makes sense with repeat option
ExitCriteria string //Repeat exit criteria, it uses extracted variable to determine repeat termination
}

func asStructureData(source interface{}) data.Map {
if source == nil {
return data.Map(map[string]interface{}{})
}
var aMap = make(map[string]interface{})
if toolbox.IsStruct(source) {
converter = toolbox.NewColumnConverter(toolbox.DefaultDateLayout)
converter.AssignConverted(&aMap, source)
} else if toolbox.IsMap(source) {
aMap = toolbox.AsMap(source)
}
return data.Map(aMap)
}

//AsExtractable returns extractable text and struct
func (r *Repeatable) AsExtractable(context *Context, input interface{}) (string, map[string]interface{}) {
var extractableOutput string
var structuredOutput data.Map
switch value := input.(type) {
case string:
extractableOutput = value
case []byte:
extractableOutput = string(value)
case []interface{}:
if len(value) > 0 {
if toolbox.IsString(value[0]) {
extractableOutput = toolbox.AsString(value[0])
} else {
structuredOutput = asStructureData(value[0])
}
}
default:
structuredOutput = asStructureData(value)
}

if extractableOutput != "" {
if toolbox.IsCompleteJSON(extractableOutput) {
if aMap, err := toolbox.JSONToMap(extractableOutput); err == nil {
structuredOutput = data.Map(aMap)
}
}
}
return extractableOutput, structuredOutput
}

//EvaluateExitCriteria check is exit criteria is met.
func (r *Repeatable) EvaluateExitCriteria(callerInfo string, context *Context, extracted map[string]string) (bool, error) {
var extractedState = context.state.Clone()
for k, v := range extracted {
extractedState[k] = v
}
criteria := extractedState.ExpandAsText(r.ExitCriteria)
canBreak, err := EvaluateCriteria(context, criteria, callerInfo, false)
if err != nil {
return true, fmt.Errorf("failed to check %v exit criteia: %v", callerInfo, err)
}
if canBreak {
return true, nil
}
return false, nil

}

//Run repeats x times supplied handler
func (r *Repeatable) Run(callerInfo string, context *Context, handler func() (interface{}, error), extracted map[string]string) error {
for i := 0; i < r.Repeat; i++ {
out, err := handler()
if err != nil {
return err
}
extractableOutput, structuredOutput := r.AsExtractable(context, out)
if extractableOutput != "" {
extracted["value"] = extractableOutput //string output is published as $value
err := r.Extraction.Extract(context, extracted, extractableOutput)
if err != nil {
return err
}
}
if len(structuredOutput) > 0 {
var extractedVariables = data.NewMap()
_ = r.Variables.Apply(structuredOutput, extractedVariables)
for k, v := range extractedVariables {
extracted[k] = toolbox.AsString(v)
}
}
if r.ExitCriteria != "" {
if canBreak, err := r.EvaluateExitCriteria(callerInfo+"ExitEvaluation", context, extracted); canBreak || err != nil {
return err
}
}
if r.SleepInMs > 0 {
timeToSleep := time.Millisecond * time.Duration(r.SleepInMs)
time.Sleep(timeToSleep)
}
}
return nil
}

//NewRepeatable creates a new repeatable struct
func NewRepeatable() *Repeatable {
return &Repeatable{
Repeat: 1,
}
}

//Get returns non empty instance of default instance
func (r *Repeatable) Get() *Repeatable {
var result = r
if r == nil {
result = NewRepeatable()
}
if result.Repeat == 0 {
result.Repeat = 1
}
return result
}
6 changes: 3 additions & 3 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type Service interface {

//service id
Id() string
ID() string

//service state map
State() data.Map
Expand Down Expand Up @@ -102,8 +102,8 @@ func (s *AbstractService) End(context *Context) func(*Event, map[string]interfac
}
}

//Id returns this service id.
func (s *AbstractService) Id() string {
//ID returns this service id.
func (s *AbstractService) ID() string {
return s.id
}

Expand Down
4 changes: 2 additions & 2 deletions service_docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func (s *dockerService) stopImages(context *Context, request *DockerStopImagesRe
https://docs.docker.com/compose/reference/run/
Options:
-d Detached mode: Run container in the background, print
new container Id.
--Id NAME Assign a Id to the container
new container ID.
--ID NAME Assign a ID to the container
--entrypoint CMD Override the entrypoint of the image.
-e KEY=VAL Set an environment variable (can be used multiple times)
-u, --user="" Run as specified username or uid
Expand Down
Loading

0 comments on commit d961a01

Please sign in to comment.