284 lines
9.1 KiB
Go
284 lines
9.1 KiB
Go
// Copyright (C) 2017 Michael J. Fromberger. All Rights Reserved.
|
|
|
|
package jrpc2
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/creachadair/jrpc2/code"
|
|
"github.com/creachadair/jrpc2/metrics"
|
|
)
|
|
|
|
// ServerOptions control the behaviour of a server created by NewServer.
|
|
// A nil *ServerOptions provides sensible defaults.
|
|
// It is safe to share server options among multiple server instances.
|
|
type ServerOptions struct {
|
|
// If not nil, send debug text logs here.
|
|
Logger Logger
|
|
|
|
// If not nil, the methods of this value are called to log each request
|
|
// received and each response or error returned.
|
|
RPCLog RPCLogger
|
|
|
|
// Instructs the server to allow server callbacks and notifications, a
|
|
// non-standard extension to the JSON-RPC protocol. If AllowPush is false,
|
|
// the Notify and Callback methods of the server report errors if called.
|
|
AllowPush bool
|
|
|
|
// Instructs the server to disable the built-in rpc.* handler methods.
|
|
//
|
|
// By default, a server reserves all rpc.* methods, even if the given
|
|
// assigner maps them. When this option is true, rpc.* methods are passed
|
|
// along to the given assigner.
|
|
DisableBuiltin bool
|
|
|
|
// Allows up to the specified number of goroutines to execute in parallel in
|
|
// request handlers. A value less than 1 uses runtime.NumCPU(). Note that
|
|
// this setting does not constrain order of issue.
|
|
Concurrency int
|
|
|
|
// If set, this function is called to create a new base request context.
|
|
// If unset, the server uses a background context.
|
|
NewContext func() context.Context
|
|
|
|
// If set, this function is called with the method name and encoded request
|
|
// parameters received from the client, before they are delivered to the
|
|
// handler. Its return value replaces the context and argument values. This
|
|
// allows the server to decode context metadata sent by the client.
|
|
// If unset, context and parameters are used as given.
|
|
DecodeContext func(context.Context, string, json.RawMessage) (context.Context, json.RawMessage, error)
|
|
|
|
// If set, use this value to record server metrics. All servers created
|
|
// from the same options will share the same metrics collector. If none is
|
|
// set, an empty collector will be created for each new server.
|
|
Metrics *metrics.M
|
|
|
|
// If nonzero this value as the server start time; otherwise, use the
|
|
// current time when Start is called. All servers created from the same
|
|
// options will share the same start time if one is set.
|
|
StartTime time.Time
|
|
}
|
|
|
|
func (s *ServerOptions) logFunc() func(string, ...interface{}) {
|
|
if s == nil || s.Logger == nil {
|
|
return func(string, ...interface{}) {}
|
|
}
|
|
return s.Logger.Printf
|
|
}
|
|
|
|
func (s *ServerOptions) allowPush() bool { return s != nil && s.AllowPush }
|
|
func (s *ServerOptions) allowBuiltin() bool { return s == nil || !s.DisableBuiltin }
|
|
|
|
func (s *ServerOptions) concurrency() int64 {
|
|
if s == nil || s.Concurrency < 1 {
|
|
return int64(runtime.NumCPU())
|
|
}
|
|
return int64(s.Concurrency)
|
|
}
|
|
|
|
func (s *ServerOptions) startTime() time.Time {
|
|
if s == nil {
|
|
return time.Time{}
|
|
}
|
|
return s.StartTime
|
|
}
|
|
|
|
func (o *ServerOptions) newContext() func() context.Context {
|
|
if o == nil || o.NewContext == nil {
|
|
return context.Background
|
|
}
|
|
return o.NewContext
|
|
}
|
|
|
|
type decoder = func(context.Context, string, json.RawMessage) (context.Context, json.RawMessage, error)
|
|
|
|
func (s *ServerOptions) decodeContext() decoder {
|
|
if s == nil || s.DecodeContext == nil {
|
|
return func(ctx context.Context, method string, params json.RawMessage) (context.Context, json.RawMessage, error) {
|
|
return ctx, params, nil
|
|
}
|
|
}
|
|
return s.DecodeContext
|
|
}
|
|
|
|
func (s *ServerOptions) metrics() *metrics.M {
|
|
if s == nil || s.Metrics == nil {
|
|
return metrics.New()
|
|
}
|
|
return s.Metrics
|
|
}
|
|
|
|
func (s *ServerOptions) rpcLog() RPCLogger {
|
|
if s == nil || s.RPCLog == nil {
|
|
return nullRPCLogger{}
|
|
}
|
|
return s.RPCLog
|
|
}
|
|
|
|
// ClientOptions control the behaviour of a client created by NewClient.
|
|
// A nil *ClientOptions provides sensible defaults.
|
|
type ClientOptions struct {
|
|
// If not nil, send debug text logs here.
|
|
Logger Logger
|
|
|
|
// If set, this function is called with the context, method name, and
|
|
// encoded request parameters before the request is sent to the server.
|
|
// Its return value replaces the request parameters. This allows the client
|
|
// to send context metadata along with the request. If unset, the parameters
|
|
// are unchanged.
|
|
EncodeContext func(context.Context, string, json.RawMessage) (json.RawMessage, error)
|
|
|
|
// If set, this function is called if a notification is received from the
|
|
// server. If unset, server notifications are logged and discarded. At
|
|
// most one invocation of the callback will be active at a time.
|
|
// Server notifications are a non-standard extension of JSON-RPC.
|
|
OnNotify func(*Request)
|
|
|
|
// If set, this function is called if a request is received from the server.
|
|
// If unset, server requests are logged and discarded. Multiple invocations
|
|
// of the callback handler may be active concurrently.
|
|
//
|
|
// The callback handler can retrieve the client from its context using the
|
|
// jrpc2.ClientFromContext function. The context terminates when the client
|
|
// is closed.
|
|
//
|
|
// If a callback handler panics, the client will recover the panic and
|
|
// report a system error back to the server describing the error.
|
|
//
|
|
// Server callbacks are a non-standard extension of JSON-RPC.
|
|
OnCallback func(context.Context, *Request) (interface{}, error)
|
|
|
|
// If set, this function is called when the context for a request terminates.
|
|
// The function receives the client and the response that was cancelled.
|
|
// The hook can obtain the ID and error value from rsp.
|
|
//
|
|
// Note that the hook does not receive the request context, which has
|
|
// already ended by the time the hook is called.
|
|
OnCancel func(cli *Client, rsp *Response)
|
|
}
|
|
|
|
func (c *ClientOptions) logFunc() func(string, ...interface{}) {
|
|
if c == nil || c.Logger == nil {
|
|
return func(string, ...interface{}) {}
|
|
}
|
|
return c.Logger.Printf
|
|
}
|
|
|
|
type encoder = func(context.Context, string, json.RawMessage) (json.RawMessage, error)
|
|
|
|
func (c *ClientOptions) encodeContext() encoder {
|
|
if c == nil || c.EncodeContext == nil {
|
|
return func(_ context.Context, methods string, params json.RawMessage) (json.RawMessage, error) {
|
|
return params, nil
|
|
}
|
|
}
|
|
return c.EncodeContext
|
|
}
|
|
|
|
func (c *ClientOptions) handleNotification() func(*jmessage) {
|
|
if c == nil || c.OnNotify == nil {
|
|
return nil
|
|
}
|
|
h := c.OnNotify
|
|
return func(req *jmessage) { h(&Request{method: req.M, params: req.P}) }
|
|
}
|
|
|
|
func (c *ClientOptions) handleCancel() func(*Client, *Response) {
|
|
if c == nil {
|
|
return nil
|
|
}
|
|
return c.OnCancel
|
|
}
|
|
|
|
func (c *ClientOptions) handleCallback() func(context.Context, *jmessage) []byte {
|
|
if c == nil || c.OnCallback == nil {
|
|
return nil
|
|
}
|
|
cb := c.OnCallback
|
|
return func(ctx context.Context, req *jmessage) []byte {
|
|
// Recover panics from the callback handler to ensure the server gets a
|
|
// response even if the callback fails without a result.
|
|
//
|
|
// Otherwise, a client and a server (a) running in the same process, and
|
|
// (b) where panics are recovered at a higher level, and (c) without
|
|
// cleaning up the client, can cause the server to stall in a manner that
|
|
// is difficult to debug.
|
|
//
|
|
// See https://github.com/creachadair/jrpc2/issues/41.
|
|
rsp := &jmessage{ID: req.ID}
|
|
v, err := panicToError(func() (interface{}, error) {
|
|
return cb(ctx, &Request{
|
|
id: req.ID,
|
|
method: req.M,
|
|
params: req.P,
|
|
})
|
|
})
|
|
if err == nil {
|
|
rsp.R, err = json.Marshal(v)
|
|
}
|
|
if err != nil {
|
|
rsp.R = nil
|
|
if e, ok := err.(*Error); ok {
|
|
rsp.E = e
|
|
} else {
|
|
rsp.E = &Error{Code: code.FromError(err), Message: err.Error()}
|
|
}
|
|
}
|
|
bits, _ := rsp.toJSON()
|
|
return bits
|
|
}
|
|
}
|
|
|
|
func panicToError(f func() (interface{}, error)) (v interface{}, err error) {
|
|
defer func() {
|
|
if p := recover(); p != nil {
|
|
err = fmt.Errorf("panic in callback handler: %v", p)
|
|
}
|
|
}()
|
|
return f()
|
|
}
|
|
|
|
// A Logger records text logs from a server or a client. A nil logger discards
|
|
// text log input.
|
|
type Logger func(text string)
|
|
|
|
// Printf writes a formatted message to the logger. If lg == nil, the message
|
|
// is discarded.
|
|
func (lg Logger) Printf(msg string, args ...interface{}) {
|
|
if lg != nil {
|
|
lg(fmt.Sprintf(msg, args...))
|
|
}
|
|
}
|
|
|
|
// StdLogger adapts a *log.Logger to a Logger. If logger == nil, the returned
|
|
// function sends logs to the default logger.
|
|
func StdLogger(logger *log.Logger) Logger {
|
|
if logger == nil {
|
|
return func(text string) { log.Output(2, text) }
|
|
}
|
|
return func(text string) { logger.Output(2, text) }
|
|
}
|
|
|
|
// An RPCLogger receives callbacks from a server to record the receipt of
|
|
// requests and the delivery of responses. These callbacks are invoked
|
|
// synchronously with the processing of the request.
|
|
type RPCLogger interface {
|
|
// Called for each request received prior to invoking its handler.
|
|
LogRequest(ctx context.Context, req *Request)
|
|
|
|
// Called for each response produced by a handler, immediately prior to
|
|
// sending it back to the client. The inbound request can be recovered from
|
|
// the context using jrpc2.InboundRequest.
|
|
LogResponse(ctx context.Context, rsp *Response)
|
|
}
|
|
|
|
type nullRPCLogger struct{}
|
|
|
|
func (nullRPCLogger) LogRequest(context.Context, *Request) {}
|
|
func (nullRPCLogger) LogResponse(context.Context, *Response) {}
|