最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

go - golang grpc server and grpc ui fx modules - Stack Overflow

programmeradmin · 9 浏览 · 0 评论

I'm trying to set up FX modules, but I have doubts and can't find a way forward. Basically, I have one module for the server:

    // NewGRPCServer assembles a *grpc.Server with the default unary interceptor
    // chain (logger-to-context, tracer-to-context, panic recovery), an
    // OpenTelemetry stats handler and keepalive settings, then registers the
    // health and reflection services on it.
    //
    // Interceptors in srvsInterceptors are chained after the defaults, and the
    // options in serverOpt are appended after the default options. The server is
    // only constructed here; nothing in this function starts serving.
    func NewGRPCServer(
    lc fx.Lifecycle, log *zap.Logger, tracer trace.Tracer,
    srvsInterceptors []grpc.UnaryServerInterceptor, serverOpt []grpc.ServerOption,
) *grpc.Server {
    // A recovered panic is logged with its stack trace and reported to the
    // caller as a generic Internal status.
    onPanic := func(ctx context.Context, p any) error {
        logger.FromContext(ctx).Error("recovered from panic", zap.Any("panic", p), zap.Stack("stacktrace"))

        return status.Error(codes.Internal, "unexpected error")
    }

    // Defaults first, caller-supplied interceptors afterwards.
    chain := make([]grpc.UnaryServerInterceptor, 0, 3+len(srvsInterceptors))
    chain = append(chain,
        LoggerToContextInterceptor(log),
        TracerToContextInterceptor(tracer),
        grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandlerContext(onPanic)),
    )
    chain = append(chain, srvsInterceptors...)

    enforcement := keepalive.EnforcementPolicy{
        MinTime:             60 * time.Second,
        PermitWithoutStream: true,
    }
    params := keepalive.ServerParameters{
        Time:    60 * time.Second,
        Timeout: 10 * time.Second,
    }

    // Default options first, caller-supplied options afterwards.
    opts := make([]grpc.ServerOption, 0, 4+len(serverOpt))
    opts = append(opts,
        grpc.ChainUnaryInterceptor(chain...),
        grpc.StatsHandler(otelgrpc.NewServerHandler(
            otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
        )),
        grpc.KeepaliveEnforcementPolicy(enforcement),
        grpc.KeepaliveParams(params),
    )
    opts = append(opts, serverOpt...)

    srv := grpc.NewServer(opts...)
    grpc_health_v1.RegisterHealthServer(srv, health.NewServer())
    reflection.Register(srv)

    return srv
}

// NewListener opens a TCP listener for the gRPC server on the port given by
// cfg.GRPC (no host is specified in the address).
//
// It returns the listener, or a wrapped error when the listener cannot be
// opened.
func NewListener(cfg Config) (net.Listener, error) {
    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPC))
    if err != nil {
        // This is a listen, not a dial; the wrapped net error already carries
        // the address, so only the operation is named here.
        return nil, fmt.Errorf("open gRPC listener: %w", err)
    }

    return lis, nil
}

// GRPCModule returns the fx module for the gRPC server.
//
// It provides the *grpc.Server built by NewGRPCServer (the interceptor and
// server-option slices are optional dependencies) and registers lifecycle
// hooks: OnStart opens the listener and begins serving in a goroutine, OnStop
// shuts the server down.
func GRPCModule() fx.Option {
    return fx.Module(
        "grpc",
        fx.Provide(
            fx.Annotate(
                NewGRPCServer,
                fx.ParamTags(``, ``, ``, `optional:"true"`, `optional:"true"`),
            ),
        ),
        fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
            lc.Append(fx.Hook{
                OnStart: func(ctx context.Context) error {
                    // Open the listener synchronously so that a failure to
                    // bind is returned from the start hook.
                    lis, err := NewListener(config)
                    if err != nil {
                        return err
                    }

                    // Serve blocks, so it runs in its own goroutine; it
                    // returns once OnStop stops the server.
                    go func() {
                        log.Info("Starting gRPC server")
                        if err := server.Serve(lis); err != nil && err != grpc.ErrServerStopped {
                            log.Error("gRPC server failed", zap.Error(err))
                        }
                    }()

                    return nil
                },
                OnStop: func(ctx context.Context) error {
                    // GracefulStop waits for in-flight RPCs and has no
                    // deadline of its own, so run it aside and honour the
                    // hook context: when that expires, force the shutdown.
                    stopped := make(chan struct{})
                    go func() {
                        server.GracefulStop()
                        close(stopped)
                    }()

                    select {
                    case <-stopped:
                        return nil
                    case <-ctx.Done():
                        server.Stop()
                        return ctx.Err()
                    }
                },
            })
        }),
    )
}

