ter = array_value($config, 'filter'); $arr = array_value($filter, $type); $enable = array_value($arr, 'enable'); $wordarr = array_value($arr, 'keyword'); if (0 == $enable || empty($wordarr)) return FALSE; foreach ($wordarr as $_keyword) { if (!$_keyword) continue; $r = strpos(strtolower($keyword), strtolower($_keyword)); if (FALSE !== $r) { $error = $_keyword; return TRUE; } } return FALSE; } // return http://domain.com OR https://domain.com function url_prefix() { $http = ((isset($_SERVER['HTTPS']) && 'on' == $_SERVER['HTTPS']) || (isset($_SERVER['HTTP_X_FORWARDED_PROTO']) && $_SERVER['HTTP_X_FORWARDED_PROTO'] == 'https')) ? 'https://' : 'http://'; return $http . $_SERVER['HTTP_HOST']; } // 唯一身份ID function uniq_id() { return uniqid(substr(md5(microtime(true) . mt_rand(1000, 9999)), 8, 8)); } // 生成订单号 14位 function trade_no() { $trade_no = str_replace('.', '', microtime(1)); $strlen = mb_strlen($trade_no, 'UTF-8'); $strlen = 14 - $strlen; $str = ''; if ($strlen) { for ($i = 0; $i <= $strlen; $i++) { if ($i < $strlen) $str .= '0'; } } return $trade_no . $str; } // 生成订单号 16位 function trade_no_16() { $explode = explode(' ', microtime()); $trade_no = $explode[1] . mb_substr($explode[0], 2, 6, 'UTF-8'); return $trade_no; } // 当前年的天数 function date_year($time = NULL) { $time = intval($time) ? $time : time(); return date('L', $time) + 365; } // 当前年份中的第几天 function date_z($time = NULL) { $time = intval($time) ? $time : time(); return date('z', $time); } // 当前月份中的第几天,没有前导零 1 到 31 function date_j($time = NULL) { $time = intval($time) ? $time : time(); return date('j', $time); } // 当前月份中的第几天,有前导零的2位数字 01 到 31 function date_d($time = NULL) { $time = intval($time) ? $time : time(); return date('d', $time); } // 当前时间为星期中的第几天 数字表示 1表示星期一 到 7表示星期天 function date_w_n($time = NULL) { $time = intval($time) ? $time : time(); return date('N', $time); } // 当前日第几周 function date_d_w($time = NULL) { $time = intval($time) ? $time : time(); return date('W', $time); } // 当前几月 没有前导零1-12 function date_n($time = NULL) { $time = intval($time) ? $time : time(); return date('n', $time); } // 当前月的天数 function date_t($time = NULL) { $time = intval($time) ? $time : time(); return date('t', $time); } // 0 o'clock on the day function clock_zero() { return strtotime(date('Ymd')); } // 24 o'clock on the day function clock_twenty_four() { return strtotime(date('Ymd')) + 86400; } // 8点过期 / expired at 8 a.m. function eight_expired($time = NULL) { $time = intval($time) ? $time : time(); // 当前时间大于8点则改为第二天8点过期 $life = date('G') <= 8 ? (strtotime(date('Ymd')) + 28800 - $time) : clock_twenty_four() - $time + 28800; return $life; } // 24点过期 / expired at 24 a.m. function twenty_four_expired($time = NULL) { $time = intval($time) ? $time : time(); $twenty_four = clock_twenty_four(); $life = $twenty_four - $time; return $life; } /** * @param $url 提交地址 * @param string $post POST数组 / 空为GET获取数据 / $post='GET'获取连续跳转最终URL * @param string $cookie cookie * @param int $timeout 超时 * @param int $ms 设为1是毫秒 * @return mixed 返回数据 */ function https_request($url, $post = '', $cookie = '', $timeout = 30, $ms = 0) { if (empty($url)) return FALSE; if (version_compare(PHP_VERSION, '5.2.3', '<')) { $ms = 0; $timeout = 30; } is_array($post) and $post = http_build_query($post); // 没有安装curl 使用http的形式,支持post if (!extension_loaded('curl')) { //throw new Exception('server not install CURL'); if ($post) { return https_post($url, $post, $cookie, $timeout); } else { return http_get($url, $cookie, $timeout); } } is_array($cookie) and $cookie = http_build_query($cookie); $curl = curl_init(); // 返回执行结果,不输出 curl_setopt($curl, CURLOPT_RETURNTRANSFER, true); //php5.5跟php5.6中的CURLOPT_SAFE_UPLOAD的默认值不同 if (class_exists('\CURLFile')) { curl_setopt($curl, CURLOPT_SAFE_UPLOAD, true); } else { defined('CURLOPT_SAFE_UPLOAD') and curl_setopt($curl, CURLOPT_SAFE_UPLOAD, false); } // 设定请求的RUL curl_setopt($curl, CURLOPT_URL, $url); // 设定返回信息中包含响应信息头 if (ini_get('safe_mode') && ini_get('open_basedir')) { // $post参数必须为GET if ('GET' == $post) { // 安全模式时将头文件的信息作为数据流输出 curl_setopt($curl, CURLOPT_HEADER, true); // 安全模式采用连续抓取 curl_setopt($curl, CURLOPT_NOBODY, true); } } else { curl_setopt($curl, CURLOPT_HEADER, false); // 允许跳转10次 curl_setopt($curl, CURLOPT_MAXREDIRS, 10); // 使用自动跳转,返回最后的Location curl_setopt($curl, CURLOPT_FOLLOWLOCATION, true); } $ua1 = 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1'; $ua = empty($_SERVER["HTTP_USER_AGENT"]) ? $ua1 : $_SERVER["HTTP_USER_AGENT"]; curl_setopt($curl, CURLOPT_USERAGENT, $ua); // 兼容HTTPS if (FALSE !== stripos($url, 'https://')) { curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, FALSE); curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, FALSE); //ssl版本控制 //curl_setopt($curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1); curl_setopt($curl, CURLOPT_SSLVERSION, true); } $header = array('Content-type: application/x-www-form-urlencoded;charset=UTF-8', 'X-Requested-With: XMLHttpRequest'); $cookie and $header[] = "Cookie: $cookie"; curl_setopt($curl, CURLOPT_HTTPHEADER, $header); if ($post) { // POST curl_setopt($curl, CURLOPT_POST, true); // 自动设置Referer curl_setopt($curl, CURLOPT_AUTOREFERER, true); curl_setopt($curl, CURLOPT_POSTFIELDS, $post); } if ($ms) { curl_setopt($curl, CURLOPT_NOSIGNAL, true); // 设置毫秒超时 curl_setopt($curl, CURLOPT_TIMEOUT_MS, intval($timeout)); // 超时毫秒 } else { curl_setopt($curl, CURLOPT_TIMEOUT, intval($timeout)); // 秒超时 } //优先解析 IPv6 超时后IPv4 //curl_setopt($curl, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); curl_setopt($curl, CURLOPT_ENCODING, 'gzip'); // 返回执行结果 $output = curl_exec($curl); // 有效URL,输出URL非URL页面内容 CURLOPT_RETURNTRANSFER 必须为false 'GET' == $post and $output = curl_getinfo($curl, CURLINFO_EFFECTIVE_URL); curl_close($curl); return $output; } function save_image($img) { $ch = curl_init(); // 设定请求的RUL curl_setopt($ch, CURLOPT_URL, $img); // 设定返回信息中包含响应信息头 启用时会将头文件的信息作为数据流输出 //curl_setopt($ch, CURLOPT_HEADER, false); //curl_setopt($ch, CURLOPT_USERAGENT, $_SERVER["HTTP_USER_AGENT"]); // true表示$html,false表示echo $html curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10); curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); //curl_setopt($ch, CURLOPT_BINARYTRANSFER, 1); //curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 0); curl_setopt($ch, CURLOPT_ENCODING, 'gzip'); $output = curl_exec($ch); curl_close($ch); return $output; } // 计算字串宽度:剧中对齐(字体大小/字串内容/字体链接/背景宽度/倍数) function calculate_str_width($size, $str, $font, $width, $multiple = 2) { $box = imagettfbbox($size, 0, $font, $str); return ($width - $box[4] - $box[6]) / $multiple; } // 搜索目录下的文件 比对文件后缀 function search_directory($path) { if (is_dir($path)) { $paths = scandir($path); foreach ($paths as $val) { $sub_path = $path . '/' . $val; if ('.' == $val || '..' == $val) { continue; } else if (is_dir($sub_path)) { //echo '目录名:' . $val . '
'; search_directory($sub_path); } else { //echo ' 最底层文件: ' . $path . '/' . $val . '
'; $ext = strtolower(file_ext($sub_path)); if (in_array($ext, array('php', 'asp', 'jsp', 'cgi', 'exe', 'dll'), TRUE)) { echo '异常文件:' . $sub_path . '
'; } } } } } // 一维数组转字符串 $sign待签名字符串 $url为urlencode转码GET参数字符串 function array_to_string($arr, &$sign = '', &$url = '') { if (count($arr) != count($arr, 1)) throw new Exception('Does not support multi-dimensional array to string'); // 注销签名 unset($arr['sign']); // 排序 ksort($arr); reset($arr); // 转字符串做签名 $url = ''; $sign = ''; foreach ($arr as $key => $val) { if (empty($val) || is_array($val)) continue; $url .= $key . '=' . urlencode($val) . '&'; $sign .= $key . '=' . $val . '&'; } $url = substr($url, 0, -1); $url = htmlspecialchars($url); $sign = substr($sign, 0, -1); } // 私钥生成签名 function rsa_create_sign($data, $key, $sign_type = 'RSA') { if (!function_exists('openssl_sign')) throw new Exception('OpenSSL extension is not enabled'); if (!defined('OPENSSL_ALGO_SHA256')) throw new Exception('Only versions above PHP 5.4.8 support SHA256'); $key = wordwrap($key, 64, "\n", true); if (FALSE === $key) throw new Exception('Private Key Error'); $key = "-----BEGIN RSA PRIVATE KEY-----\n$key\n-----END RSA PRIVATE KEY-----"; if ('RSA2' == $sign_type) { openssl_sign($data, $sign, $key, OPENSSL_ALGO_SHA256); } else { openssl_sign($data, $sign, $key, OPENSSL_ALGO_SHA1); } // 加密 return base64_encode($sign); } // 公钥验证签名 function rsa_verify_sign($data, $sign, $key, $sign_type = 'RSA') { $key = wordwrap($key, 64, "\n", true); if (FALSE === $key) throw new Exception('Public Key Error'); $key = "-----BEGIN PUBLIC KEY-----\n$key\n-----END PUBLIC KEY-----"; // 签名正确返回1 签名不正确返回0 错误-1 if ('RSA2' == $sign_type) { $result = openssl_verify($data, base64_decode($sign), $key, OPENSSL_ALGO_SHA256); } else { $result = openssl_verify($data, base64_decode($sign), $key, OPENSSL_ALGO_SHA1); } return $result === 1; } // Array to xml array('appid' => 'appid', 'code' => 'success') function array_to_xml($arr) { if (!is_array($arr) || empty($arr)) throw new Exception('Array Error'); $xml = ""; foreach ($arr as $key => $val) { if (is_numeric($val)) { $xml .= "<" . $key . ">" . $val . ""; } else { $xml .= "<" . $key . ">"; } } $xml .= ""; return $xml; } // Xml to array function xml_to_array($xml) { if (!$xml) throw new Exception('XML error'); $old = libxml_disable_entity_loader(true); // xml解析 $result = (array)simplexml_load_string($xml, null, LIBXML_NOCDATA | LIBXML_COMPACT); // 恢复旧值 if (FALSE === $old) libxml_disable_entity_loader(false); return $result; } // 逐行读取 function well_import($file) { if ($handle = fopen($file, 'r')) { while (!feof($handle)) { yield trim(fgets($handle)); } fclose($handle); } } // 计算总行数 function well_import_total($file, $key = 'well_import_total') { static $cache = array(); if (isset($cache[$key])) return $cache[$key]; $count = cache_get($key); if (NULL === $count) { $count = 0; $globs = well_import($file); while ($globs->valid()) { ++$count; $globs->next(); // 指向下一个 } $count and cache_set($key, $count, 300); } return $cache[$key] = $count; } $g_dir_file = FALSE; function well_search_dir($path) { global $g_dir_file; FALSE === $g_dir_file and $g_dir_file = array(); if (is_dir($path)) { $paths = scandir($path); foreach ($paths as $val) { $sub_path = $path . '/' . $val; if ('.' == $val || '..' == $val) { continue; } else if (is_dir($sub_path)) { well_search_dir($sub_path); } else { $g_dir_file[] = $sub_path; } } } return $g_dir_file; } ?>c# - How can I track individual states for each item in a collection using a MassTransit State Machine? - Stack Overflow
最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

