@jonloucks/concurrency-ts is a TypeScript library that provides powerful abstractions for managing asynchronous concurrency, state machines, and completion patterns. It offers type-safe primitives for building concurrent applications with guaranteed completion semantics.
Open interface
npm install @jonloucks/concurrency-ts
The Concurrency interface is the main entry point for creating concurrency primitives. It provides factory methods for creating Waitable, Completable, and StateMachine instances, along with guaranteed execution patterns.
Key responsibilities:
A Waitable<T> provides a mutable reference that allows threads to wait until the value satisfies a given condition. It combines supplier, consumer, and notification capabilities.
Key characteristics:
Open for resource management
Primary interfaces:
WaitableSupplier<T> - Provides the current value
WaitableConsumer<T> - Accepts new values
WaitableNotify<T> - Notifies waiting threads of changes
A Completable<T> observes a single activity from start to finish, tracking its completion state and final value. It ensures that completion notifications are delivered exactly once.
Key characteristics:
Completion states:
INCOMPLETE - Activity has not yet completed
SUCCEEDED - Activity completed successfully with a value
FAILED - Activity failed (potentially with an error)
CANCELED - Activity was canceled before completion
A StateMachine<T> manages user-defined states with rules restricting state transitions. It provides a type-safe way to model complex state-based behavior.
Key characteristics:
WaitableNotify
import { createConcurrency, Concurrency } from '@jonloucks/concurrency-ts';
// Create a Concurrency instance
const concurrency: Concurrency = createConcurrency({});
// Open it for use (returns AutoClose for cleanup)
using closeConcurrency = concurrency.open();
// Use the concurrency instance...
import { Waitable } from '@jonloucks/concurrency-ts';
// Create a waitable with an initial value
const waitable: Waitable<number> = concurrency.createWaitable({
initialValue: 42
});
using closeWaitable = waitable.open();
// Get the current value
const value = waitable.supply(); // 42
// Set a new value
waitable.consume(100);
// Get the notification interface
const notify = waitable.notify();
import { Completable } from '@jonloucks/concurrency-ts';
// Create a completable
const completable: Completable<string> = concurrency.createCompletable({});
using closeCompletable = completable.open();
// Check if completed
const isCompleted = completable.isCompleted(); // false
// Notify of successful completion
completable.notify({
state: 'SUCCEEDED',
value: 'Task completed successfully'
});
// Get the completion
const completion = completable.getCompletion();
console.log(completion?.value); // 'Task completed successfully'
import { StateMachine, Rule } from '@jonloucks/concurrency-ts';
// Define states
type AppState = 'INITIAL' | 'RUNNING' | 'PAUSED' | 'STOPPED';
// Create state machine
const stateMachine: StateMachine<AppState> = concurrency.createStateMachine({
initialValue: 'INITIAL',
states: ['INITIAL', 'RUNNING', 'PAUSED', 'STOPPED'],
getStateRules: (state: AppState): Rule<AppState>[] => {
switch (state) {
case 'INITIAL':
return [{ event: 'start', allowedStates: ['RUNNING'] }];
case 'RUNNING':
return [
{ event: 'pause', allowedStates: ['PAUSED'] },
{ event: 'stop', allowedStates: ['STOPPED'] }
];
case 'PAUSED':
return [
{ event: 'resume', allowedStates: ['RUNNING'] },
{ event: 'stop', allowedStates: ['STOPPED'] }
];
case 'STOPPED':
return []; // Terminal state
default:
return [];
}
}
});
using closeStateMachine = stateMachine.open();
// Get current state
console.log(stateMachine.getState()); // 'INITIAL'
// Transition to a new state
stateMachine.setState('start', 'RUNNING');
console.log(stateMachine.getState()); // 'RUNNING'
// Check if transition is allowed
const canPause = stateMachine.isTransitionAllowed('pause', 'PAUSED'); // true
const canStop = stateMachine.isTransitionAllowed('invalid', 'STOPPED'); // false
import { createConcurrency, Concurrency, ConcurrencyConfig } from '@jonloucks/concurrency-ts';
import { Contracts } from '@jonloucks/contracts-ts';
// Basic configuration
const concurrency1: Concurrency = createConcurrency({});
// With custom contracts
const contracts: Contracts = /* ... */;
const concurrency2: Concurrency = createConcurrency({
contracts: contracts
});
ConcurrencyConfig:
contracts?: Contracts - Optional contracts instance for validation
import { Waitable, WaitableConfig } from '@jonloucks/concurrency-ts';
// Without initial value
const waitable1: Waitable<string> = concurrency.createWaitable<string>({});
// With initial value
const waitable2: Waitable<number> = concurrency.createWaitable<number>({
initialValue: 0
});
// With custom contracts
const waitable3: Waitable<boolean> = concurrency.createWaitable<boolean>({
contracts: contracts,
initialValue: false
});
WaitableConfig:
contracts?: Contracts - Optional contracts for validation
initialValue?: T - Optional initial value
// Supply (get) the current value
const currentValue = waitable.supply();
// Consume (set) a new value
waitable.consume(newValue);
// Get the notification interface
const notifier = waitable.notify();
// Wait for a condition to be met (via notification)
notifier.wait((value) => value > 10);
import { Completable, CompletableConfig } from '@jonloucks/concurrency-ts';
// Basic completable
const completable1: Completable<string> = concurrency.createCompletable({});
// With initial value
const completable2: Completable<number> = concurrency.createCompletable({
initialValue: 0
});
CompletableConfig:
contracts?: Contracts - Optional contracts for validation
initialValue?: T - Optional initial value
// Check if completed
if (!completable.isCompleted()) {
// Notify of completion
completable.notify({
state: 'SUCCEEDED',
value: result
});
}
// Get the completion
const completion = completable.getCompletion();
if (completion) {
console.log('State:', completion.state);
console.log('Value:', completion.value);
}
// Observe state changes
const stateNotifier = completable.notifyState();
// Observe value changes
const valueNotifier = completable.notifyValue();
// Register completion callback
completable.onCompletion((completion) => {
if (completion.state === 'SUCCEEDED') {
console.log('Success:', completion.value);
} else if (completion.state === 'FAILED') {
console.error('Failed:', completion.value);
}
});
import { StateMachine, StateMachineConfig, Rule } from '@jonloucks/concurrency-ts';
type MyState = 'STATE_A' | 'STATE_B' | 'STATE_C';
const stateMachine: StateMachine<MyState> = concurrency.createStateMachine({
initialValue: 'STATE_A',
states: ['STATE_A', 'STATE_B', 'STATE_C'],
getStateRules: (state: MyState): Rule<MyState>[] => {
// Define allowed transitions for each state
if (state === 'STATE_A') {
return [{ event: 'move', allowedStates: ['STATE_B'] }];
} else if (state === 'STATE_B') {
return [
{ event: 'forward', allowedStates: ['STATE_C'] },
{ event: 'back', allowedStates: ['STATE_A'] }
];
}
return [];
}
});
StateMachineConfig:
contracts?: Contracts - Optional contracts for validation
initialValue: T - Required initial state (must be in states array)
states: Array<T> - Array of all valid states
getStateRules?: (state: T) => Rule<T>[] - Function to get transition rules for a state
Rule:
event: string - The event name that triggers the transition
allowedStates: T[] - Array of states this transition can move to
// Get current state
const currentState = stateMachine.getState();
// Check if a state exists
const hasState = stateMachine.hasState('STATE_B'); // true
// Check if transition is allowed
const isAllowed = stateMachine.isTransitionAllowed('move', 'STATE_B');
// Set state (must be allowed transition)
const changed = stateMachine.setState('move', 'STATE_B');
// Execute a transition with callback
const result = stateMachine.transition({
event: 'forward',
state: 'STATE_C',
execute: (fromState, toState) => {
console.log(`Transitioning from ${fromState} to ${toState}`);
return { success: true };
}
});
// Observe state changes
const stateNotifier = stateMachine.notify();
// Check if state machine has reached a final state
const isComplete = stateMachine.isCompleted();
The completeLater method ensures guaranteed execution of completion callbacks, even if the delegate fails to take ownership:
import { OnCompletion } from '@jonloucks/concurrency-ts';
// Create an OnCompletion callback
const onCompletion: OnCompletion<string> = {
onCompletion: (completion) => {
console.log('Completion received:', completion);
}
};
// Delegate that will receive the callback
const delegate = (callback: OnCompletion<string>) => {
// Process the callback
// If this throws or fails, the callback still gets a FAILED completion
callback.onCompletion({ state: 'SUCCEEDED', value: 'Done' });
};
// Guaranteed execution
concurrency.completeLater(onCompletion, delegate);
Key guarantees:
The completeNow method provides guaranteed execution for synchronous completion blocks:
import { OnCompletion } from '@jonloucks/concurrency-ts';
const onCompletion: OnCompletion<number> = {
onCompletion: (completion) => {
console.log('Completion:', completion);
}
};
// Execute a block that produces a completion value
const result = concurrency.completeNow(onCompletion, () => {
// Your synchronous work here
const value = performCalculation();
if (value < 0) {
throw new Error('Invalid value');
}
return value;
});
// Result contains the value if successful
console.log('Result:', result);
Key guarantees:
Advanced state machine usage with transition callbacks:
import { Transition } from '@jonloucks/concurrency-ts';
type WorkflowState = 'PENDING' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED';
const workflow: StateMachine<WorkflowState> = concurrency.createStateMachine({
initialValue: 'PENDING',
states: ['PENDING', 'IN_PROGRESS', 'COMPLETED', 'FAILED'],
getStateRules: (state) => {
switch (state) {
case 'PENDING':
return [{ event: 'start', allowedStates: ['IN_PROGRESS'] }];
case 'IN_PROGRESS':
return [
{ event: 'complete', allowedStates: ['COMPLETED'] },
{ event: 'fail', allowedStates: ['FAILED'] }
];
default:
return [];
}
}
});
using closeWorkflow = workflow.open();
// Execute transition with side effects
const transitionData = workflow.transition<{ timestamp: number }>({
event: 'start',
state: 'IN_PROGRESS',
execute: (fromState, toState) => {
console.log(`Starting workflow: ${fromState} -> ${toState}`);
// Perform side effects
notifyMonitoring('workflow_started');
// Return transition metadata
return {
timestamp: Date.now()
};
}
});
console.log('Workflow started at:', transitionData?.timestamp);
Use the using keyword or manually call close() to ensure proper cleanup:
// Recommended: using keyword (automatic cleanup)
{
using closeConcurrency = concurrency.open();
// Use concurrency...
} // Automatically closed
// Alternative: manual cleanup
const closeConcurrency = concurrency.open();
try {
// Use concurrency...
} finally {
closeConcurrency.close();
}
Leverage TypeScript’s type system for compile-time safety:
// Define explicit types for your states
type ConnectionState = 'DISCONNECTED' | 'CONNECTING' | 'CONNECTED' | 'ERROR';
const connection: StateMachine<ConnectionState> = concurrency.createStateMachine({
initialValue: 'DISCONNECTED',
states: ['DISCONNECTED', 'CONNECTING', 'CONNECTED', 'ERROR'],
// TypeScript ensures all states are ConnectionState
getStateRules: (state: ConnectionState) => { /* ... */ }
});
Use completeLater and completeNow patterns for guaranteed completion:
// BAD: Completion might not be called if error occurs
try {
const result = await doAsyncWork();
onCompletion.onCompletion({ state: 'SUCCEEDED', value: result });
} catch (error) {
// onCompletion might not be called!
}
// GOOD: Guaranteed completion
concurrency.completeLater(onCompletion, async (callback) => {
const result = await doAsyncWork();
callback.onCompletion({ state: 'SUCCEEDED', value: result });
});
Design state machines with clear terminal states:
type TaskState = 'CREATED' | 'RUNNING' | 'COMPLETED' | 'FAILED';
// Terminal states should not have outgoing transitions
getStateRules: (state: TaskState) => {
switch (state) {
case 'CREATED':
return [{ event: 'start', allowedStates: ['RUNNING'] }];
case 'RUNNING':
return [
{ event: 'succeed', allowedStates: ['COMPLETED'] },
{ event: 'fail', allowedStates: ['FAILED'] }
];
case 'COMPLETED':
case 'FAILED':
return []; // Terminal states - no outgoing transitions
}
}
Handle errors appropriately in completion callbacks:
// Pass an explicit (empty) config object, matching every other createCompletable call
// in these examples, so the snippet does not depend on the config parameter being optional.
const completable = concurrency.createCompletable<Result>({});
// Handle each terminal completion state explicitly.
completable.onCompletion((completion) => {
  if (completion.state === 'SUCCEEDED') {
    processSuccess(completion.value);
  } else if (completion.state === 'FAILED') {
    logError('Task failed', completion.value);
  } else if (completion.state === 'CANCELED') {
    logInfo('Task was canceled');
  }
});
Use meaningful conditions when waiting on waitable values:
const counter: Waitable<number> = concurrency.createWaitable({ initialValue: 0 });
// Wait for specific condition
const notifier = counter.notify();
notifier.wait((value) => value >= 10);
// Update value from another part of the code
counter.consume(counter.supply() + 1);
Track the progress of asynchronous tasks:
import { createConcurrency, Completable } from '@jonloucks/concurrency-ts';
const concurrency = createConcurrency({});
using closeConcurrency = concurrency.open();
/**
 * Runs `task` while tracking it with a Completable, logging the completion
 * state under `taskName`.
 *
 * @param taskName - Label used in the log line emitted on completion.
 * @param task - Asynchronous work to run.
 * @returns The value the task resolves with.
 */