And a module for UI:

// NewGRPCUIServer builds the HTTP server that hosts the gRPC UI under
// /grpc-ui/.
//
// It creates an insecure gRPC client for the port in config.GRPC, obtains a
// handler from standalone.HandlerViaReflection, mounts that handler on a mux
// and delegates construction of the *http.Server (address ":<config.UI>") to
// httplib.NewHTTPServerFx.
//
// The client connection is closed when construction fails and when the
// application stops.
//
// NOTE(review): the reflection handler is created at construction time, while
// GRPCModule only begins serving in its OnStart hook, so this call runs before
// the gRPC server accepts connections.
func NewGRPCUIServer(
    lc fx.Lifecycle,
    logger *zap.Logger,
    tracer trace.Tracer,
    config Config,
) (*http.Server, error) {
    logger.Info("enter on new grpc ui server.")

    rpcGrpcHost := fmt.Sprintf("0.0.0.0:%d", config.GRPC)
    keepAliveOpt := grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:                60 * time.Second,
        Timeout:             10 * time.Second,
        PermitWithoutStream: true,
    })

    cc, err := grpc.NewClient(
        rpcGrpcHost,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
        keepAliveOpt,
    )
    if err != nil {
        logger.Error("Failed to connect to gRPC server for UI", zap.Error(err))
        return nil, err
    }

    h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
    if err != nil {
        logger.Error("Failed to create UI handler", zap.Error(err))
        // Best effort: the handler error is the one worth reporting.
        _ = cc.Close()
        return nil, err
    }

    mux := http.NewServeMux()
    mux.Handle("/grpc-ui/", http.StripPrefix("/grpc-ui", h))

    // Appended before the HTTP server is built: stop hooks run in reverse
    // order, so the connection is closed after any stop hook that
    // NewHTTPServerFx registers on lc.
    lc.Append(fx.Hook{
        OnStop: func(context.Context) error {
            return cc.Close()
        },
    })

    srv, err := httplib.NewHTTPServerFx(
        lc,
        httplib.Config{
            ServerAddr:         fmt.Sprintf(":%d", config.UI),
            ServerReadTimeout:  15 * time.Second,
            ServerWriteTimeout: 15 * time.Second,
        },
        logger,
        tracer,
        mux,
    )
    if err != nil {
        // The application will not start, so the stop hook above never runs.
        _ = cc.Close()
        return nil, err
    }

    return srv, nil
}

// GRPCUIParams groups the dependencies injected into the function that
// GRPCUIModule passes to fx.Invoke.
type GRPCUIParams struct {
    fx.In

    // WebServer is the *http.Server registered under the name "grpc-ui-server".
    WebServer *http.Server `name:"grpc-ui-server"`
    // Logger is used to log the UI server's address.
    Logger    *zap.Logger
}

// GRPCUIModule returns the fx module for the gRPC UI.
//
// It provides the *http.Server built by NewGRPCUIServer under the name
// "grpc-ui-server" and invokes a function that depends on it, which forces the
// server to be constructed and logs its address.
func GRPCUIModule() fx.Option {
    return fx.Module(
        "x:ui",
        fx.Provide(
            // Registered exactly once, under a name. An additional unnamed
            // registration of the same constructor would let any consumer of
            // a plain *http.Server build a second UI server for the same port.
            fx.Annotate(
                NewGRPCUIServer,
                fx.ResultTags(`name:"grpc-ui-server"`),
            ),
        ),
        fx.Invoke(func(params GRPCUIParams) {
            params.Logger.Info("gRPC UI Server initialized:", zap.String("address", params.WebServer.Addr))
        }),
    )
}

