Transform
The Transform Stage is used to configure advanced transformations using JavaScript. During execution the expression can be used modify the value, extract metadata from the value for use in other stages, and incorporate additional information into the event value.
The Transform Stage is similar to a Custom Condition in that they both use JavaScript to manipulate the data. However, Transforms only operate on the event data. Use Transforms to manipulate events in a pipeline, where the event could be raw input data or modeled data. Use Custom Conditions to clean up raw input data before modeling or sending it to a pipeline.
Transform Expression
Required javascript expression. At a minimum the expression is expected to use stage.setValue()
to specify the event value for future stages
Using Transform Expressions
The transform expression executes JavaScript to modify the event value and/or operate on metadata about the value that can be used in Output stages. Input references (using {{}}
) in
Pipeline expression fields is not currently supported. JavaScript in the transform expression has access to the following objects:
Event
The event
variable contains read-only information about input event. Its type is described below.
class Event {
/**
* Unique ID of the parent event in form XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
**/
parent: string;
/**
* Unique ID of the current event in form XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
**/
id: string;
/**
* The event value. This is the value written to the pipeline for the first stage and value emitted by previous stages for all other stages.
**/
value: any;
/**
* Alias set by the Source when the value was input.
**/
alias: string;
/**
* Describes the quality of the value (typically Good or Bad)
**/
quality: string;
/**
* Date when the value was read.
**/
timestamp: Date;
/**
* Describes the value type. E.g Int8, Int32, ComplexData.
**/
type: string;
/**
* Describes the object name For ComplexData types.
**/
name: string;
/**
* Key:value object added to the event by previous stages in the pipeline.
**/
metadata: object;
}
Stage
The stage
variable has methods that can be used to control the state of the stage during transform evaluation.
class Stage {
/**
* The name of the stage
**/
name: string
/**
* The name of the pipeline
**/
pipelineName: string
/**
* The names of the stages the current stage outputs to
**/
outputs: string[]
/**
* Sets an event value. Expressions that use setValue multiple times will send multiple events to the next stage of the pipeline.
**/
function setValue(value: any);
/**
* Sets the buffer key used for data buffering. This is applicable to buffer and filter stages only.
**/
function setBufferKey (key: string);
/**
* Sets the properties in a Write New stage. These are merged with properties in the stage configuration.
**/
function setWriteProperties (properties: object);
/**
* Sets a single property in a Write New stage. These are merged with properties in the stage configuration.
**/
function setWriteProperty (key: string, value: any);
/**
* Replaces the event metadata object sent to the next stage of the pipeline.
**/
function setAllMetadata(value: any);
/**
* Upsert metadata using the supplied key name. Setting a null/undefined value will clear the metadata associated with the supplied key.
**/
function setMetadata(key: string, value: any);
/**
* Stops the pipeline from executing any further events down the current pipeline path
**/
function stop();
}
System
The System
variable contains read-only information about the hub. Its type is described below.
class System {
/**
* Read-only object containing all the System Variables defined for the hub instance.
**/
Variables: object;
/**
* Read-only object containing all the variable defined in the Environment (when enabled)
**/
Environment: object;
/**
* Read-only internal values
**/
Internal : {
DateTime: Date
}
}
State
The state
variable defines functions that can be used to access data that is persisted across pipeline executions and application restarts.
Below are some examples and the available methods. See Use Pipeline State for more examples.
state.pipeline.set("myStateKey", 100);
let myStateKeyValue = state.pipeline.get("myStateKey") // value is 100
let myUnknownKey = state.pipeline.get("unknown", 0) // value is 0
class State {
pipeline: {
/**
* Set a variable accessible from all stages. Setting a key value to null or undefined removes it from state storage.
**/
function set (key: string, value: any);
/**
* Get a variable accessible from all stages. The function returns undefined if the variable has not been set.
**/
function get (key: string) : any | undefined;
/**
* Get a variable accessible from all stages or return the defaultValue when the variable is undefined.
**/
function get (key: string, defaultValue: any) : any;
}
}
Example Transform Expressions
The following are examples of transform expressions.
Rename Properties in the Event
In this example assume the event payload contains OPC tags.
{
"XD_R32_CONVERYOR8_INLET1_CV": 13.2,
"XD_R32_CONVERYOR9_INLET1_CV": 12.9,
"XD_R32_CONVERYOR10_INLET1_CV": 13.0
}
We want to rename these tags to add “PUMP” into the name between “CONVERYORX_INLET”. The following transform performs this change.
let eventOut = {}
// Iterate each tag name
for (const tagName in event.value){
// Split up to the 3rd "_"
let tagNameParts = tagName.split("_");
let newTagName = tagNameParts.slice(0,3).join("_") + "_PUMP_" + tagNameParts.slice(3,tagNameParts.length).join("_")
// Assign the new tagname the existing value
eventOut[newTagName] = event.value[tagName]
}
// Set the value of the event
stage.setValue(eventOut);
The output is the following.
{
"XD_R32_CONVERYOR8_PUMP_INLET1_CV": 13.2,
"XD_R32_CONVERYOR9_PUMP_INLET1_CV": 12.9,
"XD_R32_CONVERYOR10_PUMP_INLET1_CV": 13.0
}
Change Event based on State
In this example assume the event payload contains the state of the machine and we want to change the values based on the state. This could be an MQTT or SparkplugB payload.
{
"line": "line1",
"asset": "press1",
"stateOn": true,
"pressCount": 101,
"lastCount": 98,
"temp": 97.8
}
The following transform changes the values based on the machine on/off state.
let latestValue = event.value
if (!latestValue.stateOn){
// Zero out the temp and signal no parts were made
latestValue.temp = 0
latestValue["newParts"] = 0
}else{
// Calculate new parts
latestValue["newParts"] = latestValue.pressCount - latestValue.lastCount;
}
// Set the value of the event
stage.setValue(latestValue);
Break Event into Multiple Events
Assume we have the following payload, and we want to break this up by assetID so that moving forward each event contains only events for an asset.
[
{
"assetId": 1,
"value": 27
},
{
"assetId": 2,
"value": 55
},
{
"assetId": 1,
"value": 28
}
]
The following transform will break up the array.
let dataOut = {}
// Iterate each asset
for (const assetValue of event.value){
if (dataOut[assetValue.assetId] === undefined){
// New assetId, create the array
dataOut[assetValue.assetId] = []
}
// Add the value
dataOut[assetValue.assetId].push(assetValue)
}
// Create an event for each unique assetId
for (const asset in dataOut){
stage.setValue(dataOut[asset])
}
The first event has the following payload.
[
{
"assetId": 1,
"value": 27
},
{
"assetId": 1,
"value": 28
}
]
The second event has the following payload.
[
{
"assetId": 2,
"value": 55
}
]
Use Pipeline State
Assume we have the following payload, and we only want to run the pipeline when the mode
value changes to ERROR
{
"assetId": "asset1",
"value1": 27,
"mode": "RUNNING" // possible values include "RUNNING", "ONBREAK", "ERROR"
}
The following expression compares the event mode to the mode stored in the state and stops the pipeline unless the mode
changes to "ERROR"
// Create a state key using asset id
let key = event.value.assetId + ".mode";
let newMode = event.value.mode;
// Load previous state using the current mode if mode has never been saved
let previousMode = state.pipeline.get(key, event.value.mode)
// Save new stage
state.pipeline.set(key, newMode);
// No change, stop
if (newMode === previousMode) {
stage.stop();
return;
}
// Not error, stop
if (newMode !== "ERROR") {
stage.stop();
return
}
Values stored in state using state.pipeline.set
can be removed and edited from the State page.