Task Streaming

OpenWork implements a sophisticated real-time task streaming system that provides immediate feedback during task execution. The system uses Inter-Process Communication (IPC) to stream output from the OpenCode CLI process to the React UI, creating a responsive and engaging user experience.

Architecture Overview

The task streaming system consists of three main components working together:

  1. OpenCode CLI: Executes tasks and streams output via node-pty
  2. Main Process: Receives PTY output and forwards via IPC events
  3. Renderer Process: Displays real-time updates in the UI

Data Flow

graph TD
    A[OpenCode CLI Process] -->|PTY Output| B[Main Process<br/>node-pty]
    B -->|IPC Events| C[IPC Handlers]
    C -->|Batched Messages| D[Renderer Process]
    D -->|UI Updates| E[React Components]
    E -->|User Feedback| F[Task Display]

Implementation

Source: /Users/nateb/openwork-repo/apps/desktop/src/main/ipc/handlers.ts

Message Batching System

The streaming system implements intelligent message batching to optimize performance:

// Message batching configuration
const MESSAGE_BATCH_DELAY_MS = 50;

// Per-task message batching state
interface MessageBatcher {
  pendingMessages: TaskMessage[];
  timeout: NodeJS.Timeout | null;
  taskId: string;
  flush: () => void;
}

const messageBatchers = new Map<string, MessageBatcher>();

Batch Management

function getMessageBatcher(taskId: string): MessageBatcher {
  let batcher = messageBatchers.get(taskId);

  if (!batcher) {
    batcher = {
      pendingMessages: [],
      timeout: null,
      taskId,
      flush: () => {
        if (batcher.timeout) {
          clearTimeout(batcher.timeout);
        }

        if (batcher.pendingMessages.length > 0) {
          mainWindow?.webContents.send('task:update', {
            taskId,
            messages: batcher.pendingMessages,
          });

          batcher.pendingMessages = [];
        }

        messageBatchers.delete(taskId);
      },
    };

    messageBatchers.set(taskId, batcher);
  }

  return batcher;
}

Message Processing

// Handle task output from OpenCode CLI
function handleTaskOutput(taskId: string, data: string) {
  const batcher = getMessageBatcher(taskId);

  // Add message to batch
  batcher.pendingMessages.push({
    id: `msg_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`,
    type: 'output',
    content: data,
    timestamp: new Date().toISOString(),
  });

  // Schedule batch flush
  if (batcher.timeout) {
    clearTimeout(batcher.timeout);
  }

  batcher.timeout = setTimeout(() => {
    batcher.flush();
  }, MESSAGE_BATCH_DELAY_MS);
}

IPC Communication

Event Types

The system uses several IPC event types for different types of updates:

Event Type Purpose Frequency
task:update Output messages Batched (50ms)
task:progress Progress indicators As needed
task:status Status changes On state change
task:error Error messages On errors

Message Structure

interface TaskMessage {
  id: string;
  type: 'output' | 'error' | 'progress' | 'status';
  content: string;
  timestamp: string;
  metadata?: Record<string, unknown>;
}

Event Handlers

// Register IPC handlers
ipcMain.handle('task:execute', async (_, taskConfig: TaskConfig) => {
  // ... task execution logic ...

  // Stream output through handlers
  pty.on('data', (data) => {
    handleTaskOutput(taskId, data);
  });

  // Handle completion
  pty.on('exit', (exitCode) => {
    handleTaskComplete(taskId, exitCode);
  });
});

Performance Optimizations

1. Message Batching

  • Delay: 50ms delay before sending messages
  • Purpose: Reduces IPC event frequency
  • Benefit: Smoother UI updates with less overhead

2. Text Length Limiting

const MAX_TEXT_LENGTH = 8000;

// Prevent overly large messages
function sanitizeMessage(content: string): string {
  if (content.length > MAX_TEXT_LENGTH) {
    return content.substring(0, MAX_TEXT_LENGTH) + '\n[Output truncated]';
  }
  return content;
}

3. Efficient IPC Usage

  • Uses ipcMain.handle for request-response
  • Uses webContents.send for one-way events
  • Minimizes data transfer between processes

Error Handling

Stream Errors

pty.on('error', (error) => {
  console.error(`[PTY] Error for task ${taskId}:`, error);

  mainWindow?.webContents.send('task:error', {
    taskId,
    error: error.message,
    timestamp: new Date().toISOString(),
  });
});