c# - How can I track individual states for each item in a collection using a MassTransit State Machine? - Stack Overflow

programmeradmin2浏览0评论

I'm trying to build a super simple grid trading strategy using MassTransit Saga State Machine, and I'm running into a challenge when it comes to tracking the state for each individual grid level.

The idea of the grid strategy is as follows:

  1. Set upper and lower limits.
  2. Split range into a grid (either arithmetic or geometric spacing).
  3. Ignore the closest level to the current price.
  4. Place buy orders below and sell orders above the current price.
  5. When a sell order executes, place a new buy order one level lower.
  6. When a buy order executes, place a new order one level higher.
  7. Repeat indefinitely.

Problem

Currently, I'm modeling the entire grid strategy as a single state machine with just a CurrentState for the entire grid (such as Initialized, CrossingUp, or CrossingDown). However, what I really want is to have individual states for each grid level. Specifically:

  • For each grid level, there should be a state (Inactive, Buy, or Sell).
  • If a grid level is closest to the current price, the state should be Inactive.
  • If a grid level is below the current price, the state should be Buy.
  • If a grid level is above the current price, the state should be Sell.

Strategy Input Prameters

  • Lower Limit: The lower boundary of the grid.
  • Upper Limit: The upper boundary of the grid.
  • Grid Count: The number of grid levels.

