105 lines
2.5 KiB
Go
105 lines
2.5 KiB
Go
package jhttp
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"sync"
|
|
)
|
|
|
|
// A Channel implements an channel.Channel that dispatches requests via HTTP to
|
|
// a user-provided URL. Each message sent to the channel is an HTTP POST
|
|
// request with the message as its body.
|
|
type Channel struct {
|
|
url string
|
|
cli *http.Client
|
|
wg *sync.WaitGroup
|
|
rsp chan response
|
|
}
|
|
|
|
type response struct {
|
|
rsp *http.Response
|
|
err error
|
|
}
|
|
|
|
// NewChannel constructs a new channel that posts to the specified URL.
|
|
func NewChannel(url string) *Channel {
|
|
return &Channel{
|
|
url: url,
|
|
cli: http.DefaultClient,
|
|
wg: new(sync.WaitGroup),
|
|
rsp: make(chan response),
|
|
}
|
|
}
|
|
|
|
// Send forwards msg to the server as the body of an HTTP POST request.
|
|
func (c *Channel) Send(msg []byte) error {
|
|
cli := c.cli
|
|
if cli == nil {
|
|
return errors.New("channel is closed")
|
|
}
|
|
|
|
// Each Send starts a goroutine to wait for the corresponding reply from the
|
|
// HTTP server, and c.wg tracks these goroutines. Responses are delivered to
|
|
// the c.rsp channel without interpretation; error handling happens in Recv.
|
|
//
|
|
// The goroutine for a Send cannot exit until either a Recv occurs, or until
|
|
// the channel is closed (draining any further undelivered responses). The
|
|
// caller should thus avoid calling Send a large number of times with no
|
|
// intervening Recv calls.
|
|
req, err := http.NewRequest("POST", c.url, bytes.NewReader(msg))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
rsp, err := cli.Do(req)
|
|
c.rsp <- response{rsp, err}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// Recv receives the next available response and reports its body.
|
|
func (c *Channel) Recv() ([]byte, error) {
|
|
for {
|
|
next, ok := <-c.rsp
|
|
if !ok {
|
|
return nil, io.EOF
|
|
} else if next.err != nil {
|
|
return nil, next.err // HTTP failure, not request failure
|
|
}
|
|
|
|
// Ensure the body is fully read and closed before continuing.
|
|
data, err := ioutil.ReadAll(next.rsp.Body)
|
|
next.rsp.Body.Close()
|
|
|
|
switch next.rsp.StatusCode {
|
|
case http.StatusOK:
|
|
// ok, we have a message to report
|
|
return data, err
|
|
case http.StatusNoContent:
|
|
// ok, but no message to report; wait for another
|
|
continue
|
|
default:
|
|
return nil, fmt.Errorf("unexpected HTTP status %s", next.rsp.Status)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close shuts down the channel, discarding any pending responses.
|
|
func (c *Channel) Close() error {
|
|
c.cli = nil // no further requests may be sent
|
|
|
|
// Drain any pending requests.
|
|
go func() { c.wg.Wait(); close(c.rsp) }()
|
|
for range c.rsp {
|
|
// discard
|
|
}
|
|
return nil
|
|
}
|