async function trackTask<T>(
  taskName: string,
  task: () => Promise<T>
): Promise<T> {
  const completable: Completable<T> = concurrency.createCompletable({});
  using closeCompletable = completable.open();
  completable.onCompletion((completion) => {
    console.log(`Task ${taskName}:`, completion.state);
  });
  // Await here rather than returning the bare promise: a `using` resource is
  // disposed as soon as the function body exits, so without the await the
  // completable would be closed before the task has settled.
  // NOTE(review): the documentation describes completeNow as targeting
  // synchronous blocks - confirm the completion it reports reflects the
  // awaited task rather than the pending promise.
  return await concurrency.completeNow(completable, () => task());
}
// Use it
const result = await trackTask('fetchData', async () => {
const response = await fetch('https://api.example.com/data');
return response.json();
});
Model a network connection with a state machine:
import { createConcurrency, StateMachine, Rule } from '@jonloucks/concurrency-ts';
type ConnectionState =
| 'DISCONNECTED'
| 'CONNECTING'
| 'CONNECTED'
| 'DISCONNECTING'
| 'ERROR';
const concurrency = createConcurrency({});
using closeConcurrency = concurrency.open();
const connectionStateMachine: StateMachine<ConnectionState> =
concurrency.createStateMachine({
initialValue: 'DISCONNECTED',
states: ['DISCONNECTED', 'CONNECTING', 'CONNECTED', 'DISCONNECTING', 'ERROR'],
getStateRules: (state: ConnectionState): Rule<ConnectionState>[] => {
switch (state) {
case 'DISCONNECTED':
return [{ event: 'connect', allowedStates: ['CONNECTING'] }];
case 'CONNECTING':
return [
{ event: 'connected', allowedStates: ['CONNECTED'] },
{ event: 'error', allowedStates: ['ERROR'] },
{ event: 'cancel', allowedStates: ['DISCONNECTED'] }
];
case 'CONNECTED':
return [
{ event: 'disconnect', allowedStates: ['DISCONNECTING'] },
{ event: 'error', allowedStates: ['ERROR'] }
];
case 'DISCONNECTING':
return [{ event: 'disconnected', allowedStates: ['DISCONNECTED'] }];
case 'ERROR':
return [{ event: 'reset', allowedStates: ['DISCONNECTED'] }];
default:
return [];
}
}
});
using closeConnection = connectionStateMachine.open();
// Use the state machine
/**
 * Attempts a connection, moving the connection state machine from
 * DISCONNECTED to CONNECTING and then to CONNECTED, or to ERROR if the
 * attempt throws. Does nothing unless the current state is DISCONNECTED.
 */
