

TIBET Workflow System (TWS)


Wins

  • Powerful, micro-service engine running on Node.js.
  • Job state changes are persistent and trigger actions.
  • Event-driven task and job transitions via change feed.
  • Fully extensible via micro-services written in simple JS.
  • Fully supported as part of the overall TIBET ecosystem.

Contents

Concepts

Cookbook

Preliminaries

Tasks

Workflows

Jobs

Code

Internals

Concepts

The TIBET Workflow System (TWS) is a micro-service execution subsystem that leverages the TDS and CouchDB to provide simple, scalable task execution out of the box.

CouchDB documents hold the definitions of your workflows, their tasks, and job requests. Storing a new job document triggers the engine and activates the associated tasks.

Individual flows are composed of one or more tasks. Sample tasks include sending email, storing data via AWS, or performing credit card transactions. Writing your own tasks is straightforward, letting you keep your server-side logic highly composable.

Job Activation

In response to storage of a job document, CouchDB's changes feed notifies observers and the TDS tasks plugin takes over, triggering job execution. The actual work is performed by loadable modules in the TDS, so the scope of a task is limited only by what you can do from Node.js.

The output below shows the TDS startup sequence and output from the TWS specific to execution of a sample workflow:

(Screenshot: TIBET Workflow System)

Job Submission

Below we show a sample.json document targeting the sample workflow (which has a single task: sending an email). The parameters for the email shown on the left use TIBET binding syntax (double square brackets), which the tibet tws submit command will prompt for:

(Screenshot: TWS Job Submission)

Submitting jobs can be done via curl, the TIBET client, the TIBET CLI, or any other HTTP client interface. If it can PUT a JSON document to an HTTP REST API it can submit a TWS job.

The TIBET Workflow System reflects the real power of TIBET+CouchDB integration as well as the power of integration between TIBET's CLI and other components. With the TWS you can be up and running with a scalable microservices solution in minutes.

(Figure: Workflow Overview)

Each concrete task relies on a corresponding task definition stored in CouchDB. Tasks are combined by virtue of flow documents which serve as blueprints for how to execute a sequence of tasks, including any default parameters for the flow and its tasks.

Execution of a flow is triggered by storage of a job document. In response, the TWS will look up the job's referenced flow and use it as a template for executing the various tasks for that job. All flow/task transitions are responses to document updates.

Each job gets a unique copy of the task and flow information so you have full audit trails for every job specific to the flow definition at the time the job ran.

Cookbook

Preliminaries

Install CouchDB

If you don't have a copy of CouchDB running (or a cloud-based instance available from Cloudant or a similar hosting service) you need to install CouchDB.

Follow the instructions at http://couchdb.apache.org/ to install and configure CouchDB, or the instructions at the cloud-based hosting service of your choice.

Make sure CouchDB is running

Access your CouchDB instance via the administration page (http://127.0.0.1:5984/_utils) or use curl or a similar command to confirm CouchDB is accessible:

$ curl -X GET http://127.0.0.1:5984

{"couchdb":"Welcome","version":"2.0.0","vendor":{"name":"The Apache Software Foundation"}}

If your CouchDB installation isn't running/accessible you'll want to get that sorted out before moving forward.

Clone/init a TWS-capable project using default or couch dna

The TWS relies on the TDS so you need to create a project using either the default or couch dna to get a project set up with a TIBET server.

While both default and couch projects support the TWS, only couch dna also supports running your application directly from CouchDB (as a 'couchapp'). If you don't intend to run your application in couchapp form, you should choose default dna.

For example, we create and initialize a default project in the example below:

$ tibet clone {{appname}}
$ cd {{appname}}
$ tibet init

Alternatively you can create a CouchDB-focused project via --dna couch:

$ tibet clone {{appname}} --dna couch
$ cd {{appname}}
$ tibet init

Activate the TDS tasks plugin

With your project initialized, you need to enable the TDS tasks plugin. This will tell the TDS that it should load and activate the server middleware which processes TWS operations.

To activate the TWS task plugin set TIBET's tds.use_tasks configuration flag to true:

$ tibet config tds.use_tasks=true

Install task plugin dependencies

Depending on the tasks you intend to use you will need to add certain package dependencies to your project.

When your project is created using default or couch dna the package.json file doesn't automatically include the modules for the various built-in task samples. You need to use npm to install the modules required by the specific task plugins your workflows will activate to avoid seeing job failures.

mail-smtp

The built-in mail-smtp task uses nodemailer to send mail via an SMTP host. If you want to use that task in your workflows install the dependencies via:

$ npm install --save nodemailer

s3-upload

The built-in s3-upload task uses aws-sdk (the AWS SDK for JavaScript) to communicate with S3. Install that package using:

$ npm install --save aws-sdk

Repeat this sequence for any other modules you intend to script into flows. See the documentation on individual TWS tasks for details on their requirements.

Confirm configuration settings for TWS

The TWS relies on a CouchDB database to store flow, task, and job documents and their associated views. CouchDB connection and database information is specified by TIBET configuration parameters and/or environment variables.

If you're running a local instance that doesn't require authentication data you can rely entirely on TIBET configuration values. These include:

//  tds.couch params are for couch-based TIBET applications
TDS.getcfg('tds.couch.scheme') || 'http';
TDS.getcfg('tds.couch.host') || '127.0.0.1';
TDS.getcfg('tds.couch.port') || 5984;

//  tds.tasks params are for TWS operation (including separate DB)
TDS.getcfg('tds.tasks.db_name') || TDS.getcfg('tds.couch.db_name') || TDS.getcfg('npm.name');
TDS.getcfg('tds.tasks.db_app') || TDS.getcfg('tds.couch.db_app') || 'tibet';
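
The fallback chain above can be pictured as a small standalone function. This is only a sketch: resolveTwsConfig and the plain-object cfg argument are hypothetical stand-ins for TDS.getcfg and are not part of TIBET.

```javascript
// Hypothetical helper mirroring the fallback order shown above.
// `cfg` is a plain object standing in for TIBET's configuration store.
function resolveTwsConfig(cfg) {
    const get = (key) => cfg[key];

    return {
        scheme: get('tds.couch.scheme') || 'http',
        host: get('tds.couch.host') || '127.0.0.1',
        port: get('tds.couch.port') || 5984,
        // TWS-specific values win, then couch values, then a final default.
        db_name: get('tds.tasks.db_name') || get('tds.couch.db_name') ||
            get('npm.name'),
        db_app: get('tds.tasks.db_app') || get('tds.couch.db_app') || 'tibet'
    };
}

// Only two values are set here, so everything else falls back.
const resolved = resolveTwsConfig({
    'npm.name': 'myapp',
    'tds.tasks.db_app': 'tws'
});
console.log(resolved.db_name, resolved.db_app);
```

Because each lookup uses ||, an empty string or 0 is treated the same as a missing value and falls through to the next candidate.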

You can list (and adjust) the various settings using tibet config:

$ tibet config tds.tasks
{
    "tds.tasks.db_app": "tws",
    "tds.tasks.db_name": "tasks",
    ...snipped...
}

$ tibet config tds.couch
{
    "tds.couch.db_app": "tibet",
    "tds.couch.host": "127.0.0.1",
    "tds.couch.port": "5984",
    "tds.couch.scheme": "http",
    ...snipped...
}

With the default settings you can expect to be using a CouchDB database named {{appname}}_tasks and a design document named _design/tws for storage of the various jobs, tasks, and flows in your TWS configuration.

Use the tibet config command to adjust these values to meet your needs.

Confirm environment variables for TWS

There are several commonly used environment variables which override any settings found in the TIBET configuration values.

Environment variables are recommended for anything involving a username or password since they are less likely to be checked in to a repository inadvertently.

COUCH_URL       //  a full URL including any user/pass information

COUCH_USER      //  optional username for basic authentication
COUCH_KEY       //  optional api_key (supersedes COUCH_USER)
COUCH_PASS      //  optional password for basic authentication

COUCH_DATABASE  //  the name of the database to connect to

COUCH_APPNAME   //  TIBET-specific. What _design/{{application name}} ?

If your CouchDB server is configured to require a username/password to log in, ensure the COUCH_USER and COUCH_PASS environment variables are set.

For example, you might use the following lines, with admin/password set to their proper values for your CouchDB installation:

export COUCH_USER=admin
# export COUCH_KEY=api_key  (optional, but supersedes COUCH_USER)
export COUCH_PASS=password

NOTE: Error messages that include "You are not a server admin" mean you need to export or otherwise ensure that the proper COUCH_USER and COUCH_PASS environment variables are set to match a valid CouchDB admin account.
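
To make the relationship between these variables concrete, here is a minimal sketch of how a connection URL might be assembled from them. The function name couchUrlFromEnv and the exact precedence (COUCH_URL first, then COUCH_KEY over COUCH_USER) are assumptions based on the comments above; TIBET's own resolution logic may differ.

```javascript
// Sketch only: precedence rules here are assumptions drawn from the
// variable descriptions above, not TIBET's actual implementation.
function couchUrlFromEnv(env, fallback = 'http://127.0.0.1:5984') {
    // A full COUCH_URL is assumed to win outright.
    if (env.COUCH_URL) {
        return env.COUCH_URL;
    }

    const url = new URL(fallback);
    // COUCH_KEY supersedes COUCH_USER when both are present.
    const user = env.COUCH_KEY || env.COUCH_USER;

    if (user) {
        url.username = user;
        url.password = env.COUCH_PASS || '';
    }

    return url.href;
}

// Avoid logging the result: it may contain credentials.
const couchUrl = couchUrlFromEnv({COUCH_USER: 'admin', COUCH_PASS: 'password'});
```

In a real shell session you would pass process.env rather than a literal object.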

Initialize the TWS database

Before you can use the TWS you need to initialize the task engine database using the tibet tws init command. This command will check for the TWS database, the appropriate design document, and the baseline view configuration required for the engine to operate correctly.

NOTE that tibet tws init is idempotent, so running it on an existing database won't harm anything. It will simply insert any missing views as needed.

Be aware that when running the tibet tws init command you will be prompted to confirm the current CouchDB parameters as determined by the current configuration and environment settings. When these prompts appear any username and password information is automatically hidden.

$ tibet tws init
CouchDB base [http://127.0.0.1:5984] ?
using base url 'http://127.0.0.1:5984'.
Database name [{{appname}}_tasks] ?
Application name [tws] ?
confirming TWS database
creating TWS database
confirming TWS design doc
inserting TWS design doc
confirming TWS views
inserting TWS view block
TWS initialized

Initialize TWS tasks and flows

When you create a new default or couch project your application folder will include a root couch directory containing at least one subdirectory named tws. The couch/tws directory contains the template files used by the tibet tws command when initializing and pushing data to CouchDB.

In particular, the couch/tws directory will contain flows, tasks, and jobs directories, each of which contains sample JSON documents for each object type.

The tibet tws command has subcommands for list, push, view, and others, which allow you to list and push (insert/update) documents from each of the tws subdirectories found in your project's couch directory.

Pushing Tasks/Flows/Jobs

Before you can execute a successful job in the TWS you must upload at least one task and one flow document into the TWS's database. Your first step should be to review the sample JSON documents and adjust them to contain the proper names and parameters you'd like for your particular configuration.

Once you have updated each JSON file to meet your needs you can push them using the tibet tws command.

To push all the flow and task definitions use tibet tws push --map

$ tibet tws push --map --no-confirm
updating: /Users/ss/temporary/dbtest/couch/tws/tasks/mailer.json
skipping: /Users/ss/temporary/dbtest/couch/tws/flows/_sample_flow.json
updating: /Users/ss/temporary/dbtest/couch/tws/flows/mailtest.json
updating: /Users/ss/temporary/dbtest/couch/tws/flows/mailtwo.json
/Users/ss/temporary/dbtest/couch/tws/tasks/mailer.json =>
{
    "ok": true,
    "id": "e52b072b50c597298c310ea9c8005852",
    "rev": "6-fe94b4637b5d66320583f8ff0e63910d"
}
/Users/ss/temporary/dbtest/couch/tws/flows/mailtwo.json =>
{
    "ok": true,
    "id": "e52b072b50c597298c310ea9c80050b2",
    "rev": "5-fe65ba812451a632a0b2a17756816f44"
}
/Users/ss/temporary/dbtest/couch/tws/flows/mailtest.json =>
{
    "ok": true,
    "id": "31ee427eac51b94470bdfd14da00715e",
    "rev": "14-f3644ec1c48ce27336ec5915dc83f356"
}

To push a single file just reference that file directly. The TIBET command line can resolve TIBET virtual paths and the value ~tws references the root of all TWS templates. This allows you to use paths of the form ~tws/tasks/{{file}} or ~tws/flows/{{file}} to refer to individual templates.

To push the current definition for the task named s3, for example, use tibet tws push '~tws/tasks/s3.json'.

As with most tibet tws commands you will be prompted to confirm the current CouchDB parameters. If you want to skip these prompts add --no-confirm to your command line:

$ tibet tws push '~tws/tasks/s3.json'
CouchDB base [http://127.0.0.1:5984] ?
using base url 'http://127.0.0.1:5984'.
Database name [{{appname}}_tasks] ?
Application name [tws] ?
{
    "ok": true,
    "id": "e52b072b50c597298c310ea9c80033a7",
    "rev": "1-5eeca1d84d794fd4ea289c6dafed98d7"
}

If your JSON document includes a value for _id that ID will be used to check for an existing document. If no _id field exists the document will be updated after the push with the id value returned by the insert operation. In the output above that would be e52b072b50c597298c310ea9c80033a7. This approach allows the system to coordinate JSON documents on disk with their counterparts in CouchDB, avoiding duplication with multiple push operations.
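
The id bookkeeping described above can be sketched as follows. The helper name syncIdAfterPush is hypothetical and the response object simply mirrors the output shown earlier; this is not the CLI's actual code.

```javascript
// Hypothetical helper: record the id CouchDB assigned so a later push
// updates the same document rather than inserting a duplicate.
function syncIdAfterPush(doc, response) {
    if (!response || response.ok !== true) {
        throw new Error('push failed; leaving document unchanged');
    }

    // An existing _id is kept as-is; otherwise adopt the returned id.
    return doc._id ? doc : {...doc, _id: response.id};
}

const pushed = syncIdAfterPush(
    {type: 'task', name: 's3', plugin: 's3-upload'},
    {
        ok: true,
        id: 'e52b072b50c597298c310ea9c80033a7',
        rev: '1-5eeca1d84d794fd4ea289c6dafed98d7'
    }
);
console.log(pushed._id);
```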

Use the tibet tws push {{name}} approach to upload and adjust your task and flow definitions as needed to configure your TWS engine.

Listing Existing Tasks/Flows/Jobs

You can list the existing tasks, flows, and jobs by using the list subcommand of the tibet tws command. For example, to list tasks:

$ tibet tws list --tasks
mail => mail-smtp (31ee427eac51b94470bdfd14da0050cd)
s3 => s3-upload (e52b072b50c597298c310ea9c80033a7)

Add the --verbose flag to see entire documents:

$ tibet tws list --tasks --verbose
[{
    "_id": "31ee427eac51b94470bdfd14da0050cd",
    "_rev": "4-da2d9b9650e08b4b715f99b4baaa16ea",
    "type": "task",
    "name": "mail",
    "plugin": "mail-smtp",
    "params": {
        "smtp": {
            "service": "gmail",
            "auth": {
                "user": "useraftertibetencrypt",
                "pass": "passaftertibetencrypt"
            }
        },
        "from": "mailer@technicalpursuit.com",
        "html": "<code>{{params}}</code>"
    }
}, {
    "_id": "e52b072b50c597298c310ea9c80033a7",
    "_rev": "1-5eeca1d84d794fd4ea289c6dafed98d7",
    "type": "task",
    "name": "s3",
    "plugin": "s3-upload",
    "params": {
        "auth": {
            "id": "idaftertibetencrypt",
            "secret": "secretaftertibetencrypt"
        },
        "region": "us-east-1",
        "bucket": "default",
        "key": "",
        "body": ""
    }
}]

NOTE that the various authentication values here are encrypted via the tibet encrypt command. See the detailed documentation further in this document for more information.

Start the TIBET Data Server:

$ tibet start

Tasks

Tasks are the smallest unit of workflow in the TWS. You can think of the individual tasks as micro-services: small, composable units of work.

A task definition is a JSON document which provides a unique identifier and parameters for how to invoke a TWS task plugin such as the built-in mail-smtp or s3-upload task. While TIBET includes some pre-built tasks it's fully expected that you will create and compose your own tasks and workflows.

Define a Task:

Tasks are JSON documents stored in CouchDB and used as steps in a Flow.

{
    "type": "task",
    "name": string,
    "params": JSON,
    "error": string,
    "timeout": number=15000,
    "retry": number=0,
    "flow": string,
    "plugin": string
}

Each concrete task must include a type of task, and a unique name.

{
    "type": "task",
    "name": "mail-smtp"
}

Each task name must either match a TWS plugin from the project's tasks directory or the task must include a plugin slot to reference a different TWS plugin to process this task type. Here we're using mailer as the name and pointing that task name to the mail-smtp plugin for actual operation.

{
    "type": "task",
    "name": "mailer",
    "plugin": "mail-smtp"
}
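
The name-to-plugin rule can be expressed in a few lines. This is a minimal sketch; pluginNameFor is a hypothetical helper, not a TWS API.

```javascript
// Sketch: a task's plugin defaults to its name unless `plugin` is set.
function pluginNameFor(task) {
    if (task.type !== 'task' || !task.name) {
        throw new Error('task documents need type "task" and a name');
    }
    return task.plugin || task.name;
}

// The "mailer" task delegates to the mail-smtp plugin.
console.log(pluginNameFor({type: 'task', name: 'mailer', plugin: 'mail-smtp'}));
```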

In addition to type, name, and plugin, you can define parameters for the task which serve as defaults for all instances of that task as shown below.

mail-smtp

As a concrete example, here's a task definition for the built-in mail-smtp plugin.

Note we've included parameters for the task including an HTML template with embedded substitution syntax. The mail-smtp plugin will use this template to merge actual values with the template at runtime:

{
    "type": "task",
    "name": "mail",
    "plugin": "mail-smtp",
    "params": {
        "smtp": {
            "service": "gmail",
            "auth": {
                "user": "someuser@yourdomain.com",
                "pass": "tibetencryptoutputforpassword"
            }
        },
        "from": "mailer@yourdomain.com",
        "html": "<code>{{json params}}</code>"
    }
}

Using Futon/Fauxton you can cut/paste the content of the above document, adjusting the specifics to match your particular SMTP server requirements.

Be sure to save the document.

For security reasons the value for params.smtp.auth.pass should always be stored in encrypted form. See tibet help encrypt for more information on how to use the TIBET CLI to produce an encrypted password value you can store safely. The TDS will automatically decrypt these values, using environment values to drive the decryption.

NEVER STORE UNENCRYPTED CREDENTIALS IN COUCHDB.

If you need more information on the smtp parameter options see the documentation on nodemailer, the underlying SMTP transport used for this task. Common alterations would be to change the service or use a specific host and port.

s3-upload

Another concrete example for the built-in s3-upload plugin is shown below:

{
   "type": "task",
   "name": "s3",
   "plugin": "s3-upload",
   "params": {
       "auth": {
           "id": "870e53f68dfa6b2fd7144762b8460ccf07a54b79",
           "secret": "ff337df0a69f044ba5260672be7d1dc01aa57b7e4a0c1b51d059d98d0448ff958ebf2bbc21c3ca2b"
       },
       "region": "us-east-1",
       "bucket": "default",
       "key": "",
       "body": ""
   }
}

As shown in our example above, you can also provide values for the other parameters used by the s3-upload task, including the region, bucket name, key, and body. The body can be a handlebars template, which is merged with the task parameter data.

NOTE: As with the mail-smtp example the auth values shown here for id and secret are the result of running tibet encrypt on your actual S3 credential values.

NEVER STORE UNENCRYPTED CREDENTIALS IN COUCHDB.

See the full Tasks documentation for more details on task documents.

Workflows

Workflows are, quite simply, a set of configured tasks.

As with tasks, the TWS stores a workflow as a JSON document in CouchDB.

At the present time the structure of a workflow is limited to sequences of tasks. However, since each task can have its own error handling and retry logic, you can still handle most common operations with that simple structure.

Define a Workflow:

Workflows (flows) are CouchDB documents defining one or more tasks to be run when a job referencing that workflow is initiated.

{
    "type": "flow",
    "name":  string,
    "owner": string,
    "tasks": JSON,
    "params": JSON,
    "error": string,
    "timeout": number=60000,
    "retry": number=0,
    "enabled": boolean=true
}

Each workflow must include a type set to flow as well as a name and owner which combine to form a unique composite key for job triggering. The owner value is often correlated to unique client IDs in environments which support multiple clients.

{
    "type": "flow",
    "name": "mailtest",
    "owner": "Team TIBET",
    ...
}

In addition to the composite key values each flow requires a tasks object defining a structure and associated task configuration.

At the present time the only valid structure is sequence, meaning an ordered list of tasks. Future releases will support additional structure types allowing you to create AND, OR, and JOIN conditions for more complex choreography.

As a concrete example, here's a flow definition which will test our previously defined mailer task:

{
   "type": "flow",
   "name": "mailtest",
   "owner": "Team TIBET",
   "tasks": {
       "structure": "sequence",
       "sequence": [
           "mailer"
       ]
   }
}

As with our previous example, you can simply cut/paste the content here into a new document using Futon/Fauxton.

Be sure to save the document.

See the Flows section for more details on flow documents.

Jobs

Workflows and tasks define how something should be accomplished: what plugins are involved, what parameters are used, and what options exist for error handling.

To trigger actual processing you save a job document to the TWS database.

{
    "type": "job",
    "flow": string,
    "owner": string,
    "params": JSON
}

Each job document must include a flow name and owner value which match a known workflow name/owner combination.

An optional params block can also be provided to pass parameters to the various tasks in the workflow. For flows with unique task names you can simply provide the name. For flows with multiple invocations of a task you can qualify the name with an index number as in {{task-N}} as the examples below show.

Here we're passing a simple set of parameters to the mailer task within the mailtest workflow:

{
    "type": "job",
    "flow": "mailtest",
    "owner": "Team TIBET",
    "params": {
        "mailer": {
            "to": "your@email.here",
            "subject": "TWS mail check"
        }
    }
}
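
One way to picture how a step's parameters might be assembled is to layer task defaults, then flow-level values, then job-level values, looking each layer up by plain task name and by indexed step name. The helper name paramsForStep, the merge order, and the index passed in are all assumptions for illustration; the engine's actual precedence may differ.

```javascript
// Illustrative only: merge order and lookup order are assumptions.
function paramsForStep(task, flow, job, index) {
    const stepKey = `${task.name}-${index}`;
    const pick = (params) => Object.assign({},
        params && params[task.name],    // unqualified task name
        params && params[stepKey]);     // indexed form, e.g. "mailer-0"

    // Later sources override earlier ones.
    return Object.assign({}, task.params, pick(flow.params), pick(job.params));
}

const stepParams = paramsForStep(
    {name: 'mailer', params: {from: 'mailer@yourdomain.com'}},
    {name: 'mailtest', params: {}},
    {params: {mailer: {to: 'your@email.here', subject: 'TWS mail check'}}},
    0
);
console.log(stepParams);
```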

For command-line testing store a job document using a tool like curl:

curl -X POST \
     -H 'Content-Type: application/json' \
     -d '{"type":"job", "flow":"mailtest","owner":"Team TIBET","params": { "mailer": {"to": "your@email.here", "subject": "TWS mail check"}}}' \
     http://127.0.0.1:5984/tasks

curl -X POST \
     -H 'Content-Type: application/json' \
     -d '{"type":"job", "flow":"mailtest","owner":"Team TIBET","params": { "mailer-0": {"to": "your@email.here", "subject": "TWS mail check"}}}' \
     http://127.0.0.1:5984/tasks

For testing from a client-side application POST to the TWS job endpoint defined by tds.tasks.job.uri (which defaults to _tws/jobs).

NOTE: the TWS must be active, meaning that the tasks plugin is loaded in the server and tds.use_tasks is true.
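
As a rough sketch of a client-side submission, the following builds the request without sending it. The helper name buildJobRequest and the base URL are placeholders for your own TDS address; only the _tws/jobs path comes from the default described above, and any authentication your server requires is not shown.

```javascript
// Sketch of a client-side submission. The base URL is a placeholder;
// `_tws/jobs` is the documented default for tds.tasks.job.uri.
function buildJobRequest(baseUrl, job, jobUri = '_tws/jobs') {
    if (!job.flow || !job.owner) {
        throw new Error('a job needs both flow and owner');
    }

    return {
        url: new URL(jobUri, baseUrl).href,
        options: {
            method: 'POST',
            headers: {'Content-Type': 'application/json'},
            body: JSON.stringify({type: 'job', ...job})
        }
    };
}

const request = buildJobRequest('http://127.0.0.1:1407/', {
    flow: 'mailtest',
    owner: 'Team TIBET',
    params: {mailer: {to: 'your@email.here', subject: 'TWS mail check'}}
});
// With a running TDS you could then call:
// fetch(request.url, request.options)
```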

Concepts

TWS operation is driven by documents and views stored in CouchDB which reference and trigger invocation of TWS plugins kept in the ~tds_tasks directory of your project.

(FUTURE) Pages vended from the /_tds/admin route provide a simple way to manage tasks, workflows, and jobs without interacting directly with CouchDB. You can always work with CouchDB directly, however, provided you know the schema of the documents the TWS relies upon.

TWS-related documents fall into two broad categories: definitions and executions.

Definitions are relatively static documents which define the individual tasks and workflows the system is configured to execute. Executions are documents containing runtime execution state and history. We'll start out looking at the definition documents.

Definition

Tasks

A task is a single atomic step in a larger workflow. All flows are defined as a set of 0-N tasks. Individual tasks are defined in documents with the following form:

{
    "type": "task",
    "name": string,
    "params": JSON,
    "error": string,
    "timeout": number=15000,
    "retry": number=0,
    "flow": string,
    "plugin": string
}

Only type and name are required.

type

The required type field must always be task for a task definition.

name

The required name field must be a string identifying the task. The value here is used as a key in Flow definitions and cross-checked in params fields for any parameter values which should be defaulted or passed to the task.

NOTE: the name field must match a known TWS plugin in your project's ~tds_tasks directory. If not, you must use the plugin field to point to the proper plugin.

params

The optional params field should contain JSON-formatted data for default values. This field's values are copied into the individual step execution documents for the task.

error

The optional error field specifies a task to run if the named task fails. Failure can occur due to timeout, retry exhaustion, or other situations. If you want to perform multiple steps on error you can reference a task which points to a flow. See the flow property.

timeout

The optional timeout field defines a millisecond count during which a job running this task must complete the task or cause the job to time out. The default is 15000 milliseconds (15 seconds).

retry

The optional retry field defines how many times this task can be retried if it fails for any reason. The default value is 0, meaning tasks do not retry by default. Note that this is a retry count, not a "total attempts" number. In other words, a retry count of 2 means the task will be attempted a total of 3 times (the initial attempt and two retry attempts).
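
The arithmetic is simple but easy to get wrong by one, so here it is as a tiny sketch (totalAttempts is a hypothetical helper, not part of the TWS):

```javascript
// retry is a count of additional attempts, not a total.
function totalAttempts(task) {
    const retry = Number.isInteger(task.retry) && task.retry > 0 ?
        task.retry : 0;
    return 1 + retry;
}

console.log(totalAttempts({retry: 2}));
```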

flow

(FUTURE) The optional flow field lets you specify that this task definition is actually a reference to a named flow, meaning the task is run by running the flow in question as a job step. When this field is specified only the type, name, and error fields are used; the plugin, retry, and timeout fields are ignored and the flow's task values are used.

plugin

The optional plugin field lets you define the file name of the task's plugin. This file will always be found in the project's ~tds_tasks directory. The plugin name defaults to the task name.

Flows

Flows, or more accurately "workflows", define a configuration of tasks to be executed in response to Job requests. A flow is defined as follows:

{
    "type": "flow",
    "name":  string,
    "owner": string,
    "tasks": JSON,
    "params": JSON,
    "error": string,
    "timeout": number=60000,
    "retry": number=0,
    "enabled": boolean=true
}

Only type, name, owner, and tasks are required.

type

The required type field must always be flow for a flow definition.

name

The required name field must be a unique string identifying the flow. The value here is used by a Job request's flow value to identify the workflow which should run.

owner

The required owner field defines the owner of the workflow definition. The combination of flow name and flow owner creates a composite key. Job requests must match both the name and owner of a valid flow definition to trigger a workflow to execute.

tasks

The required tasks entry contains a JSON string defining the structure of the workflow. During job execution a copy of this field is made in the job document so that jobs run independently of the workflow and maintain their script for logging/auditing.

{
    "structure": "sequence",
    "sequence": array
}

As shown above, the tasks field takes an object defining a structure which currently must be set to sequence and a matching sequence key containing an array of task names. Future versions will support other structure values.
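
Before pushing a flow document you may want to sanity-check it against the required fields. The following is a minimal sketch of such a check; validateFlow and its error messages are hypothetical and are not something the TWS provides.

```javascript
// Hypothetical pre-push check based on the required flow fields.
function validateFlow(flow) {
    const errors = [];

    if (flow.type !== 'flow') {
        errors.push('type must be "flow"');
    }
    for (const key of ['name', 'owner']) {
        if (typeof flow[key] !== 'string' || flow[key] === '') {
            errors.push(`${key} is required`);
        }
    }

    // Only the "sequence" structure is currently valid.
    const tasks = flow.tasks || {};
    if (tasks.structure !== 'sequence') {
        errors.push('tasks.structure must be "sequence"');
    } else if (!Array.isArray(tasks.sequence)) {
        errors.push('tasks.sequence must be an array of task names');
    }

    return errors;
}

const flowErrors = validateFlow({
    type: 'flow',
    name: 'mailtest',
    owner: 'Team TIBET',
    tasks: {structure: 'sequence', sequence: ['mailer']}
});
console.log(flowErrors.length === 0 ? 'flow looks valid' : flowErrors);
```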

params

The optional params field should contain JSON-formatted data for default values. As with task parameters a flow's params field contains default values for the tasks it contains. To pass parameters from a flow to a task you can use keys matching the step names. Step names follow a format of {{taskname}}-N so a workflow invocation of a task fluff as the second job step can be parameterized using a key of 'fluff-2'.

error

The optional error field specifies a task to run if the named flow fails. Failure can occur due to timeout, retry exhaustion, or other situations. If you want to perform multiple steps on error you can reference a task which points to another flow.

timeout

(FUTURE) The optional timeout field defines a millisecond count during which a job running the entire flow must complete the flow or cause the job to time out. The default is 60000 milliseconds (1 minute). Note that for flows this figure is not a hard cutoff; instead it is treated as a cutoff for new task startup. In other words, an in-process task may exceed this limit and, if it's the final task, still allow the job to complete successfully even though it took slightly longer. No new tasks will be triggered beyond the timeout value, however.
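
Since this behavior is marked as future work, the following is only a sketch of the "cutoff for new task startup" idea. The helper name canStartNextTask and the choice of a strict less-than comparison are assumptions.

```javascript
// Sketch: the flow timeout gates new task startup only; a task already
// running is allowed to finish. Defaults to the documented 60000 ms.
function canStartNextTask(job, flow, now = Date.now()) {
    const timeout = typeof flow.timeout === 'number' ? flow.timeout : 60000;
    return now - job.start < timeout;
}

// A job started 30 seconds ago may still start another task.
const startedAt = Date.now() - 30000;
console.log(canStartNextTask({start: startedAt}, {}));
```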

retry

The optional retry field defines how many times this flow can be retried on failure. The default value is 0 meaning flows do not retry by default. Flows are not atomic so setting this value to a non-0 value will cause the flow's last uncompleted task to be retried.

enabled

(FUTURE) The optional enabled field can be used to disable a workflow so no jobs for that workflow will run. Jobs requesting the flow will fail until it is re-enabled. Enabled defaults to true.

Execution

TWS execution is initiated by storage of a Job document. The document stored to trigger a job request is updated throughout execution and notification of those persistent state changes drives each subsequent step in task execution.

By triggering and tracking execution via document storage and change notification the TWS ensures that job state is never lost due to process failures and that task execution can be handled across multiple observing TDS/TWS processes.

Jobs

Like tasks and flows a job is essentially just a JSON document stored in CouchDB. The difference is that a job document is updated during workflow execution to track the current state of a specific workflow invocation. An initial job request includes:

{
    "type": "job",
    "flow": string,
    "owner": string,
    "params": JSON
}

Note that params are optional but certain tasks may require them and will fail if the job does not specify appropriate values. See the documentation for specific task runners for more information.

type

The required type field must always be job for a job request.

flow

The required flow field must identify a specific flow to be executed. In combination with the owner field this must match an enabled flow definition.

owner

The required owner field must identify a specific flow owner. In combination with the flow value this must match an enabled flow definition.

params

The optional params field should contain JSON-formatted data which is parsed then used by the various tasks in a workflow as needed.
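
The composite-key match between a job and a flow can be illustrated with an in-memory lookup. This is a sketch only: findFlowForJob is a hypothetical helper, and the engine itself works against documents stored in CouchDB rather than an array.

```javascript
// Illustrative in-memory lookup of a flow by name + owner.
function findFlowForJob(flows, job) {
    return flows.find((flow) =>
        flow.type === 'flow' &&
        flow.name === job.flow &&
        flow.owner === job.owner &&
        flow.enabled !== false   // enabled defaults to true
    ) || null;
}

const knownFlows = [
    {type: 'flow', name: 'mailtest', owner: 'Team TIBET'},
    {type: 'flow', name: 'mailtest', owner: 'Other Team', enabled: false}
];

const matched = findFlowForJob(knownFlows,
    {type: 'job', flow: 'mailtest', owner: 'Team TIBET'});
console.log(matched ? matched.owner : 'no matching flow');
```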

Job Updates

During execution a Job's document is modified to contain the current state of execution for a particular workflow invocation. Fields in an active job document include:

{
    "type": "job",
    "flow": string,
    "owner": string,
    "params": JSON,
    "tasks": JSON,
    "start": number,
    "state": string,
    "steps": array,
    "exit": number,
    "end": number
}

The first four fields (type, flow, owner, and params) were described earlier as part of a job request. Note, however, that in an active job the params field always contains a value combining the expanded defaults and any optional parameters passed to the job.

tasks

The tasks field is a copy of the flow's JSON task string at the moment the job is initialized. This allows jobs to run independently of flow edits.
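
The effect of copying the task structure can be seen in a short sketch. snapshotTasks is a hypothetical helper; the point is only that the job holds its own copy, so later edits to the flow do not change it.

```javascript
// Sketch: copy the flow's task structure so later flow edits do not
// affect a job that is already initialized. JSON round-tripping is a
// safe deep copy here because the documents are plain JSON.
function snapshotTasks(flow) {
    return JSON.parse(JSON.stringify(flow.tasks));
}

const flow = {tasks: {structure: 'sequence', sequence: ['mailer']}};
const job = {type: 'job', flow: 'mailtest', tasks: snapshotTasks(flow)};

flow.tasks.sequence.push('s3');          // later edit to the flow
console.log(job.tasks.sequence.length);  // the job's copy is unchanged
```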

start

The start field contains the JavaScript Date.now() value captured at the moment the job is initialized. This value supports timeout checks and elapsed time calculation.

state

The state field contains the current job state. The state value depends on the task structure and task execution results; however, there are a few fixed states which are used by the engine internally. These states use a $$ prefix and include:

$$ready     - Job or step entry
$$timeout   - Job or step entry
$$error     - Job or step entry
$$complete  - Job or step entry

$$cancelled - (FUTURE)

$$active    - Step entry only

Jobs start out without a state field and have this value set to $$ready on completion of job initialization. Jobs with a $$ready state can be picked up by TWS task runners.

While a job has unprocessed tasks and has not failed or timed out the state is determined by the task(s) being run. By default the engine will set the state to {{taskname}}-N where N is a step count. This value provides "progress" information of a sort.

When the job has finished for any reason it will have a $$complete state and the exit field will contain an exit code defining success or failure.

If a task or flow times out the engine sets the state to $$timeout. This state triggers retry logic which looks at which level (task or flow) timed out and its retry count.
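
When inspecting job documents by hand it can help to translate the state field into a plain description. The classifier below is a hypothetical sketch built from the states listed above and the rule that an exit value of 0 means success; it is not part of the engine.

```javascript
// Hypothetical classifier for a job document's lifecycle position.
const FIXED_STATES = ['$$ready', '$$timeout', '$$error', '$$complete'];

function describeJobState(job) {
    if (job.state === undefined) {
        return 'uninitialized';            // new job, no state field yet
    }
    if (job.state === '$$complete') {
        return job.exit === 0 ? 'succeeded' : 'failed';
    }
    if (FIXED_STATES.includes(job.state)) {
        return job.state.slice(2);         // 'ready', 'timeout', 'error'
    }
    return `running ${job.state}`;         // a "{{taskname}}-N" step state
}

console.log(describeJobState({type: 'job', state: '$$ready'}));
```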

steps

The steps field is an array containing an entry for each task run by the job.

Upon completion of a job the steps array contains the sequence of tasks actually run by the job, regardless of how the original flow was defined. In other words, it represents the specific path the job took through the workflow structure.

Each item in the steps array consists of an execution entry of the form:

{
    "task": string,
    "pid": number,
    "params": JSON,
    "state": string,
    "start": number,
    "end": number,
    "exit": number,
    "result": string
}

A step entry's task field contains the name of the task being run.

Each step entry has a pid assigned by the TWS process which is running the task. This pid value locks the task so only that process can run it. If the task times out the retry logic will determine if the pid can be cleared to support retry.

The params field of a step entry contains any parameter data passed to the task which was run for that step. This value is assembled from job params and default data.

The state, exit, start, and end fields are used just as they are for a job, containing the individual step's state, exit code, and timing data.

The result field of a step entry contains any data returned by task execution.

exit

The exit field is set upon job completion to a value similar to that used by Node.js. A 0 represents a successful job. A non-zero value represents a failed job of some type.

end

The end field contains the JavaScript Date.now() value captured at the moment the job is completed (set to the $$complete state).

Job Initialization

When the TWS machinery is triggered by a new job document any TWS-enabled process can respond to that request. Before actual task processing begins, however, the job must be initialized so its tasks, parameters, and state tracking slots are in place.

To minimize thrashing due to a flurry of document updates, job initialization is done in a single document update which sets the following fields:

state  - $$ready
start  - the current Date.now() value
tasks  - copy of the job flow's tasks slot (snapshot of workflow script)
params - expanded JSON representation of job and task params data values
steps  - an empty array [] for holding individual task execution records
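
The single-update initialization described above might look roughly like the sketch below. The function name, the flow.defaults field, and the shape of the tasks value are assumptions for illustration, not the engine's actual code.

```javascript
// Hypothetical sketch of job initialization; names are illustrative.
function initializeJob(job, flow, now = Date.now()) {
    // Combine defaults with request params; request values win.
    // `flow.defaults` is an assumed field name.
    const params = Object.assign({}, flow.defaults, job.params);

    // Build one updated document so all fields land in a single save.
    return Object.assign({}, job, {
        state: '$$ready',
        start: now,
        // Snapshot copy so later flow edits don't affect this job.
        tasks: JSON.parse(JSON.stringify(flow.tasks)),
        params: params,
        steps: []
    });
}

// The flow content below is made up for the example.
const sampleFlow = {tasks: {sequence: ['sendmail']}, defaults: {subject: 'Hi'}};
const request = {type: 'job', flow: 'sample', owner: 'DEFAULT',
    params: {to: 'user@example.com'}};

console.log(initializeJob(request, sampleFlow, 1234).state);  // $$ready
```

Returning a new object rather than mutating the request keeps the original document intact until the save succeeds.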

When the job is saved with these updates all observing TWS processes are again notified of a document change and task execution begins.

Note that each job automatically gets a unique id (_id) via CouchDB.

Task Execution

Acceptance

The first check in each job update cycle is to see if the job is on a "task boundary". This is inherently true when the job is $$ready but once the first task begins processing the job's last step entry state is checked and treated as the current job state.
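
A task boundary check along these lines could be sketched as shown below. The function is hypothetical; it treats $$complete, $$timeout, and $$error step states as boundaries, which is an interpretation of the behavior described in this section.

```javascript
// Hypothetical boundary check, not the engine's actual code.
// Step states after which any TWS process may act.
const BOUNDARY_STEP_STATES = ['$$complete', '$$timeout', '$$error'];

function onTaskBoundary(job) {
    const steps = job.steps || [];

    // No steps yet: the job itself must be $$ready.
    if (steps.length === 0) {
        return job.state === '$$ready';
    }

    // Otherwise the last step entry's state acts as the job state.
    const last = steps[steps.length - 1];
    return BOUNDARY_STEP_STATES.includes(last.state);
}

console.log(onTaskBoundary({state: '$$ready', steps: []}));       // true
console.log(onTaskBoundary({steps: [{state: '$$active'}]}));      // false
console.log(onTaskBoundary({steps: [{state: '$$complete'}]}));    // true
```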

Jobs on task boundaries trigger the current TWS processes to compete for work by "accepting" individual tasks, signifying which process will run each one. In the current sequential structure only one task at a time can be active but future versions of the TWS should support parallel task execution depending on task and flow compatibility.

Before accepting a task the TWS process first computes the 'next task' to be run, then ensures it has the proper plugin for the task. If the process does not have the required plugin no action is taken. This check allows you to configure different TWS processes to have specific "task profiles" so you can distribute work across processes or hosts.

If no TWS processes are registered for a task the job will eventually time out.

Accepting a task occurs upon creation of a step entry with a state of $$ready and a pid value set to the Node.js process.pid value of the TWS process. Since CouchDB won't allow two processes to update the same version of a job document the first TWS process to save the entry "wins" and becomes responsible for running the task.
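
The "first save wins" behavior can be illustrated with an in-memory stand-in for CouchDB's revision check. FakeStore and acceptTask are hypothetical, and the numeric _rev is a simplification (real CouchDB revisions are strings and a stale save is rejected with a conflict response).

```javascript
// In-memory stand-in for CouchDB's revision check (illustration only).
class FakeStore {
    constructor(doc) {
        this.doc = Object.assign({_rev: 1}, doc);
    }

    // Returns an independent copy, as each process would fetch its own.
    get() {
        return JSON.parse(JSON.stringify(this.doc));
    }

    // Rejects the save when the caller's copy is stale.
    put(doc) {
        if (doc._rev !== this.doc._rev) {
            return false;
        }
        this.doc = Object.assign({}, doc, {_rev: doc._rev + 1});
        return true;
    }
}

// Hypothetical accept step: append a $$ready entry tagged with a pid.
function acceptTask(store, job, taskName, pid) {
    job.steps.push({task: taskName, pid: pid, state: '$$ready'});
    return store.put(job);
}

const store = new FakeStore({type: 'job', state: '$$ready', steps: []});
const copyA = store.get();   // two processes read the same revision
const copyB = store.get();

console.log(acceptTask(store, copyA, 'sendmail', 101));  // true
console.log(acceptTask(store, copyB, 'sendmail', 202));  // false
console.log(store.get().steps[0].pid);                   // 101
```

In a real TWS process the pid would be process.pid; fixed numbers are used here so the two competing processes are easy to tell apart.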

Invocation

When a TWS process receives a job update that is not on a task boundary (that is, the job has an unfinished step which the process owns via pid), it takes action based on the current step state.

A state of $$ready means the task plugin needs to be loaded and invoked. In response, the entry's start value is set to Date.now(), the state is set to $$active, and the task entry is invoked. The document is then saved, triggering another notification cycle.

A state of $$active is essentially ignored since it's just notification of the step being activated. The interesting activity at this point is happening in the task plugin.

With the task plugin running the owning TWS process is essentially waiting on either a timeout condition or a response from the plugin. Either will result in the TWS process acquiring new information it needs to save to the step entry.

If the task runs successfully the return data is used to set the result value, the state is set to $$complete, end is set to Date.now(), and the job is saved. Notification of this change will trigger the execution cycle to repeat with the job back on a task boundary.
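
The step updates for the successful path can be sketched as a pure transition function. The function and its event names ('invoke', 'result') are hypothetical, and setting exit to 0 on success is an assumption based on the exit code convention described earlier.

```javascript
// Hypothetical transition function for a single step entry.
// Returns a new entry rather than mutating the one passed in.
function transition(step, event, now = Date.now()) {
    switch (event.type) {
        case 'invoke':
            // Owner picked up a $$ready step and is invoking the plugin.
            return Object.assign({}, step, {state: '$$active', start: now});
        case 'result':
            // Plugin returned successfully; exit 0 is assumed here.
            return Object.assign({}, step, {
                state: '$$complete',
                result: event.data,
                end: now,
                exit: 0
            });
        default:
            // Unknown events leave the entry untouched.
            return step;
    }
}

const accepted = {task: 'sendmail', pid: 101, state: '$$ready'};
const active = transition(accepted, {type: 'invoke'}, 100);
const done = transition(active, {type: 'result', data: 'sent'}, 250);

console.log(active.state);            // $$active
console.log(done.state);              // $$complete
console.log(done.end - done.start);   // 150
```

Each returned entry corresponds to one document save, which is what triggers the next notification cycle.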

Exceptions

$$timeout

If a task times out the step state is set to $$timeout and the document is saved.

Receiving a $$timeout state triggers a retry attempt.

A $$timeout state represents a task boundary, meaning that all TWS processes can act in response. This approach helps load balance in response to timeouts.

If the last step entry state is $$timeout the 'next task' computation machinery checks the retry data for the task. If a retry is appropriate the computation returns the same task, essentially repeating the step and letting all TWS processes compete for the new step.

If retry is not appropriate the step's state is set to $$error, the exit value is set to reflect a timeout error, and the document is saved, triggering error flow processing. NOTE: this update is only done by the owning pid process to minimize potential conflicts. If the original pid process is no longer running the overall job will eventually time out.
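
A rough sketch of the retry decision follows. The retry field on the task definition and the returned action names are assumptions; the source does not specify how retry data is stored. Counting attempts by task name is also a simplification that would miscount flows which run the same task more than once.

```javascript
// Hypothetical retry decision after a $$timeout step.
// `taskDefs` maps task names to definitions with an assumed `retry` limit.
function afterTimeout(job, taskDefs) {
    const last = job.steps[job.steps.length - 1];

    // Count every recorded attempt of the task that timed out.
    const attempts = job.steps.filter(function(step) {
        return step.task === last.task;
    }).length;

    const limit = (taskDefs[last.task] || {}).retry || 0;

    // The first run is not a retry, so retries used = attempts - 1.
    if (attempts - 1 < limit) {
        return {action: 'retry', task: last.task};
    }
    return {action: 'error', task: last.task};
}

const defs = {sendmail: {retry: 1}};
const once = {steps: [{task: 'sendmail', state: '$$timeout'}]};
const twice = {steps: [
    {task: 'sendmail', state: '$$timeout'},
    {task: 'sendmail', state: '$$timeout'}
]};

console.log(afterTimeout(once, defs).action);    // retry
console.log(afterTimeout(twice, defs).action);   // error
```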

$$error

If a task throws an error the state is set to $$error and the document is saved.

Receiving an $$error state triggers error flow processing.

A $$error state represents a task boundary, meaning that all TWS processes can act in response. This approach helps redistribute work in response to errors.

If the last step state is $$error and the related task definition includes an error field, that field's value is returned by the 'next task' computation machinery. All TWS processes are free to compete for this task, which can potentially 'recover' the step.

If there is no error field for the task itself the computation machinery looks to the job's flow definition and returns any error task found there. If there is no error task for the job, the job state is set to $$complete along with a non-zero exit value.
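
The lookup order can be sketched as below. The function is hypothetical, and the use of an error field on the flow definition is an assumption mirroring the task-level field.

```javascript
// Hypothetical lookup of the error task to run after a failed step.
// Returns a task name, or null when no error task is defined.
function errorTaskFor(job, taskDef, flowDef) {
    const last = job.steps[job.steps.length - 1];

    // Only applies when the last step actually failed.
    if (!last || last.state !== '$$error') {
        return null;
    }

    // The task-level error field takes precedence.
    if (taskDef && taskDef.error) {
        return taskDef.error;
    }

    // Fall back to the flow-level error task (assumed field name).
    if (flowDef && flowDef.error) {
        return flowDef.error;
    }

    // Nothing to run: the job completes with a non-zero exit value.
    return null;
}

const failed = {steps: [{task: 'sendmail', state: '$$error'}]};

console.log(errorTaskFor(failed, {error: 'notify'}, {error: 'cleanup'}));  // notify
console.log(errorTaskFor(failed, {}, {error: 'cleanup'}));                 // cleanup
console.log(errorTaskFor(failed, {}, {}));                                 // null
```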

Note that a job's final state and exit values are determined by the last step executed and that may be a step in an error task flow. This implies that error tasks should be designed to consider their final step exit values.

Lather, Rinse, Repeat

In all cases once a termination event (success or failure) occurs the end timestamp, state field and exit code are updated to reflect the step's final resolution.

If the job has more tasks to run this cycle will continue, with a TWS process accepting the next task, and the next, until the job completes in some fashion.

Code

The central implementation of TWS logic is found in ~lib/tds/plugins/tasks.js. This is the TDS plugin which sets up the core observations for the TWS and which manages most notifications. You shouldn't need to interact with this plugin.

The CouchDB task, flow, and job documents specific to your project are found in ~/couch/tws. This is the location the tibet tws command uses when pushing your workflow definitions to CouchDB. Keeping your flow definitions in your project ensures you can track them through version control.

The task implementations for your workflows are kept in ~/plugins/tasks. These are the Node.js modules which perform the actual work. For example, the mail-sendmail.js file sends your email task content using a sendmail transport. Creating additional plugins here provides you with more options you can choreograph from your workflows.

Internals

Views

The TWS interacts with CouchDB by manipulating documents and querying the document store for information. The queries used by the TWS rely on a small set of CouchDB view definitions which must exist in the target database.

You can initialize these views using tibet tws init.

Tasks

A view on the 'app' design document named 'tasks' for task name filtering:

function(doc) {
    if (doc.type === 'task') {
        emit(doc.name + '::' + doc.owner, doc);
    }
}

Flows

A view on the 'app' design document named 'flows' for flow name filtering:

function(doc) {
    if (doc.type === 'flow') {
        emit(doc.name + '::' + doc.owner, doc);
    }
}

Jobs

A view on the 'app' design document named 'jobs' for job-by-flow filtering:

function(doc) {
    if (doc.type === 'job') {
        emit(doc.flow + '::' + doc.owner, doc);
    }
}
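
Since each view emits a key of the form name::owner (or flow::owner for jobs), a lookup can target one key directly. The sketch below builds a CouchDB view query URL using the standard _design/_view path; the host, database name, and owner value are placeholders, and the helper itself is hypothetical.

```javascript
// Hypothetical helper that builds a view query URL for one key.
function viewUrl(base, db, view, name, owner) {
    // View keys must be JSON-encoded, then URL-encoded.
    const key = JSON.stringify(name + '::' + owner);

    return base + '/' + encodeURIComponent(db) +
        '/_design/app/_view/' + view +
        '?key=' + encodeURIComponent(key);
}

// Placeholder host, database, and owner values.
console.log(viewUrl('http://127.0.0.1:5984', 'myproject', 'jobs',
    'sample', 'DEFAULT'));
```

Issuing a GET request to the resulting URL with any HTTP client should return the rows whose key matches, with each row's value holding the full document as emitted by the view.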