But for some reason it fails at the call to standalone.HandlerViaReflection:

// Excerpt from NewGRPCUIServer: the call that returns the error.
h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
if err != nil {
    logger.Error("Failed to create UI handler", zap.Error(err))
    return nil, err
}

And it's giving an error because the gRPC server hasn't started yet. I placed a breakpoint here in the server module:

fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
    lc.Append(fx.Hook{
        OnStart: func(ctx context.Context) error {
            lis, err := NewListener(config)
            if err != nil {
                return err
            }

            go func(srv *grpc.Server, logger *zap.Logger) {
                logger.Info("Starting gRPC server")
                if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
                    logger.Error("gRPC server failed", zap.Error(err))
                }
            }(server, log)

            return nil
        },

The breakpoint is never hit; execution always reaches the UI module first. NewGRPCServer creates the server object, but the server is never started. Does anyone know how I can solve this?

I'm trying to set up FX modules, but I have doubts and can't find a way forward. Basically, I have one module for the server:

    // NewGRPCServer assembles a *grpc.Server with the default unary interceptor
    // chain (logger-to-context, tracer-to-context, panic recovery), an
    // OpenTelemetry stats handler and keepalive settings, then registers the
    // health and reflection services on it.
    //
    // Interceptors in srvsInterceptors are chained after the defaults, and the
    // options in serverOpt are appended after the default options. The server is
    // only constructed here; nothing in this function starts serving.
    func NewGRPCServer(
    lc fx.Lifecycle, log *zap.Logger, tracer trace.Tracer,
    srvsInterceptors []grpc.UnaryServerInterceptor, serverOpt []grpc.ServerOption,
) *grpc.Server {
    // A recovered panic is logged with its stack trace and reported to the
    // caller as a generic Internal status.
    onPanic := func(ctx context.Context, p any) error {
        logger.FromContext(ctx).Error("recovered from panic", zap.Any("panic", p), zap.Stack("stacktrace"))

        return status.Error(codes.Internal, "unexpected error")
    }

    // Defaults first, caller-supplied interceptors afterwards.
    chain := make([]grpc.UnaryServerInterceptor, 0, 3+len(srvsInterceptors))
    chain = append(chain,
        LoggerToContextInterceptor(log),
        TracerToContextInterceptor(tracer),
        grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandlerContext(onPanic)),
    )
    chain = append(chain, srvsInterceptors...)

    enforcement := keepalive.EnforcementPolicy{
        MinTime:             60 * time.Second,
        PermitWithoutStream: true,
    }
    params := keepalive.ServerParameters{
        Time:    60 * time.Second,
        Timeout: 10 * time.Second,
    }

    // Default options first, caller-supplied options afterwards.
    opts := make([]grpc.ServerOption, 0, 4+len(serverOpt))
    opts = append(opts,
        grpc.ChainUnaryInterceptor(chain...),
        grpc.StatsHandler(otelgrpc.NewServerHandler(
            otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
        )),
        grpc.KeepaliveEnforcementPolicy(enforcement),
        grpc.KeepaliveParams(params),
    )
    opts = append(opts, serverOpt...)

    srv := grpc.NewServer(opts...)
    grpc_health_v1.RegisterHealthServer(srv, health.NewServer())
    reflection.Register(srv)

    return srv
}

// NewListener opens a TCP listener for the gRPC server on the port given by
// cfg.GRPC (no host is specified in the address).
//
// It returns the listener, or a wrapped error when the listener cannot be
// opened.
func NewListener(cfg Config) (net.Listener, error) {
    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPC))
    if err != nil {
        // This is a listen, not a dial; the wrapped net error already carries
        // the address, so only the operation is named here.
        return nil, fmt.Errorf("open gRPC listener: %w", err)
    }

    return lis, nil
}