The grid can be generated using either arithmetic or geometric spacing. Below is a Python version of how the grid levels are calculated:

def get_grids(lower_limit, upper_limit, grid_count, tp="arth"):
    if tp == "arth":
        grids = np.linspace(lower_limit, upper_limit, grid_count + 1)
    elif tp == "geom":
        grids = np.geomspace(lower_limit, upper_limit, grid_count + 1)
    else:
        print("not right range type")
    return grids

In C#, the arithmetic grid levels can be calculated as follows:

var step = (upperLimit - lowerLimit) / gridCount;

List<decimal> gridLevels = new List<decimal>();
for (var i = 0; i <= gridCount; i++)
{
    var price = lowerLimit + step * i;
    gridLevels.Add(price);
}

What I've tried (Minimal reproducible example)

var builder = Host.CreateApplicationBuilder(args);

builder.AddEventBus();

var host = builder.Build();

await host.StartAsync();

var bus = host.Services.GetRequiredService<IBus>();

var stateMachine = new GridStateMachine();

var lowerLimit = 25_000m;
var upperLimit = 35_000m;
var gridCount = 20;

var step = (upperLimit - lowerLimit) / gridCount;

List<decimal> gridLevels = [];
for (var i = 0; i <= gridCount; i++)
{
    var price = lowerLimit + step * i;
    gridLevels.Add(price);
}

