Advanced Write Scenarios 2
Conditional and Smart Writing
1. Threshold-Based Writing
// Write only when value exceeds threshold
const currentValue = msg.payload;
const threshold = flow.get("writeThreshold") || 50;
const lastWritten = flow.get("lastWrittenValue");
// Only write if significant change
if (Math.abs(currentValue - lastWritten) > threshold) {
flow.set("lastWrittenValue", currentValue);
msg.payload = currentValue;
msg.nodeId = "ns=2;s=ProcessValue";
msg.timestamp = new Date().toISOString();
node.log(`Writing value ${currentValue} (changed by ${Math.abs(currentValue - lastWritten)})`);
return msg;
} else {
node.log(`Skipping write - change too small: ${Math.abs(currentValue - lastWritten)}`);
return null;
}
2. Time-Based Write Control
// Limit write frequency
const now = Date.now();
const lastWrite = flow.get("lastWriteTime") || 0;
const minInterval = 5000; // 5 seconds minimum between writes
if (now - lastWrite >= minInterval) {
flow.set("lastWriteTime", now);
msg.payload = msg.payload;
msg.nodeId = "ns=2;s=PeriodicValue";
return msg;
} else {
const waitTime = minInterval - (now - lastWrite);
node.log(`Write too frequent, wait ${waitTime}ms`);
// Schedule delayed write
setTimeout(() => {
node.send({
payload: msg.payload,
nodeId: "ns=2;s=PeriodicValue"
});
}, waitTime);
return null;
}
3. Quality-Based Writing
// Write only with good quality data
const dataQuality = msg.quality || "Good";
const value = msg.payload;
if (dataQuality === "Good") {
msg.payload = value;
msg.nodeId = "ns=2;s=QualityValue";
return msg;
} else {
// Write default/safe value for bad quality
msg.payload = flow.get("safeValue") || 0;
msg.nodeId = "ns=2;s=QualityValue";
msg.dataType = "Double";
node.warn(`Poor quality data (${dataQuality}), writing safe value`);
return msg;
}
Transactional Write Operations
1. Multi-Step Transaction
// Implement transaction-like behavior
const transactionId = msg.transactionId || Date.now().toString();
const step = msg.transactionStep || 1;
switch (step) {
case 1:
// Step 1: Prepare system
msg.payload = [
{
nodeId: "ns=2;s=SystemReady",
value: false,
},
{
nodeId: "ns=2;s=TransactionId",
value: transactionId,
}
];
msg.transactionStep = 2;
break;
case 2:
// Step 2: Write main data
msg.payload = [
{
nodeId: "ns=2;s=ProcessData",
value: flow.get("transactionData"),
},
{
nodeId: "ns=2;s=DataTimestamp",
value: new Date(),
}
];
msg.transactionStep = 3;
break;
case 3:
// Step 3: Commit transaction
msg.payload = [
{
nodeId: "ns=2;s=SystemReady",
value: true,
},
{
nodeId: "ns=2;s=TransactionCommitted",
value: true,
}
];
msg.transactionStep = 0; // Reset
break;
}
return msg;
2. Rollback on Failure
// Handle transaction rollback
if (msg.statusCode && msg.statusCode !== "Good") {
const transactionId = msg.transactionId;
// Rollback previous writes
msg.payload = [
{
nodeId: "ns=2;s=SystemReady",
value: true, // Restore previous state
},
{
nodeId: "ns=2;s=TransactionId",
value: "", // Clear transaction
},
{
nodeId: "ns=2;s=TransactionFailed",
value: true,
}
];
node.error(`Transaction ${transactionId} failed, rolling back`);
return msg;
}
// Continue with next step if successful
return msg;
Advanced Data Transformation
1. Recipe-Based Writing
// Transform recipe data into OPC UA writes
const recipe = msg.payload || {};
const recipeId = recipe.id || "DEFAULT";
// Load recipe template
const recipeTemplates = {
"RECIPE_A": {
temperature: "ns=2;s=Reactor.Temperature",
pressure: "ns=2;s=Reactor.Pressure",
mixingSpeed: "ns=2;s=Mixer.Speed",
duration: "ns=2;s=Process.Duration"
},
"RECIPE_B": {
temperature: "ns=2;s=Heater.Setpoint",
flowRate: "ns=2;s=Pump.FlowRate",
concentration: "ns=2;s=Dosing.Concentration"
}
};
const template = recipeTemplates[recipeId];
if (!template) {
node.error(`Unknown recipe: ${recipeId}`);
return null;
}
// Transform recipe data to write operations
const writes = [];
Object.keys(template).forEach(param => {
if (recipe[param] !== undefined) {
writes.push({
nodeId: template[param],
value: recipe[param],
dataType: "Double"
});
}
});
// Add recipe metadata
writes.push({
nodeId: "ns=2;s=ActiveRecipe.Id",
value: recipeId,
dataType: "String"
});
writes.push({
nodeId: "ns=2;s=ActiveRecipe.Timestamp",
value: new Date(),
dataType: "DateTime"
});
msg.payload = writes;
return msg;
2. Unit Conversion and Scaling
// Convert and scale values before writing.
// Every helper is a pure function: number(s) in, number out.
const ConversionUtils = {
    // Temperature conversions
    celsiusToKelvin: (c) => c + 273.15,
    fahrenheitToCelsius: (f) => (f - 32) * 5/9,

    // Pressure conversions
    barToPsi: (bar) => bar * 14.5038,
    psiToBar: (psi) => psi / 14.5038,

    // Flow rate conversions
    lpmToGpm: (lpm) => lpm * 0.264172,
    gpmToLpm: (gpm) => gpm / 0.264172,

    // Linearly map `value` from the range [inMin, inMax] onto [outMin, outMax].
    // Values outside the input range are extrapolated, not clamped.
    // Throws RangeError when the input range is empty (inMin === inMax): the
    // division by zero would otherwise yield NaN or Infinity, which must not
    // reach an analog output.
    scaleValue: (value, inMin, inMax, outMin, outMax) => {
        const inputSpan = inMax - inMin;
        if (inputSpan === 0) {
            throw new RangeError("scaleValue: inMin and inMax must differ");
        }
        return ((value - inMin) / inputSpan) * (outMax - outMin) + outMin;
    }
};
// Process input with conversions
const inputData = msg.payload || {};
const writes = [];
// Temperature conversion (Fahrenheit to Celsius)
if (inputData.temperatureF !== undefined) {
const tempC = ConversionUtils.fahrenheitToCelsius(inputData.temperatureF);
writes.push({
nodeId: "ns=2;s=Temperature",
value: tempC,
dataType: "Double"
});
}
// Pressure conversion (PSI to Bar)
if (inputData.pressurePsi !== undefined) {
const pressureBar = ConversionUtils.psiToBar(inputData.pressurePsi);
writes.push({
nodeId: "ns=2;s=Pressure",
value: pressureBar,
dataType: "Double"
});
}
// Scale analog input (0-100% to 0-10V)
if (inputData.analogPercent !== undefined) {
const voltage = ConversionUtils.scaleValue(inputData.analogPercent, 0, 100, 0, 10);
writes.push({
nodeId: "ns=2;s=AnalogOutput",
value: voltage,
dataType: "Double"
});
}
msg.payload = writes;
return msg;
3. Statistical Processing
// Statistical analysis before writing: rolling statistics over the most recent
// `maxSize` numeric samples held in `buffer`.
const StatUtils = {
    buffer: [],
    maxSize: 10,

    // Append a sample and drop the oldest ones beyond maxSize.
    // Samples that are not finite numbers are ignored: a single NaN or string
    // would otherwise corrupt every statistic until it left the window.
    addValue: function(value) {
        if (typeof value !== "number" || !Number.isFinite(value)) {
            return;
        }
        this.buffer.push(value);
        // `while` rather than `if`, so that a buffer assigned from outside and
        // longer than maxSize is trimmed as well.
        while (this.buffer.length > this.maxSize) {
            this.buffer.shift();
        }
    },

    // Arithmetic mean of the buffer; 0 when the buffer is empty.
    getAverage: function() {
        if (this.buffer.length === 0) return 0;
        return this.buffer.reduce((a, b) => a + b, 0) / this.buffer.length;
    },

    // Population standard deviation of the buffer; 0 when the buffer is empty
    // (instead of the NaN produced by 0 / 0).
    getStdDev: function() {
        if (this.buffer.length === 0) return 0;
        const avg = this.getAverage();
        const variance = this.buffer.reduce((acc, val) => acc + Math.pow(val - avg, 2), 0) / this.buffer.length;
        return Math.sqrt(variance);
    },

    // Smallest sample; 0 when the buffer is empty (Math.min() of nothing is Infinity).
    getMin: function() {
        if (this.buffer.length === 0) return 0;
        return Math.min(...this.buffer);
    },

    // Largest sample; 0 when the buffer is empty (Math.max() of nothing is -Infinity).
    getMax: function() {
        if (this.buffer.length === 0) return 0;
        return Math.max(...this.buffer);
    }
};
// Add new value to buffer
const newValue = msg.payload;
StatUtils.addValue(newValue);
// Write statistical data
msg.payload = [
{
nodeId: "ns=2;s=Stats.Current",
value: newValue,
dataType: "Double"
},
{
nodeId: "ns=2;s=Stats.Average",
value: StatUtils.getAverage(),
dataType: "Double"
},
{
nodeId: "ns=2;s=Stats.StdDev",
value: StatUtils.getStdDev(),
dataType: "Double"
},
{
nodeId: "ns=2;s=Stats.Min",
value: StatUtils.getMin(),
dataType: "Double"
},
{
nodeId: "ns=2;s=Stats.Max",
value: StatUtils.getMax(),
dataType: "Double"
}
];
return msg;
Scheduling and Timing
1. Scheduled Write Operations
// Schedule writes based on time patterns
const schedule = {
hourly: ["ns=2;s=HourlyReport"],
daily: ["ns=2;s=DailyReport", "ns=2;s=DailyReset"],
weekly: ["ns=2;s=WeeklyMaintenance"],
monthly: ["ns=2;s=MonthlyCalibration"]
};
const now = new Date();
const hour = now.getHours();
const day = now.getDay();
const date = now.getDate();
const writes = [];
// Hourly operations (every hour at minute 0)
if (now.getMinutes() === 0) {
schedule.hourly.forEach(nodeId => {
writes.push({
nodeId: nodeId,
value: {
timestamp: now.toISOString(),
data: generateHourlyData()
},
dataType: "ExtensionObject"
});
});
}
// Daily operations (every day at 00:00)
if (hour === 0 && now.getMinutes() === 0) {
schedule.daily.forEach(nodeId => {
writes.push({
nodeId: nodeId,
value: nodeId.includes("Reset") ? 0 : generateDailyData(),
dataType: nodeId.includes("Reset") ? "Int32" : "ExtensionObject"
});
});
}
// Weekly operations (Sunday at 00:00)
if (day === 0 && hour === 0 && now.getMinutes() === 0) {
schedule.weekly.forEach(nodeId => {
writes.push({
nodeId: nodeId,
value: true,
dataType: "Boolean"
});
});
}
if (writes.length > 0) {
msg.payload = writes;
return msg;
}
return null;
2. Delayed Write Execution
// Delayed writes kept in a queue ordered by due time; entries that fall due at
// the same instant are ordered by descending priority.
const DelayedWriteManager = {
    queue: [],

    // Enqueue `writeData` so that it becomes due `delayMs` milliseconds from now.
    schedule(writeData, delayMs, priority = 0) {
        const entry = {
            executeTime: Date.now() + delayMs,
            writeData: writeData,
            priority: priority
        };
        this.queue.push(entry);

        // Earliest due time first; higher priority first on identical due times.
        this.queue.sort((left, right) =>
            left.executeTime === right.executeTime
                ? right.priority - left.priority
                : left.executeTime - right.executeTime
        );
    },

    // Remove every entry that is due and return its write data, in queue order.
    getReady() {
        const cutoff = Date.now();
        let dueCount = 0;
        while (dueCount < this.queue.length && this.queue[dueCount].executeTime <= cutoff) {
            dueCount += 1;
        }
        return this.queue.splice(0, dueCount).map((entry) => entry.writeData);
    }
};
// Add new delayed write
if (msg.delay) {
DelayedWriteManager.schedule(
{
nodeId: msg.nodeId,
value: msg.payload,
dataType: msg.dataType
},
msg.delay,
msg.priority || 0
);
return null;
}
// Execute ready writes
const readyWrites = DelayedWriteManager.getReady();
if (readyWrites.length > 0) {
msg.payload = readyWrites;
return msg;
}
return null;
Integration Patterns
1. Event-Driven Writing
// Write based on complex event patterns.
// Keeps the latest `maxEvents` events and emits write messages through
// node.send when the newest event completes one of the known patterns.
const EventManager = {
    events: [],
    maxEvents: 100,

    // Record an event, trim the history to maxEvents and evaluate the patterns.
    addEvent: function(eventType, data) {
        this.events.push({
            type: eventType,
            data: data,
            timestamp: Date.now()
        });
        if (this.events.length > this.maxEvents) {
            this.events.shift();
        }
        this.checkPatterns();
    },

    // Evaluate the patterns against the newest event. Only patterns that the
    // newest event completes are reported, so an unrelated later event does not
    // raise the same alert again.
    checkPatterns: function() {
        if (this.events.length === 0) {
            return;
        }

        // Pattern 1: Three consecutive temperature alarms - the three most
        // recent events must all be alarms; alarms separated by other events
        // do not count.
        const lastThree = this.events.slice(-3);
        const isAlarmStreak = lastThree.length === 3 &&
            lastThree.every(e => e.type === "TemperatureAlarm");
        if (isAlarmStreak) {
            this.triggerEmergencyShutdown();
        }

        // Pattern 2: Pressure spike followed by flow drop - the newest event is
        // a flow drop and a pressure spike is among the four events before it.
        const newest = this.events[this.events.length - 1];
        if (newest.type === "FlowDrop") {
            const preceding = this.events.slice(-5, -1);
            if (preceding.some(e => e.type === "PressureSpike")) {
                this.triggerMaintenanceAlert();
            }
        }
    },

    // Emit the emergency shutdown writes.
    triggerEmergencyShutdown: function() {
        node.send({
            payload: [
                {
                    nodeId: "ns=2;s=EmergencyShutdown",
                    value: true,
                    dataType: "Boolean"
                },
                {
                    nodeId: "ns=2;s=ShutdownReason",
                    value: "Three consecutive temperature alarms",
                    dataType: "String"
                }
            ]
        });
    },

    // Emit the maintenance alert writes.
    triggerMaintenanceAlert: function() {
        node.send({
            payload: [
                {
                    nodeId: "ns=2;s=MaintenanceRequired",
                    value: true,
                    dataType: "Boolean"
                },
                {
                    nodeId: "ns=2;s=MaintenanceReason",
                    value: "Pressure-flow anomaly detected",
                    dataType: "String"
                }
            ]
        });
    }
};
// Process incoming event
const eventType = msg.eventType;
const eventData = msg.payload;
if (eventType) {
EventManager.addEvent(eventType, eventData);
}
return null;
2. State Machine Integration
// State machine driven writes.
// `transition` records the new state and returns the writes that announce it.
const StateMachine = {
    states: {
        IDLE: "idle",
        STARTING: "starting",
        RUNNING: "running",
        STOPPING: "stopping",
        ERROR: "error"
    },
    currentState: "idle",

    // Switch to `newState` and return the array of {nodeId, value, dataType}
    // writes for it. `context` carries optional parameters (startupSpeed,
    // targetOutput, errorCode, errorMessage) and may be omitted.
    transition: function(newState, context = {}) {
        const oldState = this.currentState;
        this.currentState = newState;
        // Execute state-specific writes
        return this.getStateWrites(newState, oldState, context);
    },

    // Build the writes for `state`. `previousState` is accepted but not used.
    getStateWrites: function(state, previousState, context = {}) {
        // Fall back only when the parameter is missing. `value || fallback`
        // would also replace legitimate zeros such as a target output of 0.
        const valueOr = (value, fallback) =>
            (value === undefined || value === null ? fallback : value);

        const writes = [];

        // Always write current state
        writes.push({
            nodeId: "ns=2;s=SystemState",
            value: state,
            dataType: "String"
        });
        writes.push({
            nodeId: "ns=2;s=StateTransitionTime",
            value: new Date(),
            dataType: "DateTime"
        });

        // State-specific writes
        switch (state) {
            case this.states.STARTING:
                writes.push({
                    nodeId: "ns=2;s=StartupSequence",
                    value: true,
                    dataType: "Boolean"
                });
                writes.push({
                    nodeId: "ns=2;s=MotorSpeed",
                    value: valueOr(context.startupSpeed, 500),
                    dataType: "Int32"
                });
                break;
            case this.states.RUNNING:
                writes.push({
                    nodeId: "ns=2;s=ProductionMode",
                    value: true,
                    dataType: "Boolean"
                });
                writes.push({
                    nodeId: "ns=2;s=TargetOutput",
                    value: valueOr(context.targetOutput, 100),
                    dataType: "Double"
                });
                break;
            case this.states.STOPPING:
                writes.push({
                    nodeId: "ns=2;s=ShutdownSequence",
                    value: true,
                    dataType: "Boolean"
                });
                writes.push({
                    nodeId: "ns=2;s=MotorSpeed",
                    value: 0,
                    dataType: "Int32"
                });
                break;
            case this.states.ERROR:
                writes.push({
                    nodeId: "ns=2;s=ErrorCode",
                    value: valueOr(context.errorCode, 9999),
                    dataType: "Int32"
                });
                writes.push({
                    nodeId: "ns=2;s=ErrorMessage",
                    // An empty message is as uninformative as a missing one,
                    // so `||` is kept here on purpose.
                    value: context.errorMessage || "Unknown error",
                    dataType: "String"
                });
                break;
        }
        return writes;
    }
};
// Handle state transition
const command = msg.command;
const context = msg.context || {};
let writes = [];
switch (command) {
case "start":
writes = StateMachine.transition(StateMachine.states.STARTING, context);
break;
case "run":
writes = StateMachine.transition(StateMachine.states.RUNNING, context);
break;
case "stop":
writes = StateMachine.transition(StateMachine.states.STOPPING, context);
break;
case "error":
writes = StateMachine.transition(StateMachine.states.ERROR, context);
break;
}
if (writes.length > 0) {
msg.payload = writes;
return msg;
}
return null;
Performance Monitoring and Optimization
1. Write Performance Tracking
// Track write performance metrics: success/failure counters and a rolling
// average over the latest 100 latency samples.
const PerformanceTracker = {
    metrics: {
        totalWrites: 0,
        successfulWrites: 0,
        failedWrites: 0,
        averageLatency: 0,
        latencyHistory: []
    },

    // Record the outcome of one write. `success` is a boolean, `latency` is in
    // milliseconds. A latency that is not a finite number is not sampled (the
    // write itself is still counted), so it cannot turn the average into NaN.
    recordWrite: function(success, latency) {
        this.metrics.totalWrites++;
        if (success) {
            this.metrics.successfulWrites++;
        } else {
            this.metrics.failedWrites++;
        }

        // Track latency
        if (typeof latency === "number" && Number.isFinite(latency)) {
            this.metrics.latencyHistory.push(latency);
            if (this.metrics.latencyHistory.length > 100) {
                this.metrics.latencyHistory.shift();
            }
        }

        // Calculate rolling average
        const history = this.metrics.latencyHistory;
        this.metrics.averageLatency = history.length === 0
            ? 0
            : history.reduce((a, b) => a + b, 0) / history.length;

        // Check for performance issues
        this.checkPerformance();
    },

    // Warn when the success rate drops below 95% or the rolling average latency
    // exceeds 1000 ms. Does nothing before the first write has been recorded.
    checkPerformance: function() {
        if (this.metrics.totalWrites === 0) {
            return;
        }
        const successRate = this.metrics.successfulWrites / this.metrics.totalWrites;
        if (successRate < 0.95) {
            node.warn(`Low success rate: ${(successRate * 100).toFixed(1)}%`);
        }
        if (this.metrics.averageLatency > 1000) {
            node.warn(`High latency: ${this.metrics.averageLatency.toFixed(0)}ms`);
        }
    },

    // Summary for logging. successRate is "n/a" while no write has been
    // recorded (0 / 0 would otherwise be rendered as "NaN%").
    getReport: function() {
        const total = this.metrics.totalWrites;
        return {
            totalWrites: total,
            successRate: total === 0
                ? "n/a"
                : (this.metrics.successfulWrites / total * 100).toFixed(1) + "%",
            averageLatency: this.metrics.averageLatency.toFixed(0) + "ms",
            failedWrites: this.metrics.failedWrites
        };
    }
};
// Record write result
const startTime = msg._startTime || Date.now();
const latency = Date.now() - startTime;
const success = msg.statusCode === "Good";
PerformanceTracker.recordWrite(success, latency);
// Periodically output performance report
if (PerformanceTracker.metrics.totalWrites % 100 === 0) {
node.log(`Performance Report: ${JSON.stringify(PerformanceTracker.getReport())}`);
}
return msg;
Best Practices for Advanced Scenarios
- Error Recovery: Implement robust error handling and recovery mechanisms
- State Management: Use proper state management for complex workflows
- Performance Monitoring: Track and optimize write performance
- Resource Management: Manage memory and connections efficiently
- Logging: Implement comprehensive logging for debugging
- Testing: Test edge cases and failure scenarios
- Documentation: Document complex logic and state transitions