// GRPCModule returns the fx module for the gRPC server.
//
// It provides the *grpc.Server built by NewGRPCServer (the interceptor and
// server-option slices are optional dependencies) and registers lifecycle
// hooks: OnStart opens the listener and begins serving in a goroutine, OnStop
// shuts the server down.
func GRPCModule() fx.Option {
    return fx.Module(
        "grpc",
        fx.Provide(
            fx.Annotate(
                NewGRPCServer,
                fx.ParamTags(``, ``, ``, `optional:"true"`, `optional:"true"`),
            ),
        ),
        fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
            lc.Append(fx.Hook{
                OnStart: func(ctx context.Context) error {
                    // Open the listener synchronously so that a failure to
                    // bind is returned from the start hook.
                    lis, err := NewListener(config)
                    if err != nil {
                        return err
                    }

                    // Serve blocks, so it runs in its own goroutine; it
                    // returns once OnStop stops the server.
                    go func() {
                        log.Info("Starting gRPC server")
                        if err := server.Serve(lis); err != nil && err != grpc.ErrServerStopped {
                            log.Error("gRPC server failed", zap.Error(err))
                        }
                    }()

                    return nil
                },
                OnStop: func(ctx context.Context) error {
                    // GracefulStop waits for in-flight RPCs and has no
                    // deadline of its own, so run it aside and honour the
                    // hook context: when that expires, force the shutdown.
                    stopped := make(chan struct{})
                    go func() {
                        server.GracefulStop()
                        close(stopped)
                    }()

                    select {
                    case <-stopped:
                        return nil
                    case <-ctx.Done():
                        server.Stop()
                        return ctx.Err()
                    }
                },
            })
        }),
    )
}

And a module for UI:

// NewGRPCUIServer builds the HTTP server that hosts the gRPC UI under
// /grpc-ui/.
//
// It creates an insecure gRPC client for the port in config.GRPC, obtains a
// handler from standalone.HandlerViaReflection, mounts that handler on a mux
// and delegates construction of the *http.Server (address ":<config.UI>") to
// httplib.NewHTTPServerFx.
//
// The client connection is closed when construction fails and when the
// application stops.
//
// NOTE(review): the reflection handler is created at construction time, while
// GRPCModule only begins serving in its OnStart hook, so this call runs before
// the gRPC server accepts connections.
func NewGRPCUIServer(
    lc fx.Lifecycle,
    logger *zap.Logger,
    tracer trace.Tracer,
    config Config,
) (*http.Server, error) {
    logger.Info("enter on new grpc ui server.")

    rpcGrpcHost := fmt.Sprintf("0.0.0.0:%d", config.GRPC)
    keepAliveOpt := grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:                60 * time.Second,
        Timeout:             10 * time.Second,
        PermitWithoutStream: true,
    })

    cc, err := grpc.NewClient(
        rpcGrpcHost,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
        keepAliveOpt,
    )
    if err != nil {
        logger.Error("Failed to connect to gRPC server for UI", zap.Error(err))
        return nil, err
    }

    h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
    if err != nil {
        logger.Error("Failed to create UI handler", zap.Error(err))
        // Best effort: the handler error is the one worth reporting.
        _ = cc.Close()
        return nil, err
    }

    mux := http.NewServeMux()
    mux.Handle("/grpc-ui/", http.StripPrefix("/grpc-ui", h))

    // Appended before the HTTP server is built: stop hooks run in reverse
    // order, so the connection is closed after any stop hook that
    // NewHTTPServerFx registers on lc.
    lc.Append(fx.Hook{
        OnStop: func(context.Context) error {
            return cc.Close()
        },
    })

    srv, err := httplib.NewHTTPServerFx(
        lc,
        httplib.Config{
            ServerAddr:         fmt.Sprintf(":%d", config.UI),
            ServerReadTimeout:  15 * time.Second,
            ServerWriteTimeout: 15 * time.Second,
        },
        logger,
        tracer,
        mux,
    )
    if err != nil {
        // The application will not start, so the stop hook above never runs.
        _ = cc.Close()
        return nil, err
    }

    return srv, nil
}

