# Task Streaming

OpenWork implements a real-time task streaming system that provides immediate feedback during task execution. The system uses Inter-Process Communication (IPC) to stream output from the OpenCode CLI process to the React UI, creating a responsive user experience.
## Architecture Overview

The task streaming system consists of three main components working together:

- **OpenCode CLI**: Executes tasks and streams output via `node-pty`
- **Main Process**: Receives PTY output and forwards it via IPC events
- **Renderer Process**: Displays real-time updates in the UI
### Data Flow

```mermaid
graph TD
    A[OpenCode CLI Process] -->|PTY Output| B[Main Process<br/>node-pty]
    B -->|IPC Events| C[IPC Handlers]
    C -->|Batched Messages| D[Renderer Process]
    D -->|UI Updates| E[React Components]
    E -->|User Feedback| F[Task Display]
```
## Implementation

Source: `apps/desktop/src/main/ipc/handlers.ts`

### Message Batching System

The streaming system batches messages to optimize performance:

```typescript
// Message batching configuration
const MESSAGE_BATCH_DELAY_MS = 50;

// Per-task message batching state
interface MessageBatcher {
  pendingMessages: TaskMessage[];
  timeout: NodeJS.Timeout | null;
  taskId: string;
  flush: () => void;
}

const messageBatchers = new Map<string, MessageBatcher>();
```
### Batch Management

```typescript
function getMessageBatcher(taskId: string): MessageBatcher {
  let batcher = messageBatchers.get(taskId);
  if (!batcher) {
    batcher = {
      pendingMessages: [],
      timeout: null,
      taskId,
      flush: () => {
        if (batcher.timeout) {
          clearTimeout(batcher.timeout);
        }
        if (batcher.pendingMessages.length > 0) {
          mainWindow?.webContents.send('task:update', {
            taskId,
            messages: batcher.pendingMessages,
          });
          batcher.pendingMessages = [];
        }
        messageBatchers.delete(taskId);
      },
    };
    messageBatchers.set(taskId, batcher);
  }
  return batcher;
}
```
### Message Processing

```typescript
// Handle task output from the OpenCode CLI
function handleTaskOutput(taskId: string, data: string) {
  const batcher = getMessageBatcher(taskId);

  // Add message to the batch
  batcher.pendingMessages.push({
    id: `msg_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`,
    type: 'output',
    content: data,
    timestamp: new Date().toISOString(),
  });

  // Schedule batch flush
  if (batcher.timeout) {
    clearTimeout(batcher.timeout);
  }
  batcher.timeout = setTimeout(() => {
    batcher.flush();
  }, MESSAGE_BATCH_DELAY_MS);
}
```
## IPC Communication

### Event Types

The system uses several IPC event types for different kinds of updates:

| Event Type | Purpose | Frequency |
|---|---|---|
| `task:update` | Output messages | Batched (50ms) |
| `task:progress` | Progress indicators | As needed |
| `task:status` | Status changes | On state change |
| `task:error` | Error messages | On errors |
### Message Structure

```typescript
interface TaskMessage {
  id: string;
  type: 'output' | 'error' | 'progress' | 'status';
  content: string;
  timestamp: string;
  metadata?: Record<string, unknown>;
}
```
### Event Handlers

```typescript
// Register IPC handlers
ipcMain.handle('task:execute', async (_, taskConfig: TaskConfig) => {
  // ... task execution logic ...

  // Stream output through the batching handler
  pty.on('data', (data) => {
    handleTaskOutput(taskId, data);
  });

  // Handle completion
  pty.on('exit', (exitCode) => {
    handleTaskComplete(taskId, exitCode);
  });
});
```
## Performance Optimizations

### 1. Message Batching

- **Delay**: 50ms before queued messages are sent
- **Purpose**: Reduces IPC event frequency
- **Benefit**: Smoother UI updates with less overhead
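To see why the 50ms window matters, here is a back-of-envelope calculation. The output rate is an assumption for illustration: if the PTY emitted a chunk every 5ms, unbatched forwarding would mean 200 IPC events per second, while batching caps it at 20.

```typescript
// Rough estimate of how much batching reduces IPC event frequency.
const CHUNK_INTERVAL_MS = 5;  // assumed PTY output rate (illustrative)
const BATCH_DELAY_MS = 50;    // matches MESSAGE_BATCH_DELAY_MS above

const eventsPerSecondUnbatched = 1000 / CHUNK_INTERVAL_MS; // 200
const eventsPerSecondBatched = 1000 / BATCH_DELAY_MS;      // at most 20

console.log(eventsPerSecondUnbatched / eventsPerSecondBatched); // 10
```

Under these assumptions, batching sends a tenth as many IPC events while each event carries ten times the messages.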
### 2. Text Length Limiting

```typescript
const MAX_TEXT_LENGTH = 8000;

// Prevent overly large messages
function sanitizeMessage(content: string): string {
  if (content.length > MAX_TEXT_LENGTH) {
    return content.substring(0, MAX_TEXT_LENGTH) + '\n[Output truncated]';
  }
  return content;
}
```
### 3. Efficient IPC Usage

- Uses `ipcMain.handle` for request-response calls
- Uses `webContents.send` for one-way events
- Minimizes data transfer between processes
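On the receiving end, the renderer subscribes to the one-way `task:update` events and appends each batch to the task's message list. The sketch below substitutes a plain `EventEmitter` for Electron's `ipcRenderer` (normally exposed through a preload script) so it can run outside Electron; the subscription shape is an assumption.

```typescript
import { EventEmitter } from 'node:events';

interface TaskMessage { type: string; content: string; timestamp: string }

// Stand-in for ipcRenderer; in the app this would be the real IPC bridge.
const ipc = new EventEmitter();

const messagesByTask = new Map<string, TaskMessage[]>();

// One listener handles every batched update, for every task.
ipc.on('task:update', (payload: { taskId: string; messages: TaskMessage[] }) => {
  const list = messagesByTask.get(payload.taskId) ?? [];
  list.push(...payload.messages);
  messagesByTask.set(payload.taskId, list);
});

// Simulate two batches arriving for one task.
ipc.emit('task:update', {
  taskId: 'task_1',
  messages: [
    { type: 'output', content: 'a', timestamp: '' },
    { type: 'output', content: 'b', timestamp: '' },
  ],
});
ipc.emit('task:update', {
  taskId: 'task_1',
  messages: [{ type: 'output', content: 'c', timestamp: '' }],
});

console.log(messagesByTask.get('task_1')!.length); // 3
```

Because batches arrive at most every 50ms, the listener fires far less often than the PTY emits, which keeps the renderer's reconciliation work bounded.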
## Error Handling

### Stream Errors

```typescript
pty.on('error', (error) => {
  console.error(`[PTY] Error for task ${taskId}:`, error);
  mainWindow?.webContents.send('task:error', {
    taskId,
    error: error.message,
    timestamp: new Date().toISOString(),
  });
});
```
### Process Termination

```typescript
pty.on('exit', (exitCode) => {
  console.log(`[PTY] Task ${taskId} exited with code: ${exitCode}`);

  // Final flush of any pending messages
  const batcher = messageBatchers.get(taskId);
  if (batcher) {
    batcher.flush();
  }

  // Send completion event
  mainWindow?.webContents.send('task:complete', {
    taskId,
    exitCode,
    timestamp: new Date().toISOString(),
  });
});
```
## User Experience Features

### 1. Real-time Updates

- Characters appear as they're output
- No waiting for full task completion
- Live feedback during long-running operations
### 2. Auto-scrolling

The UI automatically scrolls to show the latest output:

```typescript
// In a React component
useEffect(() => {
  if (autoScroll && messages.length > 0) {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }
}, [messages, autoScroll]);
```
### 3. Output Streaming States

| State | Description | UI Behavior |
|---|---|---|
| `executing` | Task is running | Show live output |
| `completed` | Task finished | Final output display |
| `error` | Task failed | Error highlighting |
| `cancelled` | Task cancelled | Stop streaming |
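The states in the table above map naturally onto a TypeScript union, with a small guard deciding whether the UI should still render live output. This is an illustrative sketch, not OpenWork's actual state model.

```typescript
// Streaming states from the table above as a union type.
type TaskState = 'executing' | 'completed' | 'error' | 'cancelled';

// Only an executing task should keep streaming output to the UI;
// the three terminal states all stop live updates.
function isStreaming(state: TaskState): boolean {
  return state === 'executing';
}

console.log(isStreaming('executing')); // true
console.log(isStreaming('cancelled')); // false
```

Using a union type here means adding a new state (say, `paused`) forces every `switch` over `TaskState` to be revisited at compile time.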
### 4. Message Types

```typescript
// Output message
{
  type: 'output',
  content: 'Processing files...\n',
  timestamp: '2024-01-15T10:30:00.000Z'
}

// Progress message
{
  type: 'progress',
  content: '75% complete',
  metadata: { progress: 75, total: 100 },
  timestamp: '2024-01-15T10:30:15.000Z'
}

// Error message
{
  type: 'error',
  content: 'File not found: missing.txt',
  timestamp: '2024-01-15T10:30:30.000Z'
}
```
## Integration with Task Manager

### Task Lifecycle

```typescript
class TaskManager {
  private tasks = new Map<string, Task>();

  async executeTask(config: TaskConfig): Promise<string> {
    const taskId = generateTaskId();

    // Create task
    const task = {
      id: taskId,
      status: 'executing' as const,
      startTime: new Date(),
      messages: [],
      // ... other task properties
    };

    // Store task
    this.tasks.set(taskId, task);

    // Start PTY process
    const pty = spawnNodeProcess(command, args, env);

    // Handle output streaming
    pty.on('data', (data) => {
      this.handleTaskOutput(taskId, data);
    });

    return taskId;
  }

  private handleTaskOutput(taskId: string, data: string) {
    const task = this.tasks.get(taskId);
    if (!task) return;

    // Add to the task's message history
    task.messages.push({
      type: 'output',
      content: data,
      timestamp: new Date().toISOString(),
    });

    // Stream the newest message to the UI
    mainWindow?.webContents.send('task:update', {
      taskId,
      messages: [task.messages[task.messages.length - 1]],
    });
  }
}
```
## Performance Considerations

### Memory Management

- Messages are cached in memory during task execution
- Old messages can be pruned for long-running tasks
- History is stored separately for persistence
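The pruning mentioned above could be as simple as capping the in-memory message list and dropping the oldest entries. The cap value and helper below are illustrative, not OpenWork's actual implementation.

```typescript
interface TaskMessage { type: string; content: string; timestamp: string }

const MAX_RETAINED_MESSAGES = 500; // illustrative cap

// Keep only the newest messages, mutating the array in place so existing
// references to it (e.g. from the task object) remain valid.
function pruneMessages(messages: TaskMessage[], max = MAX_RETAINED_MESSAGES): void {
  if (messages.length > max) {
    messages.splice(0, messages.length - max);
  }
}
```

Since persisted history lives elsewhere, dropping old in-memory entries loses nothing; a long-running task's memory footprint then stays bounded by the cap.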
### CPU Optimization

- Batching reduces IPC overhead
- Efficient string handling for large outputs
- Minimal processing in the streaming path
### Network Considerations

- All IPC communication is local (no network latency)
- Optimized for local development and production use
- Efficient data serialization
## Debugging and Monitoring

### Stream Debugging

```typescript
// Enable debug logging
const DEBUG_STREAMING = process.env.DEBUG_STREAMING === '1';

if (DEBUG_STREAMING) {
  console.log(`[Streaming] Task ${taskId} output:`, data);
}
```
### Performance Metrics

```typescript
// Track streaming performance
const streamingMetrics = {
  messagesSent: 0,
  batchesSent: 0,
  totalBytesSent: 0,
  averageBatchSize: 0,
};
```
### Common Issues

- **Messages not appearing**: Check IPC handlers and event registration
- **High memory usage**: Implement message pruning for long tasks
- **UI lag**: Adjust the batching delay or optimize message handling
- **Missing output**: Ensure the PTY process is properly configured
## Future Enhancements

### Planned Features

- **Selective Streaming**: Allow users to filter output types
- **Output Search**: Real-time search through streaming output
- **Export Streaming**: Capture streaming output to files
- **Collaborative Streaming**: Share live execution with others
### Performance Improvements

- **Web Workers**: Offload message processing to Web Workers
- **Compression**: Compress large messages before IPC transfer
- **Virtual Scrolling**: Efficient rendering of large output streams
- **Delta Updates**: Send only changed portions of output
The task streaming system is the foundation of a responsive, interactive development experience: users see immediate feedback from their tasks, while batching and output truncation keep overhead predictable.