TIBET Workflow System (TWS)
Wins
- Powerful, micro-service engine running on Node.js.
- Job state changes are persistent and trigger actions.
- Event-driven task and job transitions via change feed.
- Fully extensible via micro-services written in simple JS.
- Fully supported as part of the overall TIBET ecosystem.
Contents
Concepts
Cookbook
Preliminaries
- Install CouchDB
- Make sure CouchDB is running
- Clone/init a TWS-capable project
- Activate the TWS 'tasks' plugin
- Install task plugin dependencies
- Confirm configuration values for CouchDB
- Confirm environment variables for CouchDB
- Initialize the TWS database
- Initialize TWS tasks and flows
- Start the TIBET Data Server
Tasks
Workflows
Jobs
Code
Internals
Concepts
The TIBET Workflow System (TWS) is a micro-service execution subsystem that leverages the TDS and CouchDB to provide simple, scalable task execution out of the box.
CouchDB documents hold the definitions of your workflows, their tasks, and job requests. Storing a new job document triggers the engine and activates the associated tasks.
Individual flows are composed of one or more tasks. Sample tasks include sending email, storing data via AWS, or performing credit card transactions. Writing your own tasks is straightforward, letting you keep your server-side logic highly composable.
Job Activation
In response to storage of a job document, CouchDB's changes feed notifies observers and the TDS tasks plugin takes over, triggering job execution. The actual work is performed by loadable modules in the TDS, so the scope of a task is limited only by what you can do from Node.js.
The output below shows the TDS startup sequence and output from the TWS specific to execution of a sample workflow:

Job Submission
Below we show a sample.json document targeting the sample workflow (which has a single task: send an email). The parameters for the email shown on the left use TIBET binding syntax (double square braces) which the tibet tws submit command will prompt for:

Submitting jobs can be done via curl, the TIBET client, the TIBET CLI, or any other HTTP client interface. If it can PUT a JSON document to an HTTP REST API, it can submit a TWS job.
The TIBET Workflow System reflects the real power of TIBET+CouchDB integration as well as the power of integration between TIBET's CLI and other components. With the TWS you can be up and running with a scalable microservices solution in minutes.

Each concrete task relies on a corresponding task definition stored in CouchDB. Tasks are combined by virtue of flow documents which serve as blueprints for how to execute a sequence of tasks, including any default parameters for the flow and its tasks.
Execution of a flow is triggered by storage of a job document. In response, the TWS will look up the job's referenced flow and use it as a template for executing the various tasks for that job. All flow/task transitions are responses to document updates.
Each job gets a unique copy of the task and flow information, so you have a full audit trail for every job, specific to the flow definition at the time the job ran.
Cookbook
Preliminaries
Install CouchDB
If you don't have a copy of CouchDB running (or a cloud-based instance available from Cloudant or a similar hosting service) you need to install CouchDB.
Follow the instructions at http://couchdb.apache.org/ to install and configure CouchDB, or the instructions at the cloud-based hosting service of your choice.
Make sure CouchDB is running
Access your CouchDB instance via the administration page
(http://127.0.0.1:5984/_utils) or use curl
or a similar command to confirm
CouchDB is accessible:
$ curl -X GET http://127.0.0.1:5984
{"couchdb":"Welcome","version":"2.0.0","vendor":{"name":"The Apache Software Foundation"}}
If your CouchDB installation isn't running/accessible you'll want to get that sorted out before moving forward.
Clone/init a TWS-capable project using default or couch dna
The TWS relies on the TDS, so you need to create a project using either the default or couch dna to get a project set up with a TIBET server.
While both default and couch projects support the TWS, only couch dna also supports running your application directly from CouchDB (as a 'couchapp'). If you don't intend to run your application in couchapp form you should choose default dna.
For example, we create and initialize a default project in the example below:
$ tibet clone {{appname}}
$ cd {{appname}}
$ tibet init
Alternatively you can create a CouchDB-focused project via --dna couch:
$ tibet clone {{appname}} --dna couch
$ cd {{appname}}
$ tibet init
Activate the TDS tasks plugin
With your project initialized, you need to enable the TDS tasks plugin. This tells the TDS to load and activate the server middleware which processes TWS operations.
To activate the TWS tasks plugin set TIBET's tds.use_tasks configuration flag to true:
$ tibet config tds.use_tasks=true
Install task plugin dependencies
Depending on the tasks you intend to use you will need to add certain package dependencies to your project.
When your project is created using default or couch dna, the package.json file doesn't automatically include the modules for the various built-in task samples. You need to use npm to install the modules required by the specific task plugins your workflows will activate; otherwise you will see job failures.
mail-smtp
The built-in mail-smtp task uses nodemailer to send mail via an SMTP host. If you want to use that task in your workflows, install the dependencies via:
npm install --save nodemailer
s3-upload
The built-in s3-upload task uses aws-sdk (the Amazon AWS SDK for Node.js) to communicate with S3. Install that package using:
npm install --save aws-sdk
Repeat this sequence for any other modules you intend to script into flows. See the documentation on individual TWS tasks for details on their requirements.
Confirm configuration settings for TWS
The TWS relies on a CouchDB database to store flow, task, and job documents and their associated views. CouchDB connection and database information is specified by TIBET configuration parameters and/or environment variables.
If you're running a local instance that doesn't require authentication data you can rely entirely on TIBET configuration values. These include:
// tds.couch params are for couch-based TIBET applications
TDS.getcfg('tds.couch.scheme') || 'http'
TDS.getcfg('tds.couch.host') || '127.0.0.1';
TDS.getcfg('tds.couch.port') || 5984;
// tds.tasks params are for TWS operation (including separate DB)
TDS.getcfg('tds.tasks.db_name') || TDS.getcfg('tds.couch.db_name') || TDS.getcfg('npm.name');
TDS.getcfg('tds.tasks.db_app') || TDS.getcfg('tds.couch.db_app') || 'tibet';
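The fallback chain above can be sketched as follows. This is a minimal illustration using a stand-in getcfg() helper (the real TDS API differs), and 'myapp' is a hypothetical npm.name value:

```javascript
// Stand-in configuration map; only some keys are set, as in a fresh project.
const config = {
    'tds.couch.scheme': 'http',
    'tds.couch.host': '127.0.0.1',
    'tds.couch.port': 5984,
    'npm.name': 'myapp'
};

// Hypothetical lookup helper standing in for TDS.getcfg().
function getcfg(key) {
    return config[key];
}

// Each tds.tasks setting falls back to its tds.couch counterpart,
// then to a final default.
const dbName = getcfg('tds.tasks.db_name') ||
    getcfg('tds.couch.db_name') ||
    getcfg('npm.name');
const dbApp = getcfg('tds.tasks.db_app') ||
    getcfg('tds.couch.db_app') ||
    'tibet';
```

With neither tds.tasks nor tds.couch database settings present, the resolved values here would be 'myapp' and 'tibet'.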
You can list (and adjust) the various settings using tibet config:
$ tibet config tds.tasks
{
"tds.tasks.db_app": "tws",
"tds.tasks.db_name": "tasks",
...snipped...
}
$ tibet config tds.couch
{
"tds.couch.db_app": "tibet",
"tds.couch.host": "127.0.0.1",
"tds.couch.port": "5984",
"tds.couch.scheme": "http",
...snipped...
}
With the default settings you can expect to be using a CouchDB database named {{appname}}_tasks and a design document named _design/tws for storage of the various jobs, tasks, and flows in your TWS configuration.
Use the tibet config command to adjust these values to meet your needs.
Confirm environment variables for TWS
There are several commonly used environment variables which override any settings found in the TIBET configuration values.
Environment variables are recommended for anything involving a username or password since they are less likely to be checked in to a repository inadvertently.
COUCH_URL // a full URL including any user/pass information
COUCH_USER // optional username for basic authentication
COUCH_KEY // optional api_key (supersedes COUCH_USER)
COUCH_PASS // optional password for basic authentication
COUCH_DATABASE // the name of the database to connect to
COUCH_APPNAME // TIBET-specific. What _design/{{application name}} ?
If your CouchDB server is configured to require a username/password to log in, ensure the COUCH_USER and COUCH_PASS environment variables are set.
For example, you might use the following lines, with admin/password set to their proper values for your CouchDB installation:
export COUCH_USER=admin
# export COUCH_KEY=api_key (optional, but supersedes COUCH_USER)
export COUCH_PASS=password
NOTE: Error messages including You are not a server admin mean you need to export or otherwise ensure that the proper COUCH_USER and COUCH_PASS environment variables are set to match a valid CouchDB admin account.
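The precedence rules above can be sketched as a small helper. This is a hypothetical illustration, not TIBET source: COUCH_URL wins outright when set, and COUCH_KEY, when present, supersedes COUCH_USER in the basic-auth portion of the URL:

```javascript
// Hypothetical sketch of environment-variable precedence for the CouchDB URL.
function buildCouchUrl(env, scheme = 'http', host = '127.0.0.1', port = 5984) {
    if (env.COUCH_URL) {
        return env.COUCH_URL;           // a full URL overrides everything else
    }
    const user = env.COUCH_KEY || env.COUCH_USER;   // api_key supersedes user
    const auth = (user && env.COUCH_PASS) ?
        user + ':' + env.COUCH_PASS + '@' :
        '';
    return scheme + '://' + auth + host + ':' + port;
}
```

For example, buildCouchUrl({COUCH_USER: 'admin', COUCH_PASS: 'password'}) would yield http://admin:password@127.0.0.1:5984.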
Initialize the TWS database
Before you can use the TWS you need to initialize the task engine database using the tibet tws init command. This command will check for the TWS database, the appropriate design document, and the baseline view configuration required for the engine to operate correctly.
NOTE that tibet tws init is idempotent, so running it on an existing database won't harm anything; it will simply insert any missing views as needed.
Be aware that when running the tibet tws init command you will be prompted to confirm the current CouchDB parameters as determined by the current configuration and environment settings. When these prompts appear any username and password information is automatically hidden.
$ tibet tws init
CouchDB base [http://127.0.0.1:5984] ?
using base url 'http://127.0.0.1:5984'.
Database name [{{appname}}_tasks] ?
Application name [tws] ?
confirming TWS database
creating TWS database
confirming TWS design doc
inserting TWS design doc
confirming TWS views
inserting TWS view block
TWS initialized
Initialize TWS tasks and flows
When you create a new default or couch project your application folder will include a root couch directory containing at least one subdirectory named tws. The couch/tws directory contains the template files used by the tibet tws command when initializing and pushing data to CouchDB.
In particular, the couch/tws directory will contain flows, tasks, and jobs directories, each of which contains sample JSON documents for each object type.
The tibet tws command has subcommands for list, push, view, et al. which allow you to list and push (insert/update) documents from each of the tws subdirectories found in your project's couch directory.
Pushing Tasks/Flows/Jobs
Before you can execute a successful job in the TWS you must upload at least one task and one flow document into the TWS's database. Your first step should be to review the sample JSON documents and adjust them to contain the proper names and parameters you'd like for your particular configuration.
Once you have updated each JSON file to meet your needs you can push them using the tibet tws command.
To push all the flow and task definitions use tibet tws push --map:
$ tibet tws push --map --no-confirm
updating: /Users/ss/temporary/dbtest/couch/tws/tasks/mailer.json
skipping: /Users/ss/temporary/dbtest/couch/tws/flows/_sample_flow.json
updating: /Users/ss/temporary/dbtest/couch/tws/flows/mailtest.json
updating: /Users/ss/temporary/dbtest/couch/tws/flows/mailtwo.json
/Users/ss/temporary/dbtest/couch/tws/tasks/mailer.json =>
{
"ok": true,
"id": "e52b072b50c597298c310ea9c8005852",
"rev": "6-fe94b4637b5d66320583f8ff0e63910d"
}
/Users/ss/temporary/dbtest/couch/tws/flows/mailtwo.json =>
{
"ok": true,
"id": "e52b072b50c597298c310ea9c80050b2",
"rev": "5-fe65ba812451a632a0b2a17756816f44"
}
/Users/ss/temporary/dbtest/couch/tws/flows/mailtest.json =>
{
"ok": true,
"id": "31ee427eac51b94470bdfd14da00715e",
"rev": "14-f3644ec1c48ce27336ec5915dc83f356"
}
To push a single file just reference that file directly. The TIBET command line can resolve TIBET virtual paths, and the value ~tws references the root of all TWS templates. This allows you to use paths of the form ~tws/tasks/{{file}} or ~tws/flows/{{file}} to refer to individual templates.
To push the current definition for the task named s3, for example, use tibet tws push '~tws/tasks/s3.json'.
As with most tibet tws commands you will be prompted to confirm the current CouchDB parameters. If you want to skip these prompts add --no-confirm to your command line:
$ tibet tws push '~tws/tasks/s3.json'
CouchDB base [http://127.0.0.1:5984] ?
using base url 'http://127.0.0.1:5984'.
Database name [{{appname}}_tasks] ?
Application name [tws] ?
{
"ok": true,
"id": "e52b072b50c597298c310ea9c80033a7",
"rev": "1-5eeca1d84d794fd4ea289c6dafed98d7"
}
If your JSON document includes a value for _id, that ID will be used to check for an existing document. If no _id field exists, the document will be updated after the push with the id value returned by the insert operation. In the output above that would be e52b072b50c597298c310ea9c80033a7. This approach allows the system to coordinate JSON documents on disk with their counterparts in CouchDB, avoiding duplication with multiple push operations.
Use the tibet tws push {{name}} approach to upload and adjust your task and flow definitions as needed to configure your TWS engine.
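The _id coordination just described can be sketched in a few lines. This is a hedged illustration of the bookkeeping, not the actual tibet tws push source: documents without an _id get the server-assigned id written back so that subsequent pushes update rather than duplicate:

```javascript
// Hypothetical helper recording a CouchDB insert/update response on the
// on-disk copy of a pushed document.
function recordPushResult(doc, response) {
    if (!doc._id) {
        doc._id = response.id;  // remember the server-assigned id for re-push
    }
    doc._rev = response.rev;    // track the revision for the next update
    return doc;
}
```

After the first push of a document lacking an _id, the on-disk JSON carries the CouchDB id, so a second push targets the same document.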
Listing Existing Tasks/Flows/Jobs
You can list the existing tasks, flows, and jobs using the tibet tws list command:
$ tibet tws list --tasks
mail => mail-smtp (31ee427eac51b94470bdfd14da0050cd)
s3 => s3-upload (e52b072b50c597298c310ea9c80033a7)
Add the --verbose flag to see entire documents:
$ tibet tws list --tasks --verbose
[{
"_id": "31ee427eac51b94470bdfd14da0050cd",
"_rev": "4-da2d9b9650e08b4b715f99b4baaa16ea",
"type": "task",
"name": "mail",
"plugin": "mail-smtp",
"params": {
"smtp": {
"service": "gmail",
"auth": {
"user": "useraftertibetencrypt",
"pass": "passaftertibetencrypt"
}
},
"from": "mailer@technicalpursuit.com",
"html": "<code>{{params}}</code>"
}
}, {
"_id": "e52b072b50c597298c310ea9c80033a7",
"_rev": "1-5eeca1d84d794fd4ea289c6dafed98d7",
"type": "task",
"name": "s3",
"plugin": "s3-upload",
"params": {
"auth": {
"id": "idaftertibetencrypt",
"secret": "secretaftertibetencrypt"
},
"region": "us-east-1",
"bucket": "default",
"key": "",
"body": ""
}
}]
NOTE that the various authentication values here are encrypted via the tibet encrypt command. See the detailed documentation later in this document for more information.
Start the TIBET Data Server:
$ tibet start
Tasks
Tasks are the smallest unit of workflow in the TWS. You can think of individual tasks as micro-services: small, composable units of work.
A task definition is a JSON document which provides a unique identifier and parameters for how to invoke a TWS task plugin such as the built-in mail-smtp or s3-upload task. While TIBET includes some pre-built tasks, it's fully expected that you will create and compose your own tasks and workflows.
Define a Task:
Tasks are JSON documents stored in CouchDB and used as steps in a Flow.
{
"type": "task",
"name": string,
"params": JSON,
"error": string,
"timeout": number=15000,
"retry": number=0,
"flow": string,
"plugin": string
}
Each concrete task must include a type of task and a unique name.
{
"type": "task",
"name": "mail-smtp"
}
Each task name must either match a TWS plugin from the project's tasks directory, or the task must include a plugin slot referencing a different TWS plugin to process this task type. Here we're using mailer as the name and pointing that task name to the mail-smtp plugin for actual operation.
{
"type": "task",
"name": "mailer",
"plugin": "mail-smtp"
}
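The resolution rule above amounts to a one-line fallback. A minimal sketch (hypothetical helper, not TWS source):

```javascript
// A task runs the plugin it names explicitly; otherwise the plugin
// matching the task's own name is assumed.
function resolvePlugin(task) {
    return task.plugin || task.name;
}
```

So the mailer task above resolves to the mail-smtp plugin, while a task simply named mail-smtp resolves to itself.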
In addition to type, name, and plugin, you can define parameters for the task which serve as defaults for all instances of that task, as shown below.
mail-smtp
As a concrete example, here's a task definition for the built-in mail-smtp plugin. Note we've included parameters for the task, including an HTML template with embedded substitution syntax. The mail-smtp plugin will merge actual values with this template at runtime:
{
"type": "task",
"name": "mail",
"plugin": "mail-smtp",
"params": {
"smtp": {
"service": "gmail",
"auth": {
"user": "someuser@yourdomain.com",
"pass": "tibetencryptoutputforpassword"
}
},
"from": "mailer@yourdomain.com",
"html": "<code>{{json params}}</code>"
}
}
Using Futon/Fauxton you can cut/paste the content of the above document, adjusting the specifics to match your particular SMTP server requirements.
Be sure to save the document.
For security reasons the value for params.smtp.auth.pass should always be stored in encrypted form. See tibet help encrypt for more information on how to use the TIBET CLI to produce an encrypted password value you can store safely. (The TDS will automatically decrypt these values using environment values to drive the decryption.)
NEVER STORE UNENCRYPTED CREDENTIALS IN COUCHDB.
If you need more information on the smtp parameter options see the documentation on nodemailer, the underlying SMTP transport used for this task. Common alterations would be to change the service or to use a specific host and port.
s3-upload
Another concrete example, for the built-in s3-upload plugin, is shown below:
{
"type": "task",
"name": "s3",
"plugin": "s3-upload",
"params": {
"auth": {
"id": "870e53f68dfa6b2fd7144762b8460ccf07a54b79",
"secret": "ff337df0a69f044ba5260672be7d1dc01aa57b7e4a0c1b51d059d98d0448ff958ebf2bbc21c3ca2b"
},
"region": "us-east-1",
"bucket": "default",
"key": "",
"body": ""
}
}
As shown in our example above you can also provide values for the other parameters used by the s3-upload task, including the region, bucket name, key, and body (which can be a Handlebars template which will be merged with the task parameter data).
NOTE: As with the mail-smtp example, the auth values shown here for id and secret are the result of running tibet encrypt on your actual S3 credential values.
NEVER STORE UNENCRYPTED CREDENTIALS IN COUCHDB.
See the full Tasks documentation for more details on task documents.
Workflows
Workflows are, quite simply, a set of configured tasks.
As with tasks, the TWS stores a workflow as a JSON document in CouchDB.
At the present time the structure of a workflow is limited to sequences of tasks; however, since each task can have its own error handling and retry logic you can still handle most common operations with that simple structure.
Define a Workflow:
Workflows (flows) are CouchDB documents defining one or more tasks to be run when a job referencing that workflow is initiated.
{
"type": "flow",
"name": string,
"owner": string,
"tasks": JSON,
"params": JSON,
"error": string,
"timeout": number=60000,
"retry": number=0,
"enabled": boolean=true
}
Each workflow must include a type set to flow as well as a name and owner, which combine to form a unique composite key for job triggering. The owner value is often correlated to unique client IDs in environments which support multiple clients.
{
"type": "flow",
"name": "mailtest",
"owner": "Team TIBET",
...
}
In addition to the composite key values each flow requires a tasks object defining a structure and associated task configuration.
At the present time the only valid structure is sequence, meaning an ordered list of tasks. Future releases will support additional structure types allowing you to create AND, OR, and JOIN conditions for more complex choreography.
As a concrete example, here's a flow definition which will test our previously defined mailer task:
{
"type": "flow",
"name": "mailtest",
"owner": "Team TIBET",
"tasks": {
"structure": "sequence",
"sequence": [
"mailer"
]
}
}
As with our previous example, you can simply cut/paste the content here into a new document using Futon/Fauxton.
Be sure to save the document.
See the Flows section for more details on flow documents.
Jobs
Workflows and tasks define how something should be accomplished; what plugins are involved, what parameters are used, what options exist for error handling.
To trigger actual processing you save a job document to the TWS database.
{
"type": "job",
"flow": string,
"owner": string,
"params": JSON
}
Each job document must include a flow name and owner value which match a known workflow name/owner combination.
An optional params block can also be provided to pass parameters to the various tasks in the workflow. For flows with unique task names you can simply provide the name. For flows with multiple invocations of a task you can qualify the name with an index number, as in {{task-N}}, as the examples below show.
Here we're passing a simple set of parameters to the mailer task within the mailtest workflow:
{
"type": "job",
"flow": "mailtest",
"owner": "Team TIBET",
"params": {
"mailer": {
"to": "your@email.here",
"subject": "TWS mail check"
}
}
}
For command-line testing store a job document using a tool like curl:
curl -X POST \
-H 'Content-Type: application/json' \
-d '{"type":"job", "flow":"mailtest","owner":"Team TIBET","params": { "mail": {"to": "your@email.here", "subject": "TWS mail check"}}}' \
http://127.0.0.1:5984/tasks
curl -X POST \
-H 'Content-Type: application/json' \
-d '{"type":"job", "flow":"mailtest","owner":"Team TIBET","params": { "mail-0": {"to": "your@email.here", "subject": "TWS mail check"}}}' \
http://127.0.0.1:5984/tasks
For testing from a client-side application, POST to the TWS job endpoint defined by tds.tasks.job.uri (which defaults to _tws/jobs).
NOTE: the TWS must be active, meaning the tasks plugin is loaded in the server and tds.use_tasks is true.
Internals
TWS operation is driven by documents and views stored in CouchDB which reference and trigger invocation of TWS plugins kept in the ~tds_tasks directory of your project.
(FUTURE) Pages vended from the /_tds/admin route provide a simple way to manage tasks, workflows, and jobs without interacting directly with CouchDB, but you can always do so, provided you know the schema of the documents the TWS relies upon.
TWS-related documents fall into two broad categories: definitions and executions.
Definitions are relatively static documents which define the individual tasks and workflows the system is configured to execute. Executions are documents containing runtime execution state and history. We'll start out looking at the definition documents.
Definition
Tasks
A task is a single atomic step in a larger workflow. All flows are defined as a set of 0-N tasks. Individual tasks are defined in documents with the following form:
{
"type": "task",
"name": string,
"params": JSON,
"error": string,
"timeout": number=15000,
"retry": number=0,
"flow": string,
"plugin": string
}
Only type and name are required.
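The requirement above can be captured in a small validity check. A hedged sketch (hypothetical helper, not TWS source):

```javascript
// Only `type` and `name` are required in a task definition.
function isValidTaskDef(doc) {
    return doc.type === 'task' &&
        typeof doc.name === 'string' &&
        doc.name.length > 0;
}
```

A document such as {"type": "task", "name": "mail"} passes; one missing the name does not.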
type
The required type field must always be task for a task definition.
name
The required name field must be a string identifying the task. The value here is used as a key in Flow definitions and cross-checked in params fields for any parameter values which should be defaulted or passed to the task.
NOTE: the name field must match a known TWS plugin in your project's ~tds_tasks directory. If not, you must use the plugin field to point to the proper plugin.
params
The optional params field should contain JSON-formatted data for default values. This field's values are copied into the individual step execution documents for the task.
error
The optional error field specifies a task to run if the named task fails. Failure can occur due to timeout, retry exhaustion, or other situations. If you want to perform multiple steps on error you can reference a task which points to a flow. See the flow property.
timeout
The optional timeout field defines a millisecond count within which a job running this task must complete the task or cause the job to time out. The default is 15000 milliseconds (15 seconds).
retry
The optional retry field defines how many times this task can be retried if it fails for any reason. The default value is 0, meaning tasks do not retry by default. Note that this is a retry count, not a "total attempts" number; in other words, a retry count of 2 means the task will be attempted a total of 3 times (the initial attempt and two retry attempts).
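The retry arithmetic is worth pinning down. A minimal sketch (hypothetical helper, not TWS source):

```javascript
// retry is a retry count, so total attempts = initial attempt + retries.
function totalAttempts(task) {
    return 1 + (task.retry || 0);
}
```

A task with retry 2 is attempted up to 3 times; a task with no retry field runs once.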
flow
(FUTURE) The optional flow field lets you specify that this task definition is actually a reference to a named flow, meaning the task is run by running the flow in question as a job step. When this field is specified only the type, name, and error fields are used; the plugin, retry, and timeout fields are ignored and the flow's task values are used.
plugin
The optional plugin field lets you define the file name of the task's plugin. This file will always be found in the project's path.tds_task directory and defaults to the task name.
Flows
Flows, or more accurately "workflows", define a configuration of tasks to be executed in response to Job requests. A flow is defined as follows:
{
"type": "flow",
"name": string,
"owner": string,
"tasks": JSON,
"params": JSON,
"error": string,
"timeout": number=60000,
"retry": number=0,
"enabled": boolean=true
}
Only type, name, owner, and tasks are required.
type
The required type field must always be flow for a flow definition.
name
The required name field must be a unique string identifying the flow. The value here is used by a Job request's flow value to identify the workflow which should run.
owner
The required owner field defines the owner of the workflow definition. The combination of flow name and flow owner creates a composite key. Job requests must match both the name and owner of a valid flow definition to trigger a workflow to execute.
tasks
The required tasks entry contains a JSON string defining the structure of the workflow. During job execution a copy of this field is made in the job document so that jobs run independently of the workflow and maintain their script for logging/auditing.
{
"structure": "sequence",
"sequence": array
}
As shown above, the tasks field takes an object defining a structure, which currently must be set to sequence, and a matching sequence key containing an array of task names. Future versions will support other structure values.
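The constraints above lend themselves to a quick validity check. A minimal sketch (hypothetical helper; only the sequence structure exists today):

```javascript
// A valid tasks block declares the sequence structure and lists task names.
function isValidTasksBlock(tasks) {
    return Boolean(tasks) &&
        tasks.structure === 'sequence' &&
        Array.isArray(tasks.sequence) &&
        tasks.sequence.every(function(name) {
            return typeof name === 'string';
        });
}
```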
params
The optional params field should contain JSON-formatted data for default values. As with task parameters, a flow's params field contains default values for the tasks it contains. To pass parameters from a flow to a task you can use keys matching the step names. Step names follow a format of {{taskname}}-N, so a workflow invocation of a task fluff as the second job step can be parameterized using a key of 'fluff-2'.
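The step-key lookup described above can be sketched as a small resolver. This is a hypothetical illustration of the precedence, not TWS source: an indexed key like 'fluff-2' wins over the bare task name when both are present:

```javascript
// Resolve the params for a given task at a given step index.
function paramsForStep(params, taskName, stepIndex) {
    return params[taskName + '-' + stepIndex] ||  // indexed key wins
        params[taskName] ||                       // fall back to bare name
        {};                                       // no params provided
}
```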
error
The optional error field specifies a task to run if the named flow fails. Failure can occur due to timeout, retry exhaustion, or other situations. If you want to perform multiple steps on error you can reference a task which points to another flow.
timeout
(FUTURE) The optional timeout field defines a millisecond count within which a job running the entire flow must complete the flow or cause the job to time out. The default is 60000 milliseconds (1 minute). Note that for flows this figure is not a hard cutoff but is treated as a cutoff for new task startup. In other words an in-process task may exceed this limit and, if it's the final task, cause the job to run successfully even though it may have taken slightly longer. No new tasks will trigger beyond the timeout value, however.
retry
The optional retry field defines how many times this flow can be retried on failure. The default value is 0, meaning flows do not retry by default. Flows are not atomic, so setting this value to a non-zero value will cause the flow's last uncompleted task to be retried.
enabled
(FUTURE) The optional enabled field can be used to disable a workflow so no jobs for that workflow will run. Jobs requesting the flow will fail until it is re-enabled. Enabled defaults to true.
Execution
TWS execution is initiated by storage of a Job document. The document stored to trigger a job request is updated throughout execution and notification of those persistent state changes drives each subsequent step in task execution.
By triggering and tracking execution via document storage and change notification the TWS ensures that job state is never lost due to process failures and that task execution can be handled across multiple observing TDS/TWS processes.
Jobs
Like tasks and flows a job is essentially just a JSON document stored in CouchDB. The difference is that a job document is updated during workflow execution to track the current state of a specific workflow invocation. An initial job request includes:
{
"type": "job",
"flow": string,
"owner": string,
"params": JSON
}
Note that params are optional, but certain tasks may require them and will fail if the job does not specify appropriate values. See the documentation for specific task runners for more information.
type
The required type field must always be job for a job request.
flow
The required flow field must identify a specific flow to be executed. In combination with the owner field this must match an enabled flow definition.
owner
The required owner field must identify a specific flow owner. In combination with the flow value this must match an enabled flow definition.
params
The optional params field should contain JSON-formatted data which is parsed and then used by the various tasks in a workflow as needed.
Job Updates
During execution a Job's document is modified to contain the current state of execution for a particular workflow invocation. Fields in an active job document include:
{
"type": "job",
"flow": string,
"owner": string,
"params": JSON,
"tasks": JSON,
"start": number,
"state": string,
"steps": array,
"exit": number,
"end": number
}
The first four fields, type, flow, owner, and params, were described earlier as a Job Request; note, however, that in an active job the params field always contains a value combining the expanded defaults and any optional parameters passed to the job.
tasks
The tasks field is a copy of the flow's JSON task string at the moment the job is initialized. This allows jobs to run independently of flow edits.
start
The start field contains the JavaScript Date.now() value captured at the moment the job is initialized. This value supports timeout checks and elapsed time calculation.
state
The state field contains the current job state. The state value depends on the task structure and task execution results; however, there are a few fixed states which are used by the engine internally. These states use a $$ prefix and include:
$$ready - Job or step entry
$$timeout - Job or step entry
$$error - Job or step entry
$$complete - Job or step entry
$$cancelled - (FUTURE)
$$active - Step entry only
Jobs start out without a state field and have this value set to $$ready on completion of job initialization. Jobs with a $$ready state can be picked up by TWS task runners.
While a job has unprocessed tasks and has not failed or timed out, the state is determined by the task(s) being run. By default the engine will set the state to {{taskname}}-N where N is a step count. This value provides "progress" information of a sort.
When the job has finished for any reason it will have a $$complete state and the exit field will contain an exit code defining success or failure.
If a task or flow times out the engine sets the state to $$timeout. This state triggers retry logic which looks at which level (task or flow) timed out and its retry count.
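The timeout check implied above can be sketched from the start and timeout fields. An assumed helper, not TWS source:

```javascript
// A step has timed out when it is still open (no end timestamp) and its
// elapsed time exceeds the task's timeout (15000 ms by default).
function hasTimedOut(step, task, now) {
    const limit = task.timeout || 15000;
    return !step.end && (now - step.start) > limit;
}
```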
steps
The steps field is an array containing an entry for each task run by the job. Upon completion of a job the steps array contains the sequence of actual tasks run by the job regardless of how the original flow definition may have been defined. In other words, it represents the specific path the job took through the workflow structure.
Each item in the steps array consists of an execution entry of the form:
{
"task": string,
"pid": number,
"params": JSON,
"state": string,
"start": number,
"end": number,
"exit": number,
"result": string
}
A step entry's task field contains the name of the task being run.
Each step entry has a pid assigned by the TWS process which is running the task. This pid value locks the task so only that process can run it. If the task times out, the retry logic will determine whether the pid can be cleared to support retry.
The params field of a step entry contains any parameter data passed to the task which was run for that step. This value is assembled from job params and default data.
The state
, exit
, start
, end
fields are used just as they are for a job,
containing the individual step's state, exit code, and timing data.
The result
field of a step entry contains any data returned by task execution.
exit
The exit field is set upon job completion to a value similar to that used by Node.js. A 0 represents a successful job. A non-zero value represents a failed job of some type.
end
The end field contains the JavaScript Date.now() value captured at the moment the job is completed (set to the $$complete state).
Job Initialization
When the TWS machinery is triggered by a new job document any TWS-enabled process can respond to that request. Before actual task processing begins, however, the job must be initialized so its tasks, parameters, and state tracking slots are in place.
To minimize thrashing due to a flurry of document updates, job initialization is done in a single document update which sets the following fields:
- state - $$ready
- start - the current Date.now() value
- tasks - copy of the job flow's tasks slot (snapshot of the workflow script)
- params - expanded JSON representation of job and task params data values
- steps - an empty array [] for holding individual task execution records
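A sketch of that single initialization update might look like this. The helper name and the flow document shape are assumptions for illustration, not the actual TWS code:

```javascript
// Hypothetical sketch of job initialization: one update setting all of the
// fields listed above before the job is saved back to CouchDB.
function initializeJob(job, flow) {
    job.state = '$$ready';                              // ready for task runners
    job.start = Date.now();                             // job start timestamp
    job.tasks = JSON.parse(JSON.stringify(flow.tasks)); // snapshot of the script
    job.params = Object.assign({}, flow.params, job.params); // merged params
    job.steps = [];                                     // step execution records
    return job;
}

const job = initializeJob(
    {flow: 'sample', params: {}},
    {tasks: {sequence: ['mail']}, params: {}});
console.log(job.state, job.steps.length);   // $$ready 0
```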
When the job is saved with these updates all observing TWS processes are again notified of a document change and task execution begins.
Note that each job automatically gets a unique id (_id) via CouchDB.
Task Execution
Acceptance
The first check in each job update cycle is to see if the job is on a "task boundary". This is inherently true when the job is $$ready, but once the first task begins processing the job's last step entry state is checked and treated as the current job state.
Jobs on task boundaries trigger the current TWS processes to compete for work by "accepting" individual tasks, signifying which process will run each one. In the current sequential structure only one task at a time can be active, but future versions of the TWS should support parallel task execution depending on task and flow compatibility.
Before accepting a task the TWS process first computes the 'next task' to be run, then ensures it has the proper plugin for the task. If the process does not have the required plugin no action is taken. This check allows you to configure different TWS processes to have specific "task profiles" so you can distribute work across processes or hosts.
If no TWS processes are registered for a task the job will eventually time out.
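One way to picture the "task profile" check is as a simple membership test against the plugins a given process has loaded. The names below are hypothetical:

```javascript
// Hypothetical sketch of a "task profile" check: a TWS process only accepts
// a task when it has the matching plugin loaded. Plugin names are made up.
const loadedPlugins = new Set(['mail-sendmail', 's3-upload']);

function canAccept(taskName) {
    return loadedPlugins.has(taskName);
}

console.log(canAccept('mail-sendmail'));  // true
console.log(canAccept('charge-card'));    // false; another process must take it
```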
Accepting a task occurs upon creation of a step entry with a state of $$ready and a pid value set to the Node.js process.pid value of the TWS process. Since CouchDB won't allow two processes to update the same version of a job document, the first TWS process to save the entry "wins" and becomes responsible for running the task.
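That acceptance race can be sketched as follows, relying on CouchDB's optimistic concurrency (a 409 conflict on save). The helper and the db object's shape are illustrative assumptions, not the plugin's actual code:

```javascript
// Hypothetical sketch of task acceptance. The first process whose save
// succeeds owns the step; a CouchDB 409 conflict means another process won.
async function acceptTask(db, job, taskName) {
    job.steps.push({
        task: taskName,
        pid: process.pid,       // locks the step to this process
        state: '$$ready'
    });
    try {
        await db.insert(job);   // assumed nano-style insert; throws on conflict
        return true;            // we own the task
    } catch (err) {
        if (err.statusCode === 409) {
            return false;       // another TWS process accepted it first
        }
        throw err;
    }
}
```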
Invocation
When a TWS process receives a job update that's not on a task boundary, i.e. one with an unfinished step it owns (via pid), it takes action based on the current step state.
A state of $$ready means the task plugin needs to be loaded and invoked. In response, the entry's start value is set to Date.now(), the state is set to $$active, and the task entry is invoked. The document is then saved, triggering another notification cycle.
A state of $$active is essentially ignored since it's just notification that the job has been activated. The interesting activity at this point is happening in the task plugin.
With the task plugin running the owning TWS process is essentially waiting on either a timeout condition or a response from the plugin. Either will result in the TWS process acquiring new information it needs to save to the step entry.
If the task runs successfully the return data is used to set the result value, the state is set to $$complete, end is set to Date.now(), and the job is saved. Notification of this change will trigger the execution cycle to repeat with the job back on a task boundary.
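The successful-completion update can be sketched as a small mutation of the owning step entry. The helper name is hypothetical:

```javascript
// Hypothetical sketch of the successful-completion update applied to the
// owning step entry before the job document is saved.
function completeStep(step, resultData) {
    step.result = resultData;     // data returned by the task plugin
    step.state = '$$complete';
    step.end = Date.now();
    step.exit = 0;                // success
    return step;
}

const done = completeStep({task: 'mail', state: '$$active'}, 'sent');
console.log(done.state, done.exit);   // $$complete 0
```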
Exceptions
$$timeout
If a task times out the step state is set to $$timeout and the document is saved.
Receiving a $$timeout state triggers a retry attempt.
A $$timeout state represents a task boundary, meaning that all TWS processes can act in response. This approach helps load balance in response to timeouts.
If the last step entry state is $$timeout the 'next task' computation machinery checks the retry data for the task. If a retry is appropriate the computation returns the same task, essentially repeating the step and letting all TWS processes compete for the new step.
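The retry decision can be sketched as a simple comparison of retries used against the task's retry count. The function and the task shape are illustrative assumptions:

```javascript
// Hypothetical sketch of the 'next task' computation after a $$timeout:
// retry the same task while retries remain, otherwise signal an error.
function nextTaskAfterTimeout(task, retriesUsed) {
    const allowed = task.retry || 0;
    if (retriesUsed < allowed) {
        return task.name;       // repeat the step; all processes may compete
    }
    return null;                // no retries left; step becomes $$error
}

console.log(nextTaskAfterTimeout({name: 'mail', retry: 2}, 1));  // 'mail'
console.log(nextTaskAfterTimeout({name: 'mail', retry: 2}, 2));  // null
```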
If retry is not appropriate the step's state is set to $$error, the exit value is set to reflect a timeout error, and the document is saved, triggering error flow processing. NOTE: this update is only done by the owning pid process to minimize potential conflicts. If the original pid process is no longer running the overall job will eventually time out.
$$error
If a task throws an error the state is set to $$error and the document is saved.
Receiving an $$error state triggers error flow processing.
An $$error state represents a task boundary, meaning that all TWS processes can act in response. This approach helps redistribute work in response to errors.
If the last step state is $$error and the related task definition includes an error field, that field's value is returned by the 'next task' computation machinery. All TWS processes are free to compete for this task, which can potentially 'recover' the step.
If there is no error field for the task itself the computation machinery looks to the job's flow definition and returns any error task found there. If there is no error task for the job the job state is set to $$complete along with a non-zero exit value.
Note that a job's final state and exit values are determined by the last step executed, and that may be a step in an error task flow. This implies that error tasks should be designed with their final step exit values in mind.
Lather, Rinse, Repeat
In all cases once a termination event (success or failure) occurs the end timestamp, state field, and exit code are updated to reflect the step's final resolution.
If the job has more tasks to run this cycle will continue, with a TWS process accepting the next task, and the next, until the job completes in some fashion.
Code
The central implementation of TWS logic is found in ~lib/tds/plugins/tasks.js. This is the TDS plugin which sets up the core observations for the TWS and which manages most notifications. You shouldn't need to interact with this plugin.
The CouchDB task, flow, and job documents specific to your project are found in ~/couch/tws. This is the location the TIBET tws command uses when pushing your workflow definitions to CouchDB. Keeping your flow definitions in your project ensures you can track them through version control.
The actual task implementations for your workflows are kept in ~/plugins/tasks. These are the Node.js modules which perform the actual work. For example, the mail-sendmail.js file actually sends your email task content using a sendmail transport. Creating additional plugins here provides you with more options you can choreograph from your workflows.
Internals
Views
The TWS interacts with CouchDB by manipulating documents and querying the document store for information. The queries used by the TWS rely on a small set of CouchDB view definitions which must exist in the target database.
You can initialize these views using tibet tws init.
Tasks
A view on the 'app' design document named 'tasks' for task name filtering:
function(doc) {
    if (doc.type === 'task') {
        emit(doc.name + '::' + doc.owner, doc);
    }
}
Flows
A view on the 'app' design document named 'flows' for flow name filtering:
function(doc) {
    if (doc.type === 'flow') {
        emit(doc.name + '::' + doc.owner, doc);
    }
}
Jobs
A view on the 'app' design document named 'jobs' for job-by-flow filtering:
function(doc) {
    if (doc.type === 'job') {
        emit(doc.flow + '::' + doc.owner, doc);
    }
}