最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

go - memory allocation using sync.Pool to optimise task retrieval from Redis - Stack Overflow

programmeradmin4浏览0评论

I have a service that selects tasks from Redis using a Lua script. I can select up to 1000 tasks at a time. Every 250 milliseconds I will retrieve tasks, and there are several such modules in one service (each module is responsible for processing background tasks, conditionally 3 modules each will retrieve 1000 tasks. So I have a question, do I need to use ```sync.Pool`` for memory allocation and optimisation? Will it not cause other problems and will I be able to get a performance gain, since the tasks consist of 5-8 fields in the structure.

My code


var taskPool = sync.Pool{
    New: func() interface{} {
        // This will be overridden by the generic type T at runtime
        var t interface{}
        return &t
    },
}

// bytePool is a sync.Pool for reusing []byte buffers for JSON unmarshaling.
var bytePool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 0, 1024) // Initial capacity for typical JSON task size
    },
}

var extractCommand = redis.NewScript(`
local key = KEYS[1]
local max_tasks = tonumber(ARGV[1])
local tasks = {}

for i = 1, max_tasks do
    local task = redis.call('LPOP', key)
    if not task then
        break
    end
    table.insert(tasks, task)
end

return tasks
`)

const maxTask = 1000

type Fetcher[T any] struct {
    rdb    redis.UniversalClient
    logger *zerolog.Logger
}


func (f *Fetcher[T]) Fetch(ctx context.Context, keys []string) ([]T, error) {

    result, err := extractCommand.Run(ctx, f.rdb, keys, maxTask).Result()
    if err != nil {
        return nil, err
    }

    tasks := make([]T, 0)

    if results, ok := result.([]interface{}); ok && len(results) > 0 {
        for _, task := range results {
            outPtr := taskPool.Get()
            out, ok := outPtr.(*T)
            if !ok {
                out = new(T)
            }
            if tasksVal, valOk := task.(string); valOk {
                buf := bytePool.Get().([]byte)
                buf = buf[:0]
                buf = append(buf, tasksVal...)
                if err = json.Unmarshal(buf, out); err != nil {
                    taskPool.Put(out)
                    bytePool.Put(buf)

                    continue
                }

                tasks = append(tasks, *out)
                taskPool.Put(out)
                bytePool.Put(buf)
            }
        }
    }

    return tasks, nil
}
发布评论

评论列表(0)

  1. 暂无评论