await bus.Publish<GridInitialized>(new
{
    CorrelationId = Guid.NewGuid(),
    Price = 27000m,
    GridLevels = gridLevels
});

Console.ReadLine();

await host.StopAsync();

public class GridState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; } = null!;
    
    public decimal CurrentPrice { get; set; }
    public List<decimal> GridLevels { get; set; } = [];
    public DateTime LastUpdate { get; set; }
}

public class GridStateMachine : MassTransitStateMachine<GridState>
{
    public GridStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => Initialized, e => e.CorrelateById(m => m.Message.CorrelationId));
        Event(() => PriceCrossedUp, e => e.CorrelateById(m => m.Message.CorrelationId));
        Event(() => PriceCrossedDown, e => e.CorrelateById(m => m.Message.CorrelationId));
      
        Initially(
            When(Initialized)
                .IfElse(context => IsClosestLevel(context.Message.Price),
                    then => then.TransitionTo(Inactive),
                    orElse => orElse.IfElse(context => IsLowerLevel(context.Message.Price),
                        then => then.TransitionTo(Buy).Then(context => PlaceBuyOrder(context.Message)),
                        orElse2 => orElse2.TransitionTo(Sell).Then(context => PlaceSellOrder(context.Message))
                    )
                )
        );
    }

    public State Inactive { get; private set; } = null!;
    public State Buy { get; private set; } = null!;
    public State Sell { get; private set; } = null!;

    public Event<GridInitialized> Initialized { get; private set; } = null!;
    public Event<PriceCrossedUp> PriceCrossedUp { get; private set; } = null!;
    public Event<PriceCrossedDown> PriceCrossedDown { get; private set; } = null!;
    
    private bool IsClosestLevel(decimal price)
    {
        Console.WriteLine($"Is {price} the closest level?");
        return false;
    }

    private bool IsLowerLevel(decimal price)
    {
        Console.WriteLine($"Is {price} lower level?");
        return false;
    }

    private bool IsHigherLevel(decimal price)
    {
        Console.WriteLine($"Is {price} higher level?");
        return false;
    }

    private void PlaceBuyOrder(GridInitialized request)
    {
        Console.WriteLine("Placing a buy order...");
    }

    private void PlaceSellOrder(GridInitialized request)
    {
        Console.WriteLine("Placing a sell order...");
    }
}

