c# - Run background process in high priority TaskScheduler - Stack Overflow

I have a legacy codebase that starves the ThreadPool because of many synchronous calls (instead of async/await). I am trying to create a background process that renews a Kubernetes lease, which we use to manage locks when running multiple replicas. This process should run with the highest priority, but I am struggling because bypassing the ThreadPool does not seem to be easy. I tried a custom TaskScheduler with its own threads, but with no luck. Is it even possible?

Here is my implementation using CustomTaskScheduler. The program hangs on HttpClient.SendAsync().

class Program
{
    static async Task Main()
    {
        // Simulate thread pool starvation
        ThreadPool.SetMaxThreads(50, 1);
        for (int i = 0; i < 60; i++)
        {
            _ = Task.Run(() =>
            {
                Thread.Sleep(100000);
            });
        }

        using (var scheduler = new CustomTaskScheduler(workerCount: 1))
        {
            var factory = new TaskFactory(scheduler);
            var tasks = new List<Task>();

            for (int i = 0; i < 5; i++)
            {
                int taskNum = i;
                await factory.StartNew(async () =>
                {
                    Console.WriteLine($"Task {taskNum} is running on thread {Thread.CurrentThread.ManagedThreadId}");
                    await RunAsyncFunction(taskNum);
                }, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
            }
        }

        Console.WriteLine("All tasks completed.");
        await Task.Delay(1000000);
    }

    static async Task RunAsyncFunction(int taskNum)
    {
        Console.WriteLine($"Task {taskNum} started on thread {Thread.CurrentThread.ManagedThreadId}");
        var client = new HttpClient();
        await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"));
        Console.WriteLine($"Task {taskNum} resumed on thread {Thread.CurrentThread.ManagedThreadId}");
    }
}

public class CustomTaskScheduler : TaskScheduler, IDisposable
{
    private readonly System.Collections.Concurrent.BlockingCollection<Task> taskQueue = new();
    private readonly List<Thread> workerThreads = new();
    private readonly CancellationTokenSource cts = new();

    public CustomTaskScheduler(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
        {
            var thread = new Thread(WorkerLoop)
            {
                IsBackground = true
            };
            workerThreads.Add(thread);
            thread.Start();
        }
    }

    protected override IEnumerable<Task> GetScheduledTasks() => taskQueue.ToArray();

    protected override void QueueTask(Task task)
    {
        if (cts.IsCancellationRequested)
            throw new InvalidOperationException("Scheduler is shutting down.");

        taskQueue.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }

    private void WorkerLoop()
    {
        try
        {
            foreach (var task in taskQueue.GetConsumingEnumerable(cts.Token))
            {
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
                TryExecuteTask(task);
            }
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested) { }
    }

    public void Dispose()
    {
        cts.Cancel();
        taskQueue.CompleteAdding();
        foreach (var worker in workerThreads)
        {
            worker.Join();
        }
        taskQueue.Dispose();
        cts.Dispose();
    }
}

EDIT: Interestingly, the code hangs even if I use the synchronous HttpClient.Send() on my own thread, so it is apparently still using the ThreadPool. Here is the synchronous version:


ThreadPool.SetMaxThreads(12, 10000);
for (int i = 0; i < 20; i++)
{
    _ = Task.Run(() =>
    {
        Thread.Sleep(100000);
    });
}

var thread = new Thread(SendHttpRequest);
thread.IsBackground = true;
thread.Start();

Thread.Sleep(100000);


static void SendHttpRequest()
{
    using (HttpClient client = new HttpClient())
    using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"))
    {
        var response = client.Send(request);
        // Never goes here if ThreadPool is exhausted
        Console.WriteLine(response.Content.ReadAsStringAsync().Result);
    }
}

Asked Mar 6 at 15:01 by Mayo; edited Mar 13 at 1:31 by Zhi Lv.
  • 1 As a side-note, the HttpClient class is intended to be instantiated once, and reused throughout the life of an application. This component is thread-safe. – Theodor Zoulias Commented Mar 6 at 15:07
  • Check: IHttpClientFactory stackoverflow/questions/73037522/… – Taco Commented Mar 6 at 15:14
  • Yeah you are right, but the code is just for demonstrating the problem. I am using IHttpContextFactory in my code. – Mayo Commented Mar 6 at 15:21
  • 2 So, you want to run some code periodically, even if the threadpool is exhausted? Why not just start a regular thread that uses .Send(...) and Thread.Sleep? I would also try to address the exhaustion issue, since that is a serious problem that could snowball. – JonasH Commented Mar 6 at 15:56
  • 1 Rather than completely blocking the real thread pool and trying to create a custom thread pool to use as intended with short running work surrounding actually asynchronous operations, do the reverse. Use non-thread pool threads for the long running synchronous work. – Servy Commented Mar 6 at 16:14

2 Answers

You can control where the code runs after the await, but you can't control where the built-in HttpClient.SendAsync API completes. AFAIK it is implemented to complete on the ThreadPool, and you can't do anything about it (apart from reimplementing HttpClient from scratch). So as long as the ThreadPool is saturated, HttpClient.SendAsync will not complete, because no ThreadPool thread is available to complete it. When one becomes available, it will immediately reschedule the continuation to your CustomTaskScheduler, so that thread will have only a minuscule amount of work to do and will be immediately available for other pending ThreadPool work. That is something, but most likely it won't help in your case, because whatever else is saturating your ThreadPool will soon steal this thread too.

HttpClient.SendAsync is not special regarding where it completes. Most built-in .NET async APIs complete on the ThreadPool, including for example the Task.Delay method. So your best bet is to make sure that the ThreadPool is healthy, and that no code in your application aggressively starves it. A possible cause of a saturated ThreadPool is a Parallel.ForEach call with an unconfigured MaxDegreeOfParallelism, especially if the source of the loop is a blocking collection.
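
As a rough illustration of the MaxDegreeOfParallelism point, here is a minimal sketch of a bounded Parallel.ForEach. The names BoundedParallelDemo and RunBounded are hypothetical, and Thread.Sleep only stands in for whatever blocking legacy call the real loop makes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class BoundedParallelDemo
{
    // Runs a blocking action over the items with at most maxWorkers
    // concurrent workers and returns the highest concurrency observed.
    public static int RunBounded(IEnumerable<int> items, int maxWorkers)
    {
        // Without this option the loop may keep requesting more
        // ThreadPool threads while its workers are blocked.
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxWorkers };
        int current = 0, peak = 0;

        Parallel.ForEach(items, options, _ =>
        {
            int now = Interlocked.Increment(ref current);

            // Lock-free update of the observed peak concurrency.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen)
            {
            }

            Thread.Sleep(20); // stand-in for a blocking legacy call
            Interlocked.Decrement(ref current);
        });

        return peak;
    }
}
```

Calling BoundedParallelDemo.RunBounded(new int[16], 2) should never report more than two concurrent workers, so the loop cannot occupy more than a small, fixed share of the pool.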

await is going to schedule the continuation to run via SynchronizationContext.Current, not necessarily TaskScheduler.Current. And many asynchronous operations are going to explicitly use the synchronization context, rather than the current task scheduler, or worse in your case, explicitly use the default scheduler/sync context instead of the current.

If you make your own synchronization context and set it as the current context when running operations using your custom thread pool (and to not be mean, set what was previously the current context when you're done) then those places won't use the thread pool. It's also worth noting that in your example, you have awaits outside of the custom task scheduler, which will all schedule their continuations using the thread pool, not your custom scheduler.
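
A minimal sketch of such a synchronization context might look like the following. DedicatedThreadContext and ContextDemo are hypothetical names, and the sketch handles only Post (not Send) with no error handling. Note that the timer behind Task.Delay still fires on the ThreadPool, so this only controls where the continuation runs, not where the operation completes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical single-threaded context, for illustration only.
public sealed class DedicatedThreadContext : SynchronizationContext, IDisposable
{
    private readonly BlockingCollection<(SendOrPostCallback, object?)> queue = new();
    private readonly Thread worker;

    public DedicatedThreadContext()
    {
        worker = new Thread(Pump) { IsBackground = true };
        worker.Start();
    }

    public int WorkerThreadId => worker.ManagedThreadId;

    // Continuations of awaits that captured this context arrive here.
    public override void Post(SendOrPostCallback d, object? state) => queue.Add((d, state));

    private void Pump()
    {
        // Make this context current on the worker so awaits capture it.
        SetSynchronizationContext(this);
        foreach (var (callback, state) in queue.GetConsumingEnumerable())
            callback(state);
    }

    public void Dispose()
    {
        queue.CompleteAdding();
        worker.Join();
        queue.Dispose();
    }
}

public static class ContextDemo
{
    // Returns true if the code after the await resumed on the dedicated thread.
    public static bool ResumesOnDedicatedThread()
    {
        using var context = new DedicatedThreadContext();
        var done = new TaskCompletionSource<bool>();

        context.Post(async _ =>
        {
            await Task.Delay(50); // completes on the ThreadPool, resumes via Post
            done.SetResult(Thread.CurrentThread.ManagedThreadId == context.WorkerThreadId);
        }, null);

        return done.Task.GetAwaiter().GetResult();
    }
}
```

With a healthy ThreadPool, ContextDemo.ResumesOnDedicatedThread() should report that the continuation came back to the worker thread. Under starvation the await would still stall, because the completion itself depends on the pool.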

But even if you do all of that, you have a more fundamental problem: it's best practice for libraries to use ConfigureAwait(false) in most contexts, so they're going to intentionally ensure that their continuations run on a default thread pool thread rather than in your custom context, since they don't think they need it. You're simply going to have a bad time blocking the default thread pool and trying to work around that.
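
The effect of ConfigureAwait(false) can be sketched with a context that merely counts what is posted to it. CountingContext and ConfigureAwaitDemo are hypothetical names, and WaitLikeALibrary only imitates what library code typically does internally.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context that only counts the continuations posted to it.
public sealed class CountingContext : SynchronizationContext
{
    private int posts;
    public int Posts => Volatile.Read(ref posts);

    public override void Post(SendOrPostCallback d, object? state)
    {
        Interlocked.Increment(ref posts);
        d(state); // run inline on whichever thread completed the operation
    }
}

public static class ConfigureAwaitDemo
{
    public static (int captured, int escaped) Run()
    {
        var original = SynchronizationContext.Current;
        try
        {
            var capturing = new CountingContext();
            SynchronizationContext.SetSynchronizationContext(capturing);
            Task first = WaitCapturing();      // await captures 'capturing'

            var ignored = new CountingContext();
            SynchronizationContext.SetSynchronizationContext(ignored);
            Task second = WaitLikeALibrary();  // await skips 'ignored'

            Task.WaitAll(first, second);
            return (capturing.Posts, ignored.Posts);
        }
        finally
        {
            // Restore whatever context was current before the demo.
            SynchronizationContext.SetSynchronizationContext(original);
        }
    }

    private static async Task WaitCapturing() => await Task.Delay(20);

    private static async Task WaitLikeALibrary() =>
        await Task.Delay(20).ConfigureAwait(false);
}
```

The first await posts its continuation to the custom context, while the ConfigureAwait(false) one never touches it, which is why a custom context cannot pull a library's internal continuations off the ThreadPool.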

Instead of blocking the thread pool with long running work that prevents it from being used as intended, make that work use a custom thread pool (or just pass TaskCreationOptions.LongRunning to Task.Factory.StartNew, since Task.Run does not accept creation options) so that all of the things that want to use the thread pool can.
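
A minimal sketch of moving blocking work off the pool, assuming the default scheduler: Task.Run has no overload that takes TaskCreationOptions, so the sketch uses Task.Factory.StartNew. LongRunningDemo and RunBlockingWork are hypothetical names. LongRunning is documented as a hint; in current .NET implementations the default scheduler gives such a task its own thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class LongRunningDemo
{
    // Starts blocking work with the LongRunning hint and reports whether
    // it ended up on a ThreadPool thread.
    public static Task<bool> RunBlockingWork(Action blockingWork)
    {
        return Task.Factory.StartNew(() =>
        {
            bool onPool = Thread.CurrentThread.IsThreadPoolThread;
            blockingWork(); // the long synchronous call goes here
            return onPool;
        },
        CancellationToken.None,
        TaskCreationOptions.LongRunning, // hint: do not use a pool thread
        TaskScheduler.Default);
    }
}
```

For example, LongRunningDemo.RunBlockingWork(() => Thread.Sleep(100000)) would keep the sleeping work away from the pool threads that HttpClient needs. The hint only helps synchronous delegates: with an async delegate, the dedicated thread is used only up to the first await.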