async function connect() {
  // Only a disconnected machine may start connecting.
  if (connectionStateMachine.getState() !== 'DISCONNECTED') {
    return;
  }

  connectionStateMachine.setState('connect', 'CONNECTING');
  try {
    await performConnection();
    connectionStateMachine.setState('connected', 'CONNECTED');
  } catch {
    // The failure itself is not inspected; it is only reflected in the state.
    connectionStateMachine.setState('error', 'ERROR');
  }
}
Use waitables for producer-consumer patterns:
import { createConcurrency, Waitable } from '@jonloucks/concurrency-ts';
/** Value stored in the waitable for this example: a wrapper around the list of queued items. */
interface Queue<T> {
/** Queued items; the producer appends with push() and the consumer removes with shift(). */
items: T[];
}
const concurrency = createConcurrency({});
using closeConcurrency = concurrency.open();
const queue: Waitable<Queue<string>> = concurrency.createWaitable({
initialValue: { items: [] }
});
using closeQueue = queue.open();
// Producer
/**
 * Appends `item` to the queue held by the waitable and hands the queue back
 * to the waitable via consume().
 */
async function produce(item: string) {
  const pending = queue.supply();
  pending.items.push(item);
  queue.consume(pending);
}
// Consumer
/**
 * Waits until the queue holds at least one item, then removes and returns
 * the first one.
 *
 * @returns The removed item, or undefined if the queue turned out to be empty.
 */
