I'm implementing a priority queue for asynchronous tasks in TypeScript (using React for the UI). The queue is continuously filled with tasks, and there’s a button in the UI that can either trigger a priority change or add a new task to the queue. Given that JavaScript runs on a single thread (with concurrency but no parallelism), I’m wondering if I need to protect the queue from potential race conditions. Specifically:
- If the queue is being modified (tasks added or priorities changed) by the UI, and at the same time a context switch occurs after finding the index to change but before sorting, could it result in a priority being changed for the wrong task?
- Should I use a mutex or some other synchronization method to ensure thread safety?
I understand that task execution is single-threaded. I’ve also considered using the p-queue package, but I’m not sure whether it provides protection against race conditions. Any insight would be greatly appreciated, as I’m still learning about the JavaScript/TypeScript concurrency model.
My implementation:
// Import from the public RxJS entry points rather than "rxjs/internal/*".
import { Subject, Subscription, from, of, Observer, Observable } from "rxjs";
import { catchError, map, mergeMap, tap } from "rxjs/operators";
export type PriorityQueueItem<T> = {
readonly id: string;
priority: number;
task: () => Promise<T>;
};
export type PriorityQueue<T> = {
addTask: (item: PriorityQueueItem<T>) => void;
addTasksGroup: (items: PriorityQueueItem<T>[]) => void;
removeTask: (id: string) => boolean;
terminate: () => boolean;
subscribe: (subscriber: Subscriber<T>) => () => void;
updateTaskPriority: (id: string, newPriority: number) => boolean;
start: () => void;
};
export type Subscriber<T> = {
onSuccessfulTask: (result: T) => void;
onFailedTask: (error: Error) => void;
onAllTasksCompleted?: () => void;
}
type TaskResult<T> = {
error?: Error;
isError?: boolean;
data?: T;
}
export const createPriorityQueue = <T>(
initialTasks: PriorityQueueItem<T>[] = [],
): PriorityQueue<T> => {
let queue: PriorityQueueItem<T>[] = [...initialTasks];
let isProcessing = false;
let currentTask: PriorityQueueItem<T> | null = null;
const subscribers: Set<Observer<T>> = new Set();
const taskSubject: Subject<PriorityQueueItem<T>> = new Subject();
let subscription: Subscription | null = null;
const sortQueue = () => {
console.debug("Sorting queue");
queue.sort((a, b) => b.priority - a.priority);
}
const processNextTask = () => {
console.debug("Processing next task");
if (!isProcessing || queue.length === 0) {
console.debug("No tasks to process or processing is paused");
if (queue.length === 0) {
console.debug("Queue is empty, completing subscribers");
subscribers.forEach(subscriber => subscriber.complete()); // do we want this?
}
return;
}
sortQueue();
const nextTask = queue.shift();
if (nextTask) {
console.debug("Next task found:", nextTask.id);
currentTask = nextTask;
taskSubject.next(nextTask);
}
}
const processTask = (task: PriorityQueueItem<T>): Observable<TaskResult<T>> => {
console.debug("Processing task:", task.id);
return from(task.task()).pipe(
map(data => ({ data })),
catchError(error => of({ error, isError: true }))
);
};
const initializeSubscription = (): void => {
if (subscription) {
console.debug("Subscription already initialized");
return;
}
console.debug("Initializing subscription");
subscription = taskSubject.pipe(
mergeMap(task =>
processTask(task),
1, // Concurrency = 1
),
tap(() => {
console.debug("Task completed, clearing current task");
currentTask = null; // Clear current task reference
processNextTask();
}),
).subscribe(
result => {
if (result && typeof result === 'object' && 'isError' in result) {
console.debug("Task failed with error:", result.error);
subscribers.forEach(subscriber => subscriber.error(result.error));
}
else {
console.debug("Task succeeded with result:", result);
subscribers.forEach(subscriber => subscriber.next(result.data as T)); // forward the task's value, not the wrapper object
}
}
);
}
return {
addTask: (task: PriorityQueueItem<T>) => {
console.debug("Adding task:", task.id);
queue.push({ ...task }); // Create copies
if (isProcessing) {
processNextTask();
}
},
addTasksGroup: (tasks: PriorityQueueItem<T>[]) => {
console.debug("Adding tasks group");
queue.push(...tasks.map(task => (
{ ...task }
))); // Create copies
if (isProcessing) {
processNextTask();
}
},
removeTask: (id: string) => {
console.debug("Removing task:", id);
const index = queue.findIndex(task => task.id === id);
if (index === -1) {
console.debug("Task not found:", id);
return false;
}
queue.splice(index, 1);
return true;
},
terminate: () => {
console.debug("Terminating queue");
isProcessing = false;
if (subscription) {
subscription.unsubscribe();
subscription = null;
}
queue = [];
return true;
},
subscribe: (subscriber: Subscriber<T>) => {
console.debug("Subscribing");
const subscriberObserver: Observer<T> = {
next: value => subscriber.onSuccessfulTask(value),
error: error => subscriber.onFailedTask(error),
complete: () => subscriber.onAllTasksCompleted?.(), // optional callback; nothing to do when it is absent
};
subscribers.add(subscriberObserver);
initializeSubscription();
return () => {
console.debug("Unsubscribing");
subscribers.delete(subscriberObserver);
}
},
updateTaskPriority: (id: string, newPriority: number) => {
console.debug("Updating task priority:", id, newPriority);
const index = queue.findIndex(task => task.id === id);
if (index === -1) {
console.debug("Task not found:", id);
return false;
}
queue[index].priority = newPriority;
return true;
},
start: () => {
console.debug("Starting queue processing");
isProcessing = true;
if (!currentTask) {
processNextTask();
}
},
}
}
asked Jan 19 at 8:25 by Yael
1 Answer
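As you have already mentioned, JavaScript in the UI is single-threaded. Each synchronous function runs to completion before any other callback is scheduled, so nothing can interleave between the `findIndex` call and the assignment in `updateTaskPriority`. You can allocate additional worker threads, but that is another story entirely, and even in that case you don't need a mutex or any other synchronization mechanism.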
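To make the run-to-completion point concrete, here is a minimal sketch. The names `Item`, `updatePrioritySync`, `updatePriorityStale` and `demoStaleIndex` are made up for illustration and are not part of the question's code. It assumes the only way to lose the "atomicity" of find-then-write is to put an `await` (or any other asynchronous step) between the two, which the question's `updateTaskPriority` does not do.

```typescript
type Item = { id: string; priority: number };

// Safe: the lookup and the write run in one synchronous turn of the event
// loop, so no click handler or promise callback can run in between.
function updatePrioritySync(queue: Item[], id: string, priority: number): boolean {
  const index = queue.findIndex(item => item.id === id);
  if (index === -1) return false;
  queue[index].priority = priority;
  return true;
}

// Hazard (hypothetical variant): an `await` between the lookup and the write
// yields to the event loop, so the saved index can go stale.
async function updatePriorityStale(queue: Item[], id: string, priority: number): Promise<boolean> {
  const index = queue.findIndex(item => item.id === id);
  if (index === -1) return false;
  await Promise.resolve(); // stand-in for any asynchronous step
  if (index >= queue.length) return false;
  queue[index].priority = priority; // may now point at a different item
  return true;
}

// Demonstration: the queue is re-sorted while the stale variant is suspended.
async function demoStaleIndex(): Promise<Item[]> {
  const queue: Item[] = [
    { id: "a", priority: 1 },
    { id: "b", priority: 2 },
  ];
  const pending = updatePriorityStale(queue, "a", 10);
  queue.sort((x, y) => y.priority - x.priority); // runs during the suspension
  await pending;
  return queue; // "b" received priority 10 instead of "a"
}
```

If an asynchronous step between lookup and write is ever needed, holding on to the item object (for example via `find`) instead of its index avoids the stale-index problem without any locking.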
The p-queue library you mentioned serves a different purpose. It provides rate-limiting capabilities, e.g. limiting the number of concurrently running HTTP requests, or preventing oversaturation of the event loop on the UI thread, which might block it.
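For a feel of what "limiting concurrently running requests" means in practice, here is a small stand-in written from scratch. It is not p-queue's API; `Job` and `createLimiter` are hypothetical names, and the sketch assumes `concurrency` is at least 1. It only shows the idea of capping the number of promises in flight.

```typescript
type Job<T> = () => Promise<T>;

// Hypothetical helper (not p-queue's API): allows at most `concurrency`
// jobs in flight at any moment. Assumes concurrency >= 1.
function createLimiter(concurrency: number) {
  let active = 0;
  const waiting: Array<() => void> = [];

  return async function run<T>(job: Job<T>): Promise<T> {
    if (active < concurrency) {
      active++;
    } else {
      // Wait until a finishing job hands its slot over to us.
      await new Promise<void>(resolve => waiting.push(resolve));
    }
    try {
      return await job();
    } finally {
      const next = waiting.shift();
      if (next) next(); // pass the slot on; `active` stays unchanged
      else active--; // nobody is waiting, so free the slot
    }
  };
}
```

Note that the slot is handed directly to the next waiter instead of being released and re-acquired. That is a logical ordering concern across `await` points, not a thread-safety one, and it is handled with plain bookkeeping rather than a mutex.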
In my opinion, your design is already slightly overengineered, and you shouldn't introduce new entities into it for no reason.
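As a rough idea of what a leaner version could look like, here is a sketch that uses only promises and an array. It is one possible simplification under my own assumptions, not a drop-in replacement for the question's API: `SimpleItem`, `createSimpleQueue`, `setPriority` and `whenIdle` are invented names, tasks run strictly one at a time, and results are reported through two plain callbacks.

```typescript
type SimpleItem<T> = { readonly id: string; priority: number; task: () => Promise<T> };

function createSimpleQueue<T>(
  onResult: (id: string, result: T) => void,
  onError: (id: string, error: unknown) => void,
) {
  const pending: SimpleItem<T>[] = [];
  let running = false;
  let idle: Promise<void> = Promise.resolve();

  // Runs tasks one at a time until the queue is empty.
  const drain = async (): Promise<void> => {
    running = true;
    try {
      while (pending.length > 0) {
        // Sort at dispatch time so late priority changes are honoured.
        pending.sort((a, b) => b.priority - a.priority);
        const item = pending.shift()!;
        try {
          onResult(item.id, await item.task());
        } catch (error) {
          onError(item.id, error);
        }
      }
    } finally {
      running = false;
    }
  };

  return {
    add(item: SimpleItem<T>): void {
      pending.push({ ...item }); // copy, as in the question's code
      if (!running) idle = drain();
    },
    setPriority(id: string, priority: number): boolean {
      const item = pending.find(candidate => candidate.id === id);
      if (!item) return false; // unknown id, or the task was already dispatched
      item.priority = priority;
      return true;
    },
    // Resolves once the current drain loop has emptied the queue.
    whenIdle: (): Promise<void> => idle,
  };
}
```

Because `add` and `setPriority` are fully synchronous, a button click can never observe the array in a half-updated state; a priority change simply takes effect the next time the loop picks a task.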