2021-12-04 16:42:11 +00:00

129 lines
3.5 KiB
Go

package jhttp
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"sync"
)
// A Channel implements a channel.Channel that dispatches requests via HTTP to
// a user-provided URL. Each message sent to the channel is an HTTP POST
// request with the message as its body.
//
// Use NewChannel to construct a Channel.
type Channel struct {
// url is the target of every POST request issued by Send.
url string
// cli issues the requests. Close sets it to nil, which Send treats as
// meaning the channel has been closed.
cli HTTPClient
// wg tracks the goroutines started by Send that are awaiting a reply.
wg *sync.WaitGroup
// rsp carries replies from those goroutines to Recv. Close closes it once
// all of them have finished.
rsp chan response
}
// A response is the outcome of one HTTP request issued by Send, as delivered
// to Recv: the values returned by the client's Do method.
type response struct {
// rsp is the HTTP response returned by Do.
rsp *http.Response
// err is the error returned by Do; Recv reports it to the caller as-is.
err error
}
// HTTPClient is the interface to an HTTP client used by a Channel. It is
// compatible with the standard library http.Client type.
type HTTPClient interface {
// Do issues the request and reports the resulting response or an error.
// A Channel calls Do from a separate goroutine for each Send, so calls
// may overlap.
Do(*http.Request) (*http.Response, error)
}
// ChannelOptions gives optional parameters for constructing an HTTP channel.
// A nil *ChannelOptions is ready for use, and provides the default value
// described on each field.
type ChannelOptions struct {
// Client is the HTTP client used to send requests. If it is nil, the
// channel uses http.DefaultClient.
Client HTTPClient
}
// httpClient reports the HTTP client selected by o. It is safe to call on a
// nil receiver: when o is nil or names no client, http.DefaultClient is used.
func (o *ChannelOptions) httpClient() HTTPClient {
	if o != nil && o.Client != nil {
		return o.Client
	}
	return http.DefaultClient
}
// NewChannel returns a new channel whose messages are posted to url.
// If opts is nil, default options are used.
func NewChannel(url string, opts *ChannelOptions) *Channel {
	ch := new(Channel)
	ch.url = url
	ch.cli = opts.httpClient() // tolerates a nil opts
	ch.wg = new(sync.WaitGroup)
	ch.rsp = make(chan response)
	return ch
}
// Send posts msg to the channel's URL as the body of an HTTP POST request
// with content type application/json.
//
// Send reports an error if the channel has been closed or if the request
// could not be constructed. A failure of the HTTP exchange itself is not
// reported here; it is delivered to the caller by a later call to Recv.
func (c *Channel) Send(msg []byte) error {
	client := c.cli
	if client == nil {
		return errors.New("channel is closed")
	}

	post, err := http.NewRequest("POST", c.url, bytes.NewReader(msg))
	if err != nil {
		return err
	}
	post.Header.Set("Content-Type", "application/json")

	// The reply to each request is awaited in its own goroutine, which c.wg
	// tracks so that Close can wait for all of them. An empty acknowledgement
	// (as sent for a notification) is disposed of on the spot. Anything else,
	// including a transport error, is handed to Recv via c.rsp, because Recv
	// is where the caller is prepared to receive an error.
	//
	// Since c.rsp is unbuffered, a goroutine holding a non-empty reply cannot
	// exit until a Recv accepts it or Close drains it. Callers should
	// therefore not issue many calls without intervening Recv calls.
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		reply, doErr := client.Do(post)
		if doErr != nil || reply.StatusCode != http.StatusNoContent {
			c.rsp <- response{rsp: reply, err: doErr}
			return
		}
		// Empty acknowledgement: finish here so that this goroutine does
		// not depend on a Recv.
		reply.Body.Close()
	}()
	return nil
}
// Recv blocks until the next response is available and reports its body.
//
// It reports io.EOF once the channel has been closed, the error from the HTTP
// client if the request itself failed, and an "unexpected HTTP status" error
// if the server replied with any status other than 200 OK.
func (c *Channel) Recv() ([]byte, error) {
	reply, open := <-c.rsp
	switch {
	case !open:
		return nil, io.EOF
	case reply.err != nil:
		return nil, reply.err // HTTP failure, not request failure
	}

	// Read the body to the end and close it before inspecting the status.
	body := reply.rsp.Body
	payload, readErr := io.ReadAll(body)
	body.Close()

	// N.B. Empty responses (StatusNoContent) are handled by Send, so they
	// never reach this point.
	if reply.rsp.StatusCode == http.StatusOK {
		// We have a message to report, along with any error from reading it.
		return payload, readErr
	}
	return nil, fmt.Errorf("unexpected HTTP status %s", reply.rsp.Status)
}
// Close shuts down the channel, discarding any pending responses. It blocks
// until every request already issued by Send has completed, and it always
// returns nil.
//
// The body of each discarded response is closed so that the resources held
// by the HTTP client for that response are released.
//
// Close should be called at most once: a second call would attempt to close
// the already-closed response channel.
func (c *Channel) Close() error {
	c.cli = nil // no further requests may be sent

	// Once all outstanding Send goroutines have finished, close c.rsp so that
	// the drain loop below (and any later Recv) terminates.
	go func() { c.wg.Wait(); close(c.rsp) }()

	// Drain undelivered responses, unblocking the goroutines that hold them.
	for pending := range c.rsp {
		// A response is absent when the request itself failed.
		if pending.rsp != nil {
			pending.rsp.Body.Close()
		}
	}
	return nil
}