2022-02-06 07:06:32 +00:00

284 lines
9.1 KiB
Go

// Copyright (C) 2017 Michael J. Fromberger. All Rights Reserved.
package jrpc2
import (
"context"
"encoding/json"
"fmt"
"log"
"runtime"
"time"
"github.com/creachadair/jrpc2/code"
"github.com/creachadair/jrpc2/metrics"
)
// ServerOptions control the behaviour of a server created by NewServer.
// A nil *ServerOptions provides sensible defaults.
// It is safe to share server options among multiple server instances.
type ServerOptions struct {
// If not nil, send debug text logs here. If nil, debug text logs are
// discarded.
Logger Logger
// If not nil, the methods of this value are called to log each request
// received and each response or error returned. If nil, a logger whose
// methods do nothing is used instead.
RPCLog RPCLogger
// Instructs the server to allow server callbacks and notifications, a
// non-standard extension to the JSON-RPC protocol. If AllowPush is false,
// the Notify and Callback methods of the server report errors if called.
// Push is also disallowed when the options are nil.
AllowPush bool
// Instructs the server to disable the built-in rpc.* handler methods.
//
// By default, a server reserves all rpc.* methods, even if the given
// assigner maps them. When this option is true, rpc.* methods are passed
// along to the given assigner.
DisableBuiltin bool
// Allows up to the specified number of goroutines to execute in parallel in
// request handlers. A value less than 1 uses runtime.NumCPU(). Note that
// this setting does not constrain order of issue.
Concurrency int
// If set, this function is called to create a new base request context.
// If unset, the server uses a background context.
NewContext func() context.Context
// If set, this function is called with the method name and encoded request
// parameters received from the client, before they are delivered to the
// handler. Its return value replaces the context and argument values. This
// allows the server to decode context metadata sent by the client.
// If unset, context and parameters are used as given.
DecodeContext func(context.Context, string, json.RawMessage) (context.Context, json.RawMessage, error)
// If set, use this value to record server metrics. All servers created
// from the same options will share the same metrics collector. If none is
// set, an empty collector will be created for each new server.
Metrics *metrics.M
// If nonzero, use this value as the server start time; otherwise, use the
// current time when Start is called. All servers created from the same
// options will share the same start time if one is set.
StartTime time.Time
}
// logFunc returns the printf-style function used for debug text logs. If the
// options are nil or no Logger is set, the returned function discards its
// arguments.
func (s *ServerOptions) logFunc() func(string, ...interface{}) {
	if s != nil && s.Logger != nil {
		return s.Logger.Printf
	}
	return func(string, ...interface{}) {}
}
// allowPush reports whether server push is enabled. Nil options disallow
// push.
func (s *ServerOptions) allowPush() bool {
	if s == nil {
		return false
	}
	return s.AllowPush
}
// allowBuiltin reports whether the built-in handlers are enabled. They are
// enabled for nil options and whenever DisableBuiltin is false.
func (s *ServerOptions) allowBuiltin() bool {
	if s == nil {
		return true
	}
	return !s.DisableBuiltin
}
// concurrency returns the configured Concurrency value, or runtime.NumCPU()
// when the options are nil or the configured value is less than 1.
func (s *ServerOptions) concurrency() int64 {
	if s != nil && s.Concurrency >= 1 {
		return int64(s.Concurrency)
	}
	return int64(runtime.NumCPU())
}
// startTime returns the configured StartTime, or the zero time.Time when the
// options are nil.
func (s *ServerOptions) startTime() time.Time {
	var start time.Time
	if s != nil {
		start = s.StartTime
	}
	return start
}
// newContext returns the function used to create a base request context. If
// the options are nil or NewContext is unset, it returns context.Background.
//
// The receiver is named s to match every other method of *ServerOptions.
func (s *ServerOptions) newContext() func() context.Context {
	if s == nil || s.NewContext == nil {
		return context.Background
	}
	return s.NewContext
}
// decoder is the function signature of the ServerOptions.DecodeContext hook:
// it takes a context, a method name, and encoded request parameters, and
// returns a replacement context and parameters, or an error.
type decoder = func(context.Context, string, json.RawMessage) (context.Context, json.RawMessage, error)
// decodeContext returns the configured DecodeContext hook. If the options
// are nil or no hook is set, it returns a decoder that passes the context and
// parameters through unchanged and never reports an error.
func (s *ServerOptions) decodeContext() decoder {
	if s != nil && s.DecodeContext != nil {
		return s.DecodeContext
	}
	return func(ctx context.Context, _ string, params json.RawMessage) (context.Context, json.RawMessage, error) {
		return ctx, params, nil
	}
}
// metrics returns the configured metrics collector. If the options are nil
// or no collector is set, each call returns a new one from metrics.New.
func (s *ServerOptions) metrics() *metrics.M {
	if s != nil && s.Metrics != nil {
		return s.Metrics
	}
	return metrics.New()
}
// rpcLog returns the configured RPCLogger. If the options are nil or no
// logger is set, it returns a nullRPCLogger, whose methods do nothing.
func (s *ServerOptions) rpcLog() RPCLogger {
	if s != nil && s.RPCLog != nil {
		return s.RPCLog
	}
	return nullRPCLogger{}
}
// ClientOptions control the behaviour of a client created by NewClient.
// A nil *ClientOptions provides sensible defaults.
type ClientOptions struct {
// If not nil, send debug text logs here. If nil, debug text logs are
// discarded.
Logger Logger
// If set, this function is called with the context, method name, and
// encoded request parameters before the request is sent to the server.
// Its return value replaces the request parameters. This allows the client
// to send context metadata along with the request. If unset, the parameters
// are unchanged.
EncodeContext func(context.Context, string, json.RawMessage) (json.RawMessage, error)
// If set, this function is called if a notification is received from the
// server. If unset, server notifications are logged and discarded. At
// most one invocation of the callback will be active at a time.
// Server notifications are a non-standard extension of JSON-RPC.
//
// The request passed to the function carries the method name and the
// parameters of the notification.
OnNotify func(*Request)
// If set, this function is called if a request is received from the server.
// If unset, server requests are logged and discarded. Multiple invocations
// of the callback handler may be active concurrently.
//
// The callback handler can retrieve the client from its context using the
// jrpc2.ClientFromContext function. The context terminates when the client
// is closed.
//
// The value returned by the handler is encoded as JSON to form the result
// of the response. If the handler (or the encoding of its result) reports
// an error of concrete type *Error, that error is sent as given; any other
// error is converted to an *Error using code.FromError and its message.
//
// If a callback handler panics, the client will recover the panic and
// report a system error back to the server describing the error.
//
// Server callbacks are a non-standard extension of JSON-RPC.
OnCallback func(context.Context, *Request) (interface{}, error)
// If set, this function is called when the context for a request terminates.
// The function receives the client and the response that was cancelled.
// The hook can obtain the ID and error value from rsp.
//
// Note that the hook does not receive the request context, which has
// already ended by the time the hook is called.
OnCancel func(cli *Client, rsp *Response)
}
// logFunc returns the printf-style function used for debug text logs. If the
// options are nil or no Logger is set, the returned function discards its
// arguments.
func (c *ClientOptions) logFunc() func(string, ...interface{}) {
	if c != nil && c.Logger != nil {
		return c.Logger.Printf
	}
	return func(string, ...interface{}) {}
}
// encoder is the function signature of the ClientOptions.EncodeContext hook:
// it takes a context, a method name, and encoded request parameters, and
// returns the replacement parameters, or an error.
type encoder = func(context.Context, string, json.RawMessage) (json.RawMessage, error)
// encodeContext returns the configured EncodeContext hook. If the options
// are nil or no hook is set, it returns an encoder that returns the
// parameters unchanged and never reports an error.
func (c *ClientOptions) encodeContext() encoder {
	if c != nil && c.EncodeContext != nil {
		return c.EncodeContext
	}
	return func(_ context.Context, _ string, params json.RawMessage) (json.RawMessage, error) {
		return params, nil
	}
}
// handleNotification adapts the OnNotify hook to a function that accepts a
// raw message. It returns nil if the options are nil or OnNotify is unset.
// The hook is passed a Request holding the method and parameters of the
// message.
func (c *ClientOptions) handleNotification() func(*jmessage) {
	if c != nil && c.OnNotify != nil {
		// Capture the hook now, so later changes to the options do not affect
		// the returned function.
		notify := c.OnNotify
		return func(msg *jmessage) {
			notify(&Request{method: msg.M, params: msg.P})
		}
	}
	return nil
}
// handleCancel returns the OnCancel hook, which may be nil. It returns nil
// if the options are nil.
func (c *ClientOptions) handleCancel() func(*Client, *Response) {
	if c != nil {
		return c.OnCancel
	}
	return nil
}
// handleCallback adapts the OnCallback hook to a function that accepts a raw
// request message and returns the encoded response message. It returns nil
// if the options are nil or OnCallback is unset.
func (c *ClientOptions) handleCallback() func(context.Context, *jmessage) []byte {
	if c == nil || c.OnCallback == nil {
		return nil
	}
	handler := c.OnCallback
	return func(ctx context.Context, req *jmessage) []byte {
		// The handler runs under panicToError so that the server receives a
		// response even when the callback panics instead of returning.
		//
		// Without this, a client and a server that (a) share a process,
		// (b) have panics recovered at some higher level, and (c) do not
		// clean up the client, can leave the server stalled in a way that
		// is difficult to debug.
		//
		// See https://github.com/creachadair/jrpc2/issues/41.
		reply := &jmessage{ID: req.ID}
		result, err := panicToError(func() (interface{}, error) {
			in := &Request{id: req.ID, method: req.M, params: req.P}
			return handler(ctx, in)
		})
		if err == nil {
			reply.R, err = json.Marshal(result)
		}

		// On failure, whether from the handler or from encoding its result,
		// drop any result and report an error object instead.
		switch e := err.(type) {
		case nil:
			// Success: reply.R already holds the encoded result.
		case *Error:
			reply.R = nil
			reply.E = e
		default:
			reply.R = nil
			reply.E = &Error{Code: code.FromError(err), Message: err.Error()}
		}

		// NOTE(review): the encoding error is discarded here, as in the
		// original code; confirm against toJSON whether that can matter.
		data, _ := reply.toJSON()
		return data
	}
}
// panicToError calls f and returns its results. If f panics, the panic is
// recovered and reported as an error whose text contains the panic value; in
// that case the returned value is nil.
func panicToError(f func() (interface{}, error)) (v interface{}, err error) {
	defer func() {
		p := recover()
		if p == nil {
			return
		}
		// The named result lets the deferred function replace the error.
		err = fmt.Errorf("panic in callback handler: %v", p)
	}()
	v, err = f()
	return v, err
}
// A Logger records text logs from a server or a client. A nil logger discards
// text log input.
type Logger func(text string)

