concurrency - Typescript Promise Priority Queue Race Condition - Stack Overflow

I'm implementing a priority queue for asynchronous tasks in TypeScript (using React for the UI). The queue is continuously filled with tasks, and there’s a button in the UI that can either trigger a priority change or add a new task to the queue. Given that JavaScript runs on a single thread (with concurrency but no parallelism), I’m wondering if I need to protect the queue from potential race conditions. Specifically:

  1. If the queue is being modified (tasks added or priorities changed) by the UI, and at the same time a context switch occurs after finding the index to change but before sorting, could it result in a priority being changed for the wrong task?
  2. Should I use a mutex or some other synchronization method to ensure thread safety?

I understand that the task execution is single-threaded. I’ve also considered using the p-queue package, but I’m not sure if it provides protection against race conditions. Any insight would be greatly appreciated, as I’m still learning about JavaScript/TypeScript concurrency models.

My implementation:

import { Subject, Subscription, from, of, Observer, Observable } from "rxjs";
import { catchError, map, mergeMap, tap } from "rxjs/operators";

export type PriorityQueueItem<T> = {
    readonly id: string;
    priority: number;
    task: () => Promise<T>;
};

export type PriorityQueue<T> = {
    addTask: (item: PriorityQueueItem<T>) => void;
    addTasksGroup: (items: PriorityQueueItem<T>[]) => void;
    removeTask: (id: string) => boolean;
    terminate: () => boolean;
    subscribe: (subscriber: Subscriber<T>) => () => void;
    updateTaskPriority: (id: string, newPriority: number) => boolean;
    start: () => void;
};

export type Subscriber<T> = {
    onSuccessfulTask: (result: T) => void;
    onFailedTask: (error: Error) => void;
    onAllTasksCompleted?: () => void;
}

type TaskResult<T> = {
    error?: Error;
    isError?: boolean;
    data?: T;
}

export const createPriorityQueue = <T>(
    initialTasks: PriorityQueueItem<T>[] = [],
): PriorityQueue<T> => {

    let queue: PriorityQueueItem<T>[] = [...initialTasks];
    let isProcessing = false;
    let currentTask: PriorityQueueItem<T> | null = null;

    const subscribers: Set<Observer<T>> = new Set();
    const taskSubject: Subject<PriorityQueueItem<T>> = new Subject();
    let subscription: Subscription | null = null;

    const sortQueue = () => {
        console.debug("Sorting queue");
        queue.sort((a, b) => b.priority - a.priority);
    }

    const processNextTask = () => {
        console.debug("Processing next task");
        if (!isProcessing || queue.length === 0) {
            console.debug("No tasks to process or processing is paused");
            if (queue.length === 0) {
                console.debug("Queue is empty, completing subscribers");
                subscribers.forEach(subscriber => subscriber.complete()); // do we want this?
            }
            return;
        }
        sortQueue();
        const nextTask = queue.shift();
        if (nextTask) {
            console.debug("Next task found:", nextTask.id);
            currentTask = nextTask;
            taskSubject.next(nextTask);
        }
    }

    const processTask = (task: PriorityQueueItem<T>): Observable<TaskResult<T>> => {
        console.debug("Processing task:", task.id);
        return from(task.task()).pipe(
            map(data => ({ data })),
            catchError(error => of({ error, isError: true }))
        );
    };

    const initializeSubscription = (): void => {
        if (subscription) {
            console.debug("Subscription already initialized");
            return;
        }

        console.debug("Initializing subscription");
        subscription = taskSubject.pipe(
            mergeMap(task =>
                processTask(task),
                1, // Concurrency = 1
            ),
            tap(() => {
                console.debug("Task completed, clearing current task");
                currentTask = null;  // Clear current task reference
                processNextTask();
            }),
        ).subscribe(
            result => {
                if (result && typeof result === 'object' && 'isError' in result) {
                    console.debug("Task failed with error:", result.error);
                    subscribers.forEach(subscriber => subscriber.error(result.error));
                }
                else {
                    console.debug("Task succeeded with result:", result);
                    subscribers.forEach(subscriber => subscriber.next(result.data as T)); // unwrap the TaskResult
                }
            }
        );
    }

    return {
        addTask: (task: PriorityQueueItem<T>) => {
            console.debug("Adding task:", task.id);
            queue.push({ ...task }); // Create copies
            if (isProcessing) {
                processNextTask();
            }
        },
        addTasksGroup: (tasks: PriorityQueueItem<T>[]) => {
            console.debug("Adding tasks group");
            queue.push(...tasks.map(task => (
                { ...task }
            ))); // Create copies
            if (isProcessing) {
                processNextTask();
            }
        },
        removeTask: (id: string) => {
            console.debug("Removing task:", id);
            const index = queue.findIndex(task => task.id === id);
            if (index === -1) {
                console.debug("Task not found:", id);
                return false;
            }
            queue.splice(index, 1);
            return true;
        },
        terminate: () => {
            console.debug("Terminating queue");
            isProcessing = false;
            if (subscription) {
                subscription.unsubscribe();
                subscription = null;
            }
            queue = [];
            return true;
        },
        subscribe: (subscriber: Subscriber<T>) => {
            console.debug("Subscribing");
            const subscriberObserver: Observer<T> = {
                next: value => subscriber.onSuccessfulTask(value),
                error: error => subscriber.onFailedTask(error),
                complete: () => subscriber.onAllTasksCompleted?.(), // optional callback; no-op when absent
            };

            subscribers.add(subscriberObserver);
            initializeSubscription();

            return () => {
                console.debug("Unsubscribing");
                subscribers.delete(subscriberObserver);
            }
        },
        updateTaskPriority: (id: string, newPriority: number) => {
            console.debug("Updating task priority:", id, newPriority);
            const index = queue.findIndex(task => task.id === id);
            if (index === -1) {
                console.debug("Task not found:", id);
                return false;
            }
            queue[index].priority = newPriority;
            return true;
        },
        start: () => {
            console.debug("Starting queue processing");
            isProcessing = true;
            if (!currentTask) {
                processNextTask();
            }
        },
    }
}

asked Jan 19 at 8:25 by Yael

1 Answer

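As you have already mentioned, JavaScript on the UI thread is single-threaded. Each event handler or callback runs to completion before the next one starts, so no context switch can occur between finding the index and changing the priority, and the priority cannot end up on the wrong task. You can allocate additional worker threads, but that is another story entirely, and even in that case you don't need a mutex or any other synchronization mechanism, because workers exchange messages rather than sharing the queue itself.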
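To make the run-to-completion point concrete, here is a minimal sketch. The names (`QueueItem`, `updatePriority`, `updateAfterAwait`) are made up for illustration and are not taken from the question's code. It schedules competing work first, then performs the same find-then-mutate step synchronously. It also shows the one place where care is still needed: anything looked up before an `await` may be stale after it, so the lookup is done after the `await` instead.

```typescript
type QueueItem = { id: string; priority: number };

const queue: QueueItem[] = [
    { id: "a", priority: 1 },
    { id: "b", priority: 2 },
    { id: "c", priority: 3 },
];
const log: string[] = [];

// Competing work scheduled BEFORE the update. It is only queued here;
// it cannot start until the currently running synchronous code finishes.
Promise.resolve().then(() => {
    queue.sort((x, y) => y.priority - x.priority);
    log.push("microtask: sorted");
});

// Synchronous find-then-mutate, the same shape as updateTaskPriority.
const updatePriority = (id: string, newPriority: number): boolean => {
    const index = queue.findIndex(item => item.id === id);
    if (index === -1) return false;
    // No other callback can run between findIndex and this assignment.
    queue[index].priority = newPriority;
    log.push(`sync: updated ${id}`);
    return true;
};

const updated = updatePriority("a", 10);

// The guarantee ends at an `await`: other callbacks may run while this
// function is suspended, so the item is looked up AFTER the await.
const updateAfterAwait = async (id: string, newPriority: number): Promise<boolean> => {
    await Promise.resolve(); // stands in for any real asynchronous step
    const item = queue.find(candidate => candidate.id === id);
    if (!item) return false;
    item.priority = newPriority;
    return true;
};

updateAfterAwait("b", 20).then(found => log.push(`async: updated b = ${found}`));
```

The sort callback was queued first, yet the synchronous update is always logged before it. Holding a reference to the item (or re-finding it by id) rather than holding an index is what keeps the asynchronous variant safe.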

The p-queue library you mentioned serves a different purpose. It provides rate-limiting capabilities, e.g. capping the number of HTTP requests running concurrently, or preventing oversaturation of the event loop on the UI thread, which might cause it to block.
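As a rough illustration of what "limiting concurrently running requests" means, here is a hand-rolled sketch. `createLimiter`, `activeCount`, `pendingCount` and `fakeRequest` are hypothetical names invented for this example. This is not p-queue's API or implementation, only the general idea behind such a limiter.

```typescript
// Hypothetical helper for illustration only; this is NOT p-queue's API.
const createLimiter = (concurrency: number) => {
    let active = 0;
    const waiting: Array<() => void> = [];

    // Start the next waiting task if a slot is free.
    const startNext = (): void => {
        if (active >= concurrency) return;
        const run = waiting.shift();
        if (run) run();
    };

    const add = <T>(task: () => Promise<T>): Promise<T> =>
        new Promise<T>((resolve, reject) => {
            waiting.push(() => {
                active++; // the slot is taken synchronously
                Promise.resolve()
                    .then(task) // also turns a synchronous throw into a rejection
                    .then(resolve, reject)
                    .then(() => {
                        active--; // free the slot, then hand it to the next task
                        startNext();
                    });
            });
            startNext();
        });

    return { add, activeCount: () => active, pendingCount: () => waiting.length };
};

// Usage: three fake requests, at most two in flight at a time.
const limiter = createLimiter(2);
const fakeRequest = (label: string) => () =>
    new Promise<string>(resolve => setTimeout(() => resolve(label), 10));

const finished: string[] = [];
for (const label of ["a", "b", "c"]) {
    limiter.add(fakeRequest(label)).then(value => finished.push(value));
}
```

Note that nothing here is a lock. The counter and the waiting list are only ever touched from synchronous code on one thread, which is why a limiter like this needs no mutex either.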

In my opinion your design is already slightly overengineered and you shouldn't introduce new entities to it for no reason.
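For comparison, this is one possible simpler shape, assuming RxJS is not otherwise required by the project. It is a sketch, not a drop-in replacement for the code in the question, and every name in it (`SimpleTask`, `createSimpleQueue`, `drain`, `makeTask`) is hypothetical. A single `async` loop dispatches one task at a time and re-sorts right before each dispatch, so priority changes made while a task is running are picked up.

```typescript
type SimpleTask<T> = { readonly id: string; priority: number; task: () => Promise<T> };

const createSimpleQueue = <T>(
    onSuccess: (id: string, result: T) => void,
    onFailure: (id: string, error: unknown) => void,
) => {
    const pending: SimpleTask<T>[] = [];
    let draining = false;

    const drain = async (): Promise<void> => {
        if (draining) return; // only one dispatch loop at a time
        draining = true;
        while (pending.length > 0) {
            // Sort at dispatch time so late priority changes are honored.
            pending.sort((a, b) => b.priority - a.priority);
            const next = pending.shift()!;
            try {
                const result = await next.task();
                onSuccess(next.id, result);
            } catch (error) {
                // Also reached if onSuccess itself throws.
                onFailure(next.id, error);
            }
        }
        draining = false;
    };

    return {
        add: (item: SimpleTask<T>): void => {
            pending.push({ ...item }); // copy, as in the original code
            void drain();
        },
        updatePriority: (id: string, newPriority: number): boolean => {
            const item = pending.find(candidate => candidate.id === id);
            if (!item) return false; // unknown id, or already dispatched
            item.priority = newPriority;
            return true;
        },
        size: (): number => pending.length,
    };
};

// Usage: "low" is dispatched immediately; "mid" is bumped while it runs.
const started: string[] = [];
const makeTask = (id: string, priority: number): SimpleTask<string> => ({
    id,
    priority,
    task: () => {
        started.push(id);
        return Promise.resolve(id);
    },
});

const simpleQueue = createSimpleQueue<string>(() => {}, () => {});
simpleQueue.add(makeTask("low", 1));
simpleQueue.add(makeTask("mid", 2));
simpleQueue.add(makeTask("high", 3));
const bumped = simpleQueue.updatePriority("mid", 10);
```

With this usage the tasks start in the order low, mid, high: "low" was already running when the others arrived, and "mid" overtakes "high" because its priority was raised before the next dispatch.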