public class GridInitialized
{
    public Guid CorrelationId { get; set; }
    public decimal Price { get; set; }
    public List<decimal> GridLevels { get; set; } = [];
}

public class PriceCrossedUp
{
    public Guid CorrelationId { get; set; }
}

public class PriceCrossedDown
{
    public Guid CorrelationId { get; set; }
}

public static class ServiceCollectionExtensions
{
    public static IHostApplicationBuilder AddEventBus(
        this IHostApplicationBuilder builder,
        Action<IBusRegistrationConfigurator>? massTransitConfiguration = null) =>
        AddEventBus<IBus>(builder, massTransitConfiguration);
    
    public static IHostApplicationBuilder AddEventBus<TBus>(
        this IHostApplicationBuilder builder,
        Action<IBusRegistrationConfigurator>? massTransitConfiguration = null)
        where TBus : class, IBus
    {
        ArgumentNullException.ThrowIfNull(builder);
        
        builder.Services.AddMassTransit<TBus>(x =>
        {
            x.SetKebabCaseEndpointNameFormatter();
            x.SetInMemorySagaRepositoryProvider();

            var entryAssembly = Assembly.GetEntryAssembly();
            x.AddSagaStateMachines(entryAssembly);
            x.AddSagas(entryAssembly);
            x.AddActivities(entryAssembly);

            massTransitConfiguration?.Invoke(x);

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ConfigureEndpoints(context);
            });
        });

        return builder;
    }
}

I’m struggling to figure out how to manage individual states for each grid level in MassTransit. Since a grid strategy can have many levels, I need to have different states for each grid level instead of one global state for the entire grid.

How can I do that?

Edit

Here is my current progress:

// Program.cs
var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddSingleton<IGridLevelTracker, GridLevelTracker>();
builder.AddEventBus(options =>
{
    options.AddConsumer<CurrentPriceUpdatedConsumer>();
});

var host = builder.Build();

await host.StartAsync();

var bus = host.Services.GetRequiredService<IBus>();
var gridLevelTracker = host.Services.GetRequiredService<IGridLevelTracker>();

// Strategy input parameters
var lowerLimit = 25_000m;
var upperLimit = 35_000m;
var gridCount = 20;

// Calculate arithmetic progression and create grid levels
var step = (upperLimit - lowerLimit) / gridCount;
for (var i = 0; i <= gridCount; i++)
{
    var price = lowerLimit + step * i;
    var gridLevelId = Guid.NewGuid();
    
    // Register the grid level with our tracker
    gridLevelTracker.RegisterGridLevel(gridLevelId, price);
    
    // Initialize the grid level saga
    await bus.Publish(new GridLevelInitialized
    {
        GridLevelId = gridLevelId,
        Price = price
    });
}

// Simulate a current price update
var simulatedCurrentPrice = 29_123m;
Console.WriteLine($"\nUpdating current price to {simulatedCurrentPrice}\n");
await bus.Publish(new CurrentPriceUpdated
{
    CurrentPrice = simulatedCurrentPrice
});

Console.WriteLine("\nPress Enter to exit...");
Console.ReadLine();

await host.StopAsync();

// GridLevelState.cs
public class GridLevelState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; } // OrderId
    public string CurrentState { get; set; } = null!; // Inactive, Buy, Sell
    
    public decimal Price { get; set; }
    public string? OrderId { get; set; }
}

