ここを参考に go-kit を試した。http で受けたメッセージを amqp に publish する。
サービスのインタフェース定義
サービスはインタフェースで定義する
type PublishService interface {
Publish(message string) (string, error)
}
サービスを実装
type publishService struct{}
func (publishService) Publish(s string) (string, error) {
if s == "" {
return "", errors.New("empty")
}
err := pub("testqueue", s)
if err != nil {
return "", err
}
return "Done", nil
}
func pub(qname string, message string) error {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Print("failed Dial")
return err
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Print("failed Channel")
return err
}
defer ch.Close()
q, err := ch.QueueDeclare(
qname, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Print("failed QueueDeclare")
return err
}
err = ch.Publish(
"", // exchange
q.Name, //q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
if err != nil {
log.Print("failed Publish")
return err
}
log.Printf("pulished %s : %s", qname, message)
return nil
}
メッセージ関係
type publishRequest struct {
S string `json:"s"`
}
type publishResponse struct {
V string `json:"v"`
Err string `json:"err,omitempty"`
}
func decodePublishRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request publishRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
エンドポイント
func makePublishEndpoint(svc PublishService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(publishRequest)
v, err := svc.Publish(req.S)
if err != nil {
return publishResponse{v, err.Error()}, nil
}
return publishResponse{v, ""}, nil
}
}
main()
func main() {
svc := publishService{}
publishHandler := httptransport.NewServer(
makePublishEndpoint(svc),
decodePublishRequest,
encodeResponse,
)
http.Handle("/publish", publishHandler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
curl
$ curl -s -X POST -d'{"s":"hello, world1"}' localhost:8080/publish
{"v":"Done"}
$ curl -s -X POST -d'{"s":"hello, world2"}' localhost:8080/publish
{"v":"Done"}
log
$go run main.go 2020/02/20 20:29:06 pulished testqueue : hello, world1 2020/02/20 20:29:08 pulished testqueue : hello, world2
ここらへん
後で試す