async function consume(): Promise<string | undefined> {
  // Wait until the queue is non-empty.
  await queue.notify().wait((pending) => pending.items.length > 0);

  const pending = queue.supply();
  const head = pending.items.shift();
  queue.consume(pending);
  return head;
}
// Usage
await produce('message1');
await produce('message2');
const item1 = await consume(); // 'message1'
const item2 = await consume(); // 'message2'
Combine multiple primitives for complex workflows:
import {
createConcurrency,
Completable,
StateMachine
} from '@jonloucks/concurrency-ts';
import { AutoClose } from '@jonloucks/contracts-ts';
type WorkflowStep = 'INIT' | 'STEP1' | 'STEP2' | 'STEP3' | 'DONE';
/**
 * Example workflow that drives a state machine through
 * INIT -> STEP1 -> STEP2 -> STEP3 -> DONE and records one Completable per
 * executed step.
 *
 * The constructor opens the concurrency instance and the state machine;
 * call close() when finished to release both.
 */
class Workflow {
  private readonly concurrency = createConcurrency({});
  private readonly closeConcurrency: AutoClose;
  private readonly stateMachine: StateMachine<WorkflowStep>;
  private readonly closeStateMachine: AutoClose;
  private readonly stepCompletables = new Map<WorkflowStep, Completable<unknown>>();

