I have a service that pulls tasks out of Redis with a Lua script, up to 1000 tasks per call. A fetch runs every 250 milliseconds, and one service contains several such modules, each responsible for its own kind of background task (say, 3 modules, each pulling up to 1000 tasks). My question: should I use `sync.Pool` here to cut memory allocations? Could it cause other problems, and can I realistically expect a performance gain, given that each task is a struct with 5-8 fields?
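For context, each module drives its fetcher from a ticker, roughly like this (a simplified sketch, not my exact production loop; `runModule` and `process` are illustrative names):

```go
import (
	"context"
	"time"
)

// runModule polls every 250ms and hands each batch (up to 1000 tasks)
// to a processing callback. Illustrative sketch only.
func runModule[T any](ctx context.Context, f *Fetcher[T], keys []string, process func([]T)) {
	ticker := time.NewTicker(250 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			tasks, err := f.Fetch(ctx, keys)
			if err != nil {
				continue // real code logs the error
			}
			process(tasks)
		}
	}
}
```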
My code:

```go
import (
	"context"
	"encoding/json"
	"sync"

	"github.com/redis/go-redis/v9"
	"github.com/rs/zerolog"
)

// bytePool reuses []byte buffers for copying Redis replies before JSON
// unmarshaling. It stores *[]byte rather than []byte so that Put does
// not allocate a fresh interface value on every call (staticcheck SA6002).
var bytePool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 0, 1024) // initial capacity for a typical JSON task
		return &b
	},
}

var extractCommand = redis.NewScript(`
local key = KEYS[1]
local max_tasks = tonumber(ARGV[1])
local tasks = {}
for i = 1, max_tasks do
    local task = redis.call('LPOP', key)
    if not task then
        break
    end
    table.insert(tasks, task)
end
return tasks
`)
```
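(go-redis loads this script once and runs it via `EVALSHA`, falling back to `EVAL` on a cache miss; called by hand the contract is one key plus the batch size, with an illustrative queue name:)

```
EVAL "<script body>" 1 queue:tasks 1000
```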
```go
const maxTask = 1000

type Fetcher[T any] struct {
	rdb    redis.UniversalClient
	logger *zerolog.Logger
	// taskPool lives on the Fetcher so that each generic instantiation
	// gets its own pool of *T (a shared package-level pool would mix
	// pointer types between fetchers with different T). The zero
	// sync.Pool is ready to use: Get returns nil until something
	// has been Put back, and that case falls through to new(T).
	taskPool sync.Pool
}

func (f *Fetcher[T]) Fetch(ctx context.Context, keys []string) ([]T, error) {
	result, err := extractCommand.Run(ctx, f.rdb, keys, maxTask).Result()
	if err != nil {
		return nil, err
	}
	results, _ := result.([]interface{})
	tasks := make([]T, 0, len(results))
	for _, task := range results {
		raw, isStr := task.(string)
		if !isStr {
			continue
		}
		out, ok := f.taskPool.Get().(*T)
		if !ok {
			out = new(T) // pool empty: Get returned nil, so allocate fresh
		}
		var zero T
		*out = zero // reset a reused value so stale fields don't leak into this task

		bufPtr := bytePool.Get().(*[]byte)
		buf := append((*bufPtr)[:0], raw...)
		unmarshalErr := json.Unmarshal(buf, out)
		*bufPtr = buf // keep any growth for the buffer's next user
		bytePool.Put(bufPtr)
		if unmarshalErr != nil {
			f.taskPool.Put(out)
			continue
		}
		tasks = append(tasks, *out) // copies the struct; the pooled *T is reused below
		f.taskPool.Put(out)
	}
	return tasks, nil
}
```
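To judge whether the pool is worth it, I plan to compare allocations with a benchmark along these lines (in a `_test.go` file; `Task` is a stand-in for my real 5-8 field struct):

```go
package fetcher_test

import (
	"encoding/json"
	"sync"
	"testing"
)

// Task stands in for the real 5-8 field task struct.
type Task struct {
	ID      string `json:"id"`
	Kind    string `json:"kind"`
	Payload string `json:"payload"`
	Retry   int    `json:"retry"`
	At      int64  `json:"at"`
}

var raw = []byte(`{"id":"1","kind":"email","payload":"x","retry":0,"at":0}`)

var pool = sync.Pool{New: func() interface{} { return new(Task) }}

// BenchmarkNew allocates a fresh Task per iteration.
func BenchmarkNew(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		out := new(Task)
		if err := json.Unmarshal(raw, out); err != nil {
			b.Fatal(err)
		}
	}
}

// BenchmarkPool reuses a pooled Task, resetting it each time.
func BenchmarkPool(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		out := pool.Get().(*Task)
		*out = Task{}
		if err := json.Unmarshal(raw, out); err != nil {
			b.Fatal(err)
		}
		pool.Put(out)
	}
}
```

Running it with `go test -bench . -benchmem` should show whether the pooled path actually saves allocations for structs this small.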