Process Termination

pty.on('exit', (exitCode) => {
  console.log(`[PTY] Task ${taskId} exited with code: ${exitCode}`);

  // Final flush of any pending messages
  const batcher = messageBatchers.get(taskId);
  if (batcher) {
    batcher.flush();
  }

  // Send completion event
  mainWindow?.webContents.send('task:complete', {
    taskId,
    exitCode,
    timestamp: new Date().toISOString(),
  });
});

User Experience Features

1. Real-time Updates

  • Characters appear as they’re output
  • No waiting for full task completion
  • Live feedback during long-running operations

2. Auto-scrolling

The UI automatically scrolls to show the latest output:

// In React component
useEffect(() => {
  if (autoScroll && messages.length > 0) {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }
}, [messages, autoScroll]);

3. Output Streaming States

| State | Description | UI Behavior | |——-|————-|————–| | executing | Task is running | Show live output | | completed | Task finished | Final output display | | error | Task failed | Error highlighting | | cancelled | Task cancelled | Stop streaming |

4. Message Types

// Output message
{
  type: 'output',
  content: 'Processing files...\n',
  timestamp: '2024-01-15T10:30:00.000Z'
}

// Progress message
{
  type: 'progress',
  content: '75% complete',
  metadata: { progress: 75, total: 100 },
  timestamp: '2024-01-15T10:30:15.000Z'
}

// Error message
{
  type: 'error',
  content: 'File not found: missing.txt',
  timestamp: '2024-01-15T10:30:30.000Z'
}

Integration with Task Manager

Task Lifecycle

class TaskManager {
  private tasks = new Map<string, Task>();

  async executeTask(config: TaskConfig): Promise<string> {
    const taskId = generateTaskId();

    // Create task
    const task = {
      id: taskId,
      status: 'executing' as const,
      startTime: new Date(),
      messages: [],
      // ... other task properties
    };

    // Store task
    this.tasks.set(taskId, task);

    // Start PTY process
    const pty = spawnNodeProcess(command, args, env);

    // Handle output streaming
    pty.on('data', (data) => {
      this.handleTaskOutput(taskId, data);
    });

    return taskId;
  }

  private handleTaskOutput(taskId: string, data: string) {
    const task = this.tasks.get(taskId);
    if (!task) return;

    // Add to task's messages
    task.messages.push({
      type: 'output',
      content: data,
      timestamp: new Date().toISOString(),
    });

    // Stream to UI
    mainWindow.webContents.send('task:update', {
      taskId,
      messages: [task.messages[task.messages.length - 1]],
    });
  }
}

Performance Considerations

Memory Management

  • Messages are cached in memory during task execution
  • Old messages can be pruned for long-running tasks
  • History is stored separately for persistence

CPU Optimization

  • Batching reduces IPC overhead
  • Efficient string handling for large outputs
  • Minimal processing in the streaming path

Network Considerations

  • All IPC communication is local (no network latency)
  • Optimized for local development and production use
  • Efficient data serialization

Debugging and Monitoring

Stream Debugging

// Enable debug logging
const DEBUG_STREAMING = process.env.DEBUG_STREAMING === '1';

if (DEBUG_STREAMING) {
  console.log(`[Streaming] Task ${taskId} output:`, data);
}

Performance Metrics

// Track streaming performance
const streamingMetrics = {
  messagesSent: 0,
  batchesSent: 0,
  totalBytesSent: 0,
  averageBatchSize: 0,
};

Common Issues

  1. Messages not appearing: Check IPC handlers and event registration
  2. High memory usage: Implement message pruning for long tasks
  3. UI lag: Adjust batching delay or optimize message handling
  4. Missing output: Ensure PTY process is properly configured

Future Enhancements

Planned Features

  1. Selective Streaming: Allow users to filter output types
  2. Output Search: Real-time search through streaming output
  3. Export Streaming: Capture streaming output to files
  4. Collaborative Streaming: Share live execution with others

Performance Improvements

  1. Web Workers: Offload message processing to Web Workers
  2. Compression: Compress large messages before IPC transfer
  3. Virtual Scrolling: Efficient rendering of large output streams
  4. Delta Updates: Send only changed portions of output

The task streaming system provides the foundation for a responsive and interactive development experience, ensuring users see immediate feedback from their tasks while maintaining optimal performance across different scenarios.


Back to top

OpenWork Documentation - Community documentation for accomplish-ai/openwork