// Printf formats msg with args in the manner of fmt.Sprintf and passes the
// resulting text to the logger. If lg == nil, the message is discarded
// without being formatted.
func (lg Logger) Printf(msg string, args ...interface{}) {
	if lg == nil {
		return
	}
	lg(fmt.Sprintf(msg, args...))
}

// StdLogger adapts a *log.Logger to a Logger. If logger == nil, the returned
// function sends logs to the default logger of the log package.
func StdLogger(logger *log.Logger) Logger {
	if logger != nil {
		return func(text string) { logger.Output(2, text) }
	}
	return func(text string) { log.Output(2, text) }
}
// An RPCLogger receives callbacks from a server to record the receipt of
// requests and the delivery of responses. These callbacks are invoked
// synchronously with the processing of the request.
//
// A server whose options do not set an RPCLogger uses an implementation
// whose methods do nothing.
type RPCLogger interface {
// Called for each request received prior to invoking its handler.
LogRequest(ctx context.Context, req *Request)
// Called for each response produced by a handler, immediately prior to
// sending it back to the client. The inbound request can be recovered from
// the context using jrpc2.InboundRequest.
LogResponse(ctx context.Context, rsp *Response)
}
// nullRPCLogger is an RPCLogger whose methods do nothing. It is the logger
// returned by (*ServerOptions).rpcLog when no RPCLog is configured.
type nullRPCLogger struct{}
// LogRequest implements RPCLogger by discarding the request.
func (nullRPCLogger) LogRequest(context.Context, *Request) {}
// LogResponse implements RPCLogger by discarding the response.
func (nullRPCLogger) LogResponse(context.Context, *Response) {}