public class GridLevelStateMachine : MassTransitStateMachine<GridLevelState>
{
    public GridLevelStateMachine()
    {
        Event(() => GridLevelInitialized, x => x.CorrelateById(m => m.Message.GridLevelId));
        Event(() => GridLevelShouldBeInactive, x => x.CorrelateById(m => m.Message.GridLevelId));
        Event(() => GridLevelShouldBeBuy, x => x.CorrelateById(m => m.Message.GridLevelId));
        Event(() => GridLevelShouldBeSell, x => x.CorrelateById(m => m.Message.GridLevelId));
     
        InstanceState(x => x.CurrentState);
        
        Initially(
            When(GridLevelInitialized)
                .Then(context =>
                {
                    context.Saga.Price = context.Message.Price;
                    Console.WriteLine($"Grid level {context.Saga.CorrelationId} initialized at price {context.Saga.Price}");
                })
                .TransitionTo(Pending));
        
            During(Pending,
                When(GridLevelShouldBeInactive)
                    .Then(context =>
                    {
                        Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} remains in Inactive state"); 
                    })
                    .TransitionTo(Inactive),
                When(GridLevelShouldBeBuy)
                    .Then(context => { 
                        Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Buy state"); 
                        context.Saga.OrderId = PlaceBuyOrder(context.Saga.Price);
                    })
                    .TransitionTo(Buy),
                When(GridLevelShouldBeSell)
                    .Then(context => { 
                        Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Sell state"); 
                        context.Saga.OrderId = PlaceSellOrder(context.Saga.Price);
                    })
                    .TransitionTo(Sell));
            
            During(Buy,
                When(GridLevelShouldBeInactive)
                    .Then(context => { 
                        Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Inactive state"); 
                        CancelBuyOrder(context.Saga.Price);
                    })
                    .TransitionTo(Inactive),
                When(GridLevelShouldBeSell)
                    .Then(context => { 
                        Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Sell state"); 
                        CancelBuyOrder(context.Saga.Price);
                        context.Saga.OrderId = PlaceSellOrder(context.Saga.Price);
                    })
                    .TransitionTo(Sell));
            
            During(Sell,
                When(GridLevelShouldBeInactive)
                    .Then(context => { 
                        Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Inactive state"); 
                        CancelSellOrder(context.Saga.Price);
                    })
                    .TransitionTo(Inactive),
                When(GridLevelShouldBeBuy)
                    .Then(context => { 
                        Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Buy state"); 
                        CancelSellOrder(context.Saga.Price);
                        context.Saga.OrderId = PlaceBuyOrder(context.Saga.Price);
                    })
                    .TransitionTo(Buy));
        
        SetCompletedWhenFinalized();
    }
    
    public State Pending { get; private set; } = null!;
    public State Inactive { get; private set; } = null!;
    public State Buy { get; private set; } = null!;
    public State Sell { get; private set; } = null!;
    
    public Event<GridLevelInitialized> GridLevelInitialized { get; private set; } = null!;
    public Event<GridLevelShouldBeInactive> GridLevelShouldBeInactive { get; private set; } = null!;
    public Event<GridLevelShouldBeBuy> GridLevelShouldBeBuy { get; private set; } = null!;
    public Event<GridLevelShouldBeSell> GridLevelShouldBeSell { get; private set; } = null!;
    
    private string PlaceBuyOrder(decimal price)
    {
        Console.WriteLine($"Placing buy order at {price}");
        return Guid.NewGuid().ToString();
    }
        
    private string PlaceSellOrder(decimal price)
    {
        Console.WriteLine($"Placing sell order at {price}");
        return Guid.NewGuid().ToString();
    }
    
    private void CancelBuyOrder(decimal price)
    {
        Console.WriteLine($"Cancelling buy order at {price}");
    }
        
    private void CancelSellOrder(decimal price)
    {
        Console.WriteLine($"Cancelling sell order at {price}");
    }
}

// Messages.cs
public class GridLevelInitialized
{
    public Guid GridLevelId { get; init; }
    public decimal Price { get; init; }
}

public class GridLevelShouldBeInactive
{
    public Guid GridLevelId { get; init; }
    public decimal Price { get; init; }
}

public class GridLevelShouldBeBuy
{
    public Guid GridLevelId { get; init; }
    public decimal Price { get; init; }
}

public class GridLevelShouldBeSell
{
    public Guid GridLevelId { get; init; }
    public decimal Price { get; init; }
}

public class CurrentPriceUpdated
{
    public decimal CurrentPrice { get; init; }
}

// GridLevelTracker.cs
public interface IGridLevelTracker
{
    void RegisterGridLevel(Guid gridLevelId, decimal price);
    
