I have a firebase subscription in my angular app which fires multiple times. How can I achieve that the tasks are processed as a queue, so that each task runs synchronously, one at a time?
this.tasks.subscribe(async tasks => {
  for (const x of tasks) {
    await dolongtask(x); // has to be sync
    await removetask(x);
  }
});
The problem is that the subscribe callback fires again while the long task is still processing.
Comments:
- Make longtask return an observable that completes when the task is done and use concatMap. Be aware that this backpressure can result in a memory leak. – Ingo Bürk
- Why do I have to return an observable from longtask? – daniel
- Because concatMap expects observables. You can of course do it without observables, but then what's the point of using rxjs? – Ingo Bürk
- Can you paste some code? – daniel
- this.tasks.pipe(concatMap(tasks => processTasks(tasks))).subscribe(). Can't say more since you didn't say what your function does. – Ingo Bürk
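For reference, the comment's one-liner applied to the code in the question could look roughly like this (a sketch only; processTasks is a hypothetical helper, and dolongtask/removetask are assumed to return Promises as in the question):
import { from } from 'rxjs';
import { concatMap } from 'rxjs/operators';

// Hypothetical helper: works through one emitted array of tasks in order
const processTasks = async tasks => {
  for (const task of tasks) {
    await dolongtask(task);
    await removetask(task);
  }
};

// concatMap waits for the inner observable to complete before handling
// the next emission, so overlapping emissions line up as a queue
this.tasks.pipe(
  concatMap(tasks => from(processTasks(tasks)))
).subscribe();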
3 Answers
IMHO, I would try to leverage the power of rxjs since we're using it here already anyway, and avoid implementing a custom queuing concept as suggested by another answer (though you certainly can do that).
If we simplify the given case a bit, we just have some observable and want to perform a long-running procedure for each emission, in sequence. rxjs allows doing this by means of the concatMap operator essentially out of the box:
$data.pipe(concatMap(item => processItem(item))).subscribe();
This only assumes that processItem returns an observable. Since you used await, I assume your function(s) currently return Promises. These can be trivially converted into observables using from.
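For example, assuming dolongtask returns a Promise as in the question, wrapping it is a one-liner (the name doLongTask$ is just for illustration):
// from() turns the Promise returned by dolongtask into an observable
const doLongTask$ = task => from(dolongtask(task));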
The only detail left to look at from the OP is that the observable actually emits an array of items, and we want to perform the operation on each item of each emission. To do that, we just flatten the observable using mergeMap.
Let's put it all together. Note that if you take away preparing some stub data and logging, the actual implementation of this is only two lines of code (using mergeMap + concatMap).
const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;

// Stub for the long-running operation
function processTask(task) {
  console.log("Processing task: ", task);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log("Finished task: ", task);
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}

// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));

// Some stubbed data stream
const tasks$ = interval(250).pipe(
  take(9),
  bufferCount(3),
);

tasks$.pipe(
  tap(task => console.log("Received task: ", task)),
  // Flatten the tasks array since we want to work in sequence anyway
  mergeMap(tasks => tasks),
  // Process each task, but do so consecutively
  concatMap(task => processTask$(task)),
).subscribe(null, null, () => console.log("Done"));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
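If the removal step from the question should happen right after each task, it can be chained onto the promise before converting it (a sketch; removetask is the function from the question and is assumed to return a Promise):
// Hypothetical variant of processTask$: process a task, then remove it,
// as one sequential unit that concatMap can wait for
const processAndRemoveTask$ = task => from(
  processTask(task).then(() => removetask(task))
);
// used in the pipeline above as: concatMap(task => processAndRemoveTask$(task))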
I am making a couple of assumptions from the code you gave:
- other applications add tasks to the firebase db (asynchronously), and this code is implementing the task processor.
- your firebase query returns all unprocessed tasks (in a collection) and it emits the full list every time a new task is added.
- the query will drop a task only after removeTask() has been run.
If this is so, you need a deduping mechanism before the processor.
For the purpose of illustration, I've simulated the firebase query with a subject (renamed to tasksQuery$), and a sequence of firebase events is simulated at the bottom of the script. I hope it's not too confusing!
console.clear()
const { mergeMap, filter } = rxjs.operators;

// Simulate tasks query
const tasksQuery$ = new rxjs.Subject();

// Simulate dolongtask and removetask (assume both return promises that can be awaited)
const dolongtask = (task) => {
  console.log(`Processing: ${task.id}`);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log(`Processed: ${task.id}`);
      resolve('done')
    }, 1000);
  });
}
const removeTask = (task) => {
  console.log(`Removing: ${task.id}`);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log(`Removed: ${task.id}`);
      resolve('done')
    }, 200);
  });
}

// Set up queue (this block could be a class in Typescript)
let tasks = [];
const queue$ = new rxjs.Subject();
const addToQueue = (task) => {
  tasks = [...tasks, task];
  queue$.next(task);
}
const removeFromQueue = () => tasks = tasks.slice(1);
const queueContains = (task) => tasks.map(t => t.id).includes(task.id)

// Dedupe and enqueue
tasksQuery$.pipe(
  mergeMap(tasks => tasks),                    // flatten the incoming task array
  filter(task => task && !queueContains(task)) // check not in queue
).subscribe(task => addToQueue(task));

// Process the queue
queue$.subscribe(async task => {
  await dolongtask(task);
  await removeTask(task); // Assume this sends 'delete' to firebase
  removeFromQueue();
});

// Run simulation
tasksQuery$.next([{id:1},{id:2}]);
// Add after delay to show repeated items in firebase
setTimeout(() => {
  tasksQuery$.next([{id:1},{id:2},{id:3}]);
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
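Note that subscribing to queue$ with an async callback does not by itself serialize the work: each emission starts its own handler, so if two tasks are enqueued back to back, both dolongtask calls begin before the first one finishes. If strictly one-at-a-time processing is needed, the queue consumption can also be run through concatMap; a rough sketch under the same assumptions as the snippet above:
const { from } = rxjs;
const { concatMap } = rxjs.operators;

// Process the queue strictly one task at a time: concatMap holds back
// the next task until the previous promise chain has settled
queue$.pipe(
  concatMap(task => from(
    dolongtask(task)
      .then(() => removeTask(task))
      .then(() => removeFromQueue())
  ))
).subscribe();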
Leaving aside your title 'Rxjs subscription queue', you can actually fix your async/await code.
The problem is that async/await does not play nicely with for loops; see this question: Using async/await with a forEach loop. For example, you can replace the for loop, as per @Bergi's answer, with Promise.all():
console.clear();
const { interval } = rxjs;
const { take, bufferCount } = rxjs.operators;

function processTask(task) {
  console.log(`Processing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}

function removeTask(task) {
  console.log(`Removing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 50);
  });
}

const tasks$ = interval(250).pipe(
  take(10),
  bufferCount(3),
);

tasks$.subscribe(async tasks => {
  await Promise.all(
    tasks.map(async task => {
      await processTask(task); // has to be sync
      await removeTask(task);
      console.log(`Finished task ${task}`);
    })
  );
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
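One thing to be aware of: Promise.all starts every task of the emitted batch immediately and only waits for all of them to settle, so within a batch the tasks run concurrently rather than strictly one after another. If they really must run one at a time, the original for...of loop also works with async/await, as long as both awaits sit inside the loop body (a sketch using the names from the question):
this.tasks.subscribe(async tasks => {
  for (const task of tasks) {
    await dolongtask(task); // finishes before the next task starts
    await removetask(task);
  }
});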
Better yet, you can shape the query to avoid using a for loop, with mergeMap():
console.clear();
const { interval } = rxjs;
const { mergeMap, take, bufferCount } = rxjs.operators;

function processTask(task) {
  console.log(`Processing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}

function removeTask(task) {
  console.log(`Removing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 50);
  });
}

const tasks$ = interval(250).pipe(
  take(10),
  bufferCount(3),
);

tasks$
  .pipe(mergeMap(tasks => tasks))
  .subscribe(
    async task => {
      await processTask(task); // has to be sync
      await removeTask(task);
      console.log(`Finished task ${task}`);
    }
  );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>