// GRPCUIParams groups the dependencies injected into the function that
// GRPCUIModule passes to fx.Invoke.
type GRPCUIParams struct {
    fx.In

    // WebServer is the *http.Server registered under the name "grpc-ui-server".
    WebServer *http.Server `name:"grpc-ui-server"`
    // Logger is used to log the UI server's address.
    Logger    *zap.Logger
}

// GRPCUIModule returns the fx module for the gRPC UI.
//
// It provides the *http.Server built by NewGRPCUIServer under the name
// "grpc-ui-server" and invokes a function that depends on it, which forces the
// server to be constructed and logs its address.
func GRPCUIModule() fx.Option {
    return fx.Module(
        "x:ui",
        fx.Provide(
            // Registered exactly once, under a name. An additional unnamed
            // registration of the same constructor would let any consumer of
            // a plain *http.Server build a second UI server for the same port.
            fx.Annotate(
                NewGRPCUIServer,
                fx.ResultTags(`name:"grpc-ui-server"`),
            ),
        ),
        fx.Invoke(func(params GRPCUIParams) {
            params.Logger.Info("gRPC UI Server initialized:", zap.String("address", params.WebServer.Addr))
        }),
    )
}

But for some reason it fails at the call to standalone.HandlerViaReflection:

// Excerpt from NewGRPCUIServer: the call that returns the error.
h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
if err != nil {
    logger.Error("Failed to create UI handler", zap.Error(err))
    return nil, err
}

And it's giving an error because the gRPC server hasn't started yet. I placed a breakpoint here in the server module:

fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
    lc.Append(fx.Hook{
        OnStart: func(ctx context.Context) error {
            lis, err := NewListener(config)
            if err != nil {
                return err
            }

            go func(srv *grpc.Server, logger *zap.Logger) {
                logger.Info("Starting gRPC server")
                if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
                    logger.Error("gRPC server failed", zap.Error(err))
                }
            }(server, log)

            return nil
        },

The breakpoint is never hit; execution always reaches the UI module first. NewGRPCServer creates the server object, but the server is never started. Does anyone know how I can solve this?

Share · Improve this question · edited Apr 2 at 9:29 by Ming · asked Apr 2 at 3:53 by Ming (1,634 reputation; 4 gold badges, 18 silver badges, 50 bronze badges)
Add a comment  | 

1 Answer 1

Reset to default 2

Create ≠ Start: NewGRPCServer only creates a *grpc.Server object but does not start listening on the port.

In your code, both NewGRPCServer and NewGRPCUIServer are provided through fx.Provide. The UI server attempts to connect to the gRPC server immediately upon construction. However, the actual startup of your gRPC server occurs in the OnStart lifecycle hook, which happens after all initializations are complete.

You can try moving the reflection connection logic of the UI service to the OnStart hook as well, to ensure that the connection attempt happens only after the gRPC server has started. For example:

// Sketch of NewGRPCUIServer (placeholders marked "..."): the *http.Server is
// built up front, while the gRPC client and the reflection-based handler are
// created inside the OnStart hook instead of in the constructor.
func NewGRPCUIServer(lc fx.Lifecycle, ...) (*http.Server, error) {
    server := &http.Server{...}

    lc.Append(fx.Hook{
        OnStart: func(ctx context.Context) error {
            // Runs when the application starts, not when the constructor runs.
            cc, err := grpc.NewClient(...)
            h, err := standalone.HandlerViaReflection(...)
            // ...
        },
    })

    return server, nil
}

By the way, if execution always reaches the UI module first, check that the registration order is correct: make sure the gRPC server module is registered before the UI module when registering the Fx modules.

发布评论

评论列表(0)

  1. 暂无评论