I have legacy codebase that is starving ThreadPool because of many sync calls (instead of using async/await). I am trying to create background process that will prolong kubernetes lease due to managing locks in case of running in multiple replicas. This process should run in highest priority. But I am struggling to do that because It seems to be not such easy to bypass ThreadPool. I tried to use custom TaskScheduler with own threads but with no luck. Is it even possible?
Here is my implementation using CustomTaskScheduler. The program hangs on HttpClient SendAsync().
class Program
{
static async Task Main()
{
// Simulate thread pool starvation
ThreadPool.SetMaxThreads(50, 1);
for (int i = 0; i < 60; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
using (var scheduler = new CustomTaskScheduler(workerCount: 1))
{
var factory = new TaskFactory(scheduler);
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
int taskNum = i;
await factory.StartNew(async () =>
{
Console.WriteLine($"Task {taskNum} is running on thread {Thread.CurrentThread.ManagedThreadId}");
await RunAsyncFunction(taskNum);
}, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
}
}
Console.WriteLine("All tasks completed.");
await Task.Delay(1000000);
}
static async Task RunAsyncFunction(int taskNum)
{
Console.WriteLine($"Task {taskNum} started on thread {Thread.CurrentThread.ManagedThreadId}");
var client = new HttpClient();
await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"));
Console.WriteLine($"Task {taskNum} resumed on thread {Thread.CurrentThread.ManagedThreadId}");
}
}
public class CustomTaskScheduler : TaskScheduler, IDisposable
{
private readonly System.Collections.Concurrent.BlockingCollection<Task> taskQueue = new();
private readonly List<Thread> workerThreads = new();
private readonly CancellationTokenSource cts = new();
public CustomTaskScheduler(int workerCount)
{
for (int i = 0; i < workerCount; i++)
{
var thread = new Thread(WorkerLoop)
{
IsBackground = true
};
workerThreads.Add(thread);
thread.Start();
}
}
protected override IEnumerable<Task> GetScheduledTasks() => taskQueue.ToArray();
protected override void QueueTask(Task task)
{
if (cts.IsCancellationRequested)
throw new InvalidOperationException("Scheduler is shutting down.");
taskQueue.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return TryExecuteTask(task);
}
private void WorkerLoop()
{
try
{
foreach (var task in taskQueue.GetConsumingEnumerable(cts.Token))
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
TryExecuteTask(task);
}
}
catch (OperationCanceledException) when (cts.IsCancellationRequested) { }
}
public void Dispose()
{
cts.Cancel();
taskQueue.CompleteAdding();
foreach (var worker in workerThreads)
{
worker.Join();
}
taskQueue.Dispose();
cts.Dispose();
}
}
EDIT: It is interesting that the code hangs even if I use synchronous version of HttpClient Send() in my own thread. So it is still using ThreadPool. Here is the synchronous version
ThreadPool.SetMaxThreads(12, 10000);
for (int i = 0; i < 20; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
var thread = new Thread(SendHttpRequest);
thread.IsBackground = true;
thread.Start();
Thread.Sleep(100000);
static void SendHttpRequest()
{
using (HttpClient client = new HttpClient())
using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"))
{
var response = client.Send(request);
// Never goes here if ThreadPool is exhausted
Console.WriteLine(response.Content.ReadAsStringAsync().Result);
}
}
I have legacy codebase that is starving ThreadPool because of many sync calls (instead of using async/await). I am trying to create background process that will prolong kubernetes lease due to managing locks in case of running in multiple replicas. This process should run in highest priority. But I am struggling to do that because It seems to be not such easy to bypass ThreadPool. I tried to use custom TaskScheduler with own threads but with no luck. Is it even possible?
Here is my implementation using CustomTaskScheduler. The program hangs on HttpClient SendAsync().
class Program
{
static async Task Main()
{
// Simulate thread pool starvation
ThreadPool.SetMaxThreads(50, 1);
for (int i = 0; i < 60; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
using (var scheduler = new CustomTaskScheduler(workerCount: 1))
{
var factory = new TaskFactory(scheduler);
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
int taskNum = i;
await factory.StartNew(async () =>
{
Console.WriteLine($"Task {taskNum} is running on thread {Thread.CurrentThread.ManagedThreadId}");
await RunAsyncFunction(taskNum);
}, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
}
}
Console.WriteLine("All tasks completed.");
await Task.Delay(1000000);
}
static async Task RunAsyncFunction(int taskNum)
{
Console.WriteLine($"Task {taskNum} started on thread {Thread.CurrentThread.ManagedThreadId}");
var client = new HttpClient();
await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"));
Console.WriteLine($"Task {taskNum} resumed on thread {Thread.CurrentThread.ManagedThreadId}");
}
}
public class CustomTaskScheduler : TaskScheduler, IDisposable
{
private readonly System.Collections.Concurrent.BlockingCollection<Task> taskQueue = new();
private readonly List<Thread> workerThreads = new();
private readonly CancellationTokenSource cts = new();
public CustomTaskScheduler(int workerCount)
{
for (int i = 0; i < workerCount; i++)
{
var thread = new Thread(WorkerLoop)
{
IsBackground = true
};
workerThreads.Add(thread);
thread.Start();
}
}
protected override IEnumerable<Task> GetScheduledTasks() => taskQueue.ToArray();
protected override void QueueTask(Task task)
{
if (cts.IsCancellationRequested)
throw new InvalidOperationException("Scheduler is shutting down.");
taskQueue.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return TryExecuteTask(task);
}
private void WorkerLoop()
{
try
{
foreach (var task in taskQueue.GetConsumingEnumerable(cts.Token))
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
TryExecuteTask(task);
}
}
catch (OperationCanceledException) when (cts.IsCancellationRequested) { }
}
public void Dispose()
{
cts.Cancel();
taskQueue.CompleteAdding();
foreach (var worker in workerThreads)
{
worker.Join();
}
taskQueue.Dispose();
cts.Dispose();
}
}
EDIT: It is interesting that the code hangs even if I use synchronous version of HttpClient Send() in my own thread. So it is still using ThreadPool. Here is the synchronous version
ThreadPool.SetMaxThreads(12, 10000);
for (int i = 0; i < 20; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
var thread = new Thread(SendHttpRequest);
thread.IsBackground = true;
thread.Start();
Thread.Sleep(100000);
static void SendHttpRequest()
{
using (HttpClient client = new HttpClient())
using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"))
{
var response = client.Send(request);
// Never goes here if ThreadPool is exhausted
Console.WriteLine(response.Content.ReadAsStringAsync().Result);
}
}
Share
Improve this question
edited Mar 13 at 1:31
Zhi Lv
22k1 gold badge27 silver badges37 bronze badges
asked Mar 6 at 15:01
MayoMayo
9493 gold badges10 silver badges16 bronze badges
10
|
Show 5 more comments
2 Answers
Reset to default 2You can control where the code runs after the await
, but you can't control where the built-in HttpClient.SendAsync
API completes. AFAIK it is implemented to complete on the ThreadPool
, and you can't do anything about it (apart from reimplementing the HttpClient
from scratch). So as long as the ThreadPool
is saturated, the HttpClient.SendAsync
will not complete, because there is no ThreadPool
thread available to complete it. When one becomes available, it will immediately reschedule the continuation to your CustomTaskScheduler
, so that thread will have a minuscule work to do, and will be immediately available to do other pending ThreadPool
work. Which is something, but most likely it won't help your case because there is something else that saturates your ThreadPool
, that will soon steal this thread too.
The HttpClient.SendAsync
is not special regarding where it completes. Most built-in .NET async APIs complete on the ThreadPool
, including for example the Task.Delay
method. So your best bet is to make sure that the ThreadPool
is healthy, and there is no code in your application that aggresively starves it. A possible cause for a saturated ThreadPool
might be a Parallel.ForEach
loop call with unconfigured MaxDegreeOfParallelism
, especially if the source
of the loop is a blocking collection.
await
is going to schedule the continuation to run via SynchronizationContext.Current
, not necessarily TaskScheduler.Current
. And many asynchronous operations are going to explicitly use the synchronization context, rather than the current task scheduler, or worse in your case, explicitly use the default scheduler/sync context instead of the current.
If you make your own synchronization context and set it as the current context when running operations using your custom thread pool (and to not be mean, set what was previously the current context when you're done) then those places won't use the thread pool. It's also worth noting that in your example, you have awaits outside of the custom task scheduler, which will all schedule their continuations using the thread pool, not your custom scheduler.
But even if you do all of that, you have a more fundamental problem that it's best practices for libraries to use ConfigureAwait(false)
in most contexts, so they're going to intentionally ensure that their continuations run in the default thread pool thread rather than in your custom context, since they don't think they need it. You're simply going to have a bad time blocking the default thread pool and trying to work around that.
Instead of blocking the thread pool with long running work that prevents it from being used as intended, make that work use a custom thread pool (or just use LongRunning
on Task.Run
) so that all of the things that want to use the thread pool can.
HttpClient
class is intended to be instantiated once, and reused throughout the life of an application. This component is thread-safe. – Theodor Zoulias Commented Mar 6 at 15:07.Send(...)
andThread.Sleep
? I would also try to address the exhaustion issue, since that is a serious problem that could snowball. – JonasH Commented Mar 6 at 15:56