  constructor() {
    this.closeConcurrency = this.concurrency.open();
    this.stateMachine = this.concurrency.createStateMachine({
      initialValue: 'INIT',
      states: ['INIT', 'STEP1', 'STEP2', 'STEP3', 'DONE'],
      getStateRules: (state) => {
        switch (state) {
          case 'INIT':
            return [{ event: 'start', allowedStates: ['STEP1'] }];
          case 'STEP1':
            return [{ event: 'next', allowedStates: ['STEP2'] }];
          case 'STEP2':
            return [{ event: 'next', allowedStates: ['STEP3'] }];
          case 'STEP3':
            return [{ event: 'complete', allowedStates: ['DONE'] }];
          default:
            // DONE is terminal: no outgoing transitions.
            return [];
        }
      }
    });
    // Open the state machine before use, as the standalone state machine
    // examples do; it is released again in close().
    // NOTE(review): assumes StateMachine.open() returns an AutoClose like
    // Concurrency.open() - confirm.
    this.closeStateMachine = this.stateMachine.open();
  }

  /**
   * Runs `work` for the given step, tracking it with a fresh Completable
   * that is remembered under `step`.
   *
   * @param step - Workflow step the work belongs to.
   * @param work - Asynchronous work to run for the step.
   * @returns Whatever `work` resolves with.
   */
  async executeStep(step: WorkflowStep, work: () => Promise<unknown>): Promise<unknown> {
    const completable = this.concurrency.createCompletable<unknown>({});
    this.stepCompletables.set(step, completable);
    using closeCompletable = completable.open();
    // Await before leaving the scope: a `using` resource is disposed when the
    // function body exits, so returning the bare promise would close the
    // completable while the step is still running.
    return await this.concurrency.completeNow(completable, () => work());
  }

  /** Executes the three steps in order, advancing the state machine around each one. */
  async run(): Promise<void> {
    this.stateMachine.setState('start', 'STEP1');
    await this.executeStep('STEP1', async () => {
      console.log('Executing Step 1');
      await delay(1000);
    });
    this.stateMachine.setState('next', 'STEP2');
    await this.executeStep('STEP2', async () => {
      console.log('Executing Step 2');
      await delay(1000);
    });
    this.stateMachine.setState('next', 'STEP3');
    await this.executeStep('STEP3', async () => {
      console.log('Executing Step 3');
      await delay(1000);
    });
    this.stateMachine.setState('complete', 'DONE');
    console.log('Workflow completed!');
  }

  /** Releases the resources opened by the constructor, in reverse order of opening. */
  close(): void {
    try {
      this.closeStateMachine.close();
    } finally {
      // Always close the concurrency instance, even if closing the state machine throws.
      this.closeConcurrency.close();
    }
  }
}
/**
 * Returns a promise that resolves (with no value) once a timer of `ms`
 * milliseconds has fired.
 *
 * @param ms - Timer duration in milliseconds.
 */
function delay(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
// Usage
const workflow = new Workflow();
try {
await workflow.run();
} finally {
workflow.close();
}
For questions, issues, or feature requests, please:
This project is licensed under the MIT License - see the LICENSE file for details.