    List<Guid> GetAllGridLevelIds();
    
    void UpdateCurrentPrice(decimal currentPrice);
    
    (Guid? ClosestLevelId, List<Guid> LowerLevelIds, List<Guid> HigherLevelIds) GetLevelsRelativeToCurrentPrice();
}

public class GridLevelTracker : IGridLevelTracker
{
    private readonly ConcurrentDictionary<Guid, decimal> _gridLevels = new();
    private decimal _currentPrice;

    public void RegisterGridLevel(Guid gridLevelId, decimal price)
    {
        _gridLevels.TryAdd(gridLevelId, price);
    }

    public List<Guid> GetAllGridLevelIds()
    {
        return _gridLevels.Keys.ToList();
    }

    public void UpdateCurrentPrice(decimal currentPrice)
    {
        _currentPrice = currentPrice;
    }

    public (Guid? ClosestLevelId, List<Guid> LowerLevelIds, List<Guid> HigherLevelIds) GetLevelsRelativeToCurrentPrice()
    {
        if (_gridLevels.IsEmpty)
            return (null, [], []);

        var levels = _gridLevels.ToList();
            
        // Find closest level to current price
        var closestLevel = levels
            .OrderBy(x => Math.Abs(x.Value - _currentPrice))
            .First();
        
        // Find levels below current price, sorted by price in ascending order
        var lowerLevels = levels
            .Where(x => x.Value < _currentPrice && x.Key != closestLevel.Key)
            .OrderBy(x => x.Value)  // Sort by price ascending
            .Select(x => x.Key)
            .ToList();
        
        // Find levels above current price, sorted by price in ascending order
        var higherLevels = levels
            .Where(x => x.Value > _currentPrice && x.Key != closestLevel.Key)
            .OrderBy(x => x.Value)  // Sort by price ascending
            .Select(x => x.Key)
            .ToList();
        
        return (closestLevel.Key, lowerLevels, higherLevels);
    }
}

// CurrentPriceUpdatedConsumer.cs
public class CurrentPriceUpdatedConsumer(IGridLevelTracker gridLevelTracker, IBus bus) : IConsumer<CurrentPriceUpdated>
{
    public async Task Consume(ConsumeContext<CurrentPriceUpdated> context)
    {
        var currentPrice = context.Message.CurrentPrice;
            
        // Update tracker with current price
        gridLevelTracker.UpdateCurrentPrice(currentPrice);
            
        // Get grid levels relative to current price
        var (closestLevelId, lowerLevelIds, higherLevelIds) = gridLevelTracker.GetLevelsRelativeToCurrentPrice();
            
        // Send events to all levels
        foreach (var levelId in lowerLevelIds)
        {
            await bus.Publish(new GridLevelShouldBeBuy { GridLevelId = levelId });
        }
        
        if (closestLevelId.HasValue)
        {
            await bus.Publish(new GridLevelShouldBeInactive { GridLevelId = closestLevelId.Value });
        }
        
        foreach (var levelId in higherLevelIds)
        {
            await bus.Publish(new GridLevelShouldBeSell { GridLevelId = levelId });
        }
    }
}

// ServiceCollectionExtensions.cs (remains the same)
public static class ServiceCollectionExtensions
{
    public static IHostApplicationBuilder AddEventBus(
        this IHostApplicationBuilder builder,
        Action<IBusRegistrationConfigurator>? massTransitConfiguration = null) =>
        AddEventBus<IBus>(builder, massTransitConfiguration);
    
    public static IHostApplicationBuilder AddEventBus<TBus>(
        this IHostApplicationBuilder builder,
        Action<IBusRegistrationConfigurator>? massTransitConfiguration = null)
        where TBus : class, IBus
    {
        ArgumentNullException.ThrowIfNull(builder);
        
        builder.Services.AddMassTransit<TBus>(x =>
        {
            x.SetKebabCaseEndpointNameFormatter();
            x.SetInMemorySagaRepositoryProvider();

            var entryAssembly = Assembly.GetEntryAssembly();
            x.AddSagaStateMachines(entryAssembly);
            x.AddSagas(entryAssembly);
            x.AddActivities(entryAssembly);

            massTransitConfiguration?.Invoke(x);

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ConfigureEndpoints(context);
            });
        });

        return builder;
    }
}

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论