
python - Locust distributed mode: "Unknown message type '_distributor_request' from worker.."


I am using Locust in distributed mode with 1 master and n workers to run a load test.

I am also using the locust-plugins library to utilize its Distributor utility. My goal is to preallocate one unique resource per user (from a list of “items”) and ensure each virtual user gets a unique item during the test. Here's a simplified outline of my setup:

Master node: Generates a list of items (one for each user) at test start. It then broadcasts this list to all workers using a custom message.

Workers: Upon receiving the items message, each worker stores the list and prepares to use it for its users. I use a Distributor instance on each worker to serve the models to the user tasks.

Master sending the resource list (at test start):

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    if environment.runner is not None and environment.runner.master:
        # Master generates the list of resources for all users
        models = generate_items(total_user_count)
        # Send the list to all workers
        environment.runner.send_message("items_models", {"items_models": models})

Worker receiving and storing the list:

items_models_dist = None

def set_items_models(env, msg, **kwargs):
    global items_models_dist
    received_list = msg.data["items_models"]
    env.logger.info(f"Worker received {len(received_list)} items models. Proceeding...")
    items_models_dist = Distributor(env, iter(received_list))

@events.init.add_listener
def on_worker_init(environment, **kwargs):
    if isinstance(environment.runner, runners.WorkerRunner):
        environment.runner.register_message("items_models", set_items_models)

User task using the Distributor:

class MyUser(HttpUser):
    @task
    def use_model(self):
        model = next(items_models_dist)

When I hit model = next(items_models_dist), I am getting: "Unknown message type received from worker worker.some.id (index 0): _distributor_request" and the user halts.

I did not explicitly call or register any handler for "_distributor_request" in my code. I assumed the plugin would take care of it. The warning and the lack of user spawning suggest something is fundamentally wrong with how I'm using the plugin in distributed mode. Help?

Locust version: 2.32.3, Python version: 3.12.3, locust-plugins version: 4.5.3

I am running in distributed mode on one machine with the command line: locust -f test.py --processes -1


asked Apr 1 at 11:25 by PloniStacker

1 Answer

You're supposed to initialize the Distributor object on both master and worker instances, like in the example here: https://github.com/SvenskaSpel/locust-plugins/blob/602d68e7d4bd77d3a0d2c20afa05b71f7557b547/examples/distributor_ex.py#L22 (just leave the iterator argument set to None on the workers).

There is no need to "manually" register any custom message handlers or call send_message if you're using Distributor.
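To picture why the master logged that warning, here is a toy model in plain Python. It is not Locust's or locust-plugins' actual code: `ToyRunner` and `ToyDistributor` are made-up names, and the only assumption carried over is that constructing a Distributor registers a `_<name>_request` handler on the runner of the process it is built in.

```python
class ToyRunner:
    """Stand-in for a Locust runner: only a registry of message handlers."""

    def __init__(self):
        self.handlers = {}
        self.warnings = []

    def register_message(self, msg_type, handler):
        self.handlers[msg_type] = handler

    def receive(self, msg_type, data):
        handler = self.handlers.get(msg_type)
        if handler is None:
            # Mirrors the kind of warning quoted in the question.
            self.warnings.append(f"Unknown message type received: {msg_type}")
            return None
        return handler(data)


class ToyDistributor:
    """Registers its own request handler on whatever runner it is built on."""

    def __init__(self, runner, iterator, name="distributor"):
        self.iterator = iterator
        runner.register_message(f"_{name}_request", self._on_request)

    def _on_request(self, data):
        return next(self.iterator)


# Case 1: no Distributor was ever built on the master (the question's setup).
master_without = ToyRunner()
reply_without = master_without.receive("_distributor_request", {"client_id": "worker-0"})

# Case 2: a Distributor was built on the master too.
master_with = ToyRunner()
ToyDistributor(master_with, iter(["item-0", "item-1"]))
reply_with = master_with.receive("_distributor_request", {"client_id": "worker-0"})

print(master_without.warnings)
print(reply_with)
```

In the first case the worker's request has nowhere to go, which matches the user halting on next(); in the second the master can answer it.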
