以下の内容はhttps://devlights.hatenablog.com/entry/2025/03/12/073000より取得しました。


Goメモ-551 (net/textproto)(パイプラインを使って順序保証しながらバッチ処理)

関連記事

GitHub - devlights/blog-summary: ブログ「いろいろ備忘録日記」のまとめ

概要

以下、自分用のメモです。忘れないようにここにメモメモ。。。

使ったこと無かったのですが、net/textproto というパッケージがあるのを最近知りました。

テキストベースのプロトコル(HTTPとかSMTPとか)の通信処理が作りやすくなるライブラリとのこと。

Package textproto implements generic support for text-based request/response protocols in the style of HTTP, NNTP, and SMTP.

(textprotoパッケージは、HTTP、NNTP、SMTPスタイルのテキストベースのリクエスト/レスポンスプロトコルの汎用サポートを実装します。)

\r\nがデリミタになってて、リクエストにコマンドが入ってて、レスポンスにコードが入っているやつですね。

こんな感じ。

HELO hoge.example.com
250 Hello hoge.example.com

こういうプロトコルを作るときに便利なライブラリです。ついでに ドットエンコーディング なデータを処理するための DotReaderDotWriter というのも用意されています。

このパッケージの *textproto.Conn の定義は以下のようになっています。

type Conn struct {
    Reader
    Writer
    Pipeline
    // contains filtered or unexported fields
}

Pipeline というのがあります。この Pipeline を使うと順序保証しながら、通信経路を流れるデータ量によってはバッチ処理が出来ます。

IDが割り振られるので、そのIDを元に送受信順序を担保するという感じですね。

サンプル

main.go

いちいちファイル分けるのが面倒なので、サーバとクライアント兼用です。

package main

import (
    "errors"
    "flag"
    "fmt"
    "io"
    "net"
    "net/textproto"
    "strconv"
    "strings"
    "time"
)

type (
    Args struct {
        IsServer bool
    }
)

const (
    OK = 200
    NG = 400

    CmdAdd   = "ADD"
    CmdTotal = "TOTAL"
    CmdQuit  = "QUIT"
)

var (
    args Args
)

func init() {
    flag.BoolVar(&args.IsServer, "server", false, "server mode")
}

func main() {
    flag.Parse()

    if err := run(); err != nil {
        panic(err)
    }
}

func run() error {
    var err error

    switch {
    case args.IsServer:
        err = runServer()
    default:
        err = runClient()
    }

    if err != nil {
        return err
    }

    return nil
}

func runServer() error {
    l, err := net.Listen("tcp", ":8888")
    if err != nil {
        return err
    }
    defer l.Close()

    // サンプルなので1回だけ受付
    // クライアントからは
    //   - ADD, ADD, ADD... (textprotoのPipelineを使って複数回分をバッチ送信)
    //   - VAL
    //   - QUIT
    // のシーケンスが来るとする
    conn, err := l.Accept()
    if err != nil {
        return err
    }
    defer conn.Close()

    tpConn := textproto.NewConn(conn)
    defer tpConn.Close()

    var total int64
    for {
        line, err := tpConn.ReadLine()
        if err != nil {
            if errors.Is(err, io.EOF) {
                return nil
            }

            return err
        }

        parts := strings.Fields(line)
        if len(parts) == 0 {
            tpConn.PrintfLine("%d %s", NG, "COMMAND COULD NOT BE READ")
            continue
        }

        command := strings.ToUpper(parts[0])
        cmdArgs := parts[1:]
        switch command {
        case CmdAdd:
            v, err := strconv.Atoi(cmdArgs[0])
            if err != nil {
                tpConn.PrintfLine("%d %s", NG, "VALUE SHOULD BE A NUMBER")
                continue
            }

            total += int64(v)
            tpConn.PrintfLine("%d %s %d", OK, "ADDED", v)
        case CmdTotal:
            tpConn.PrintfLine("%d %s", OK, strconv.FormatInt(total, 10))
        case CmdQuit:
            tpConn.PrintfLine("%d %s", OK, "BYE")
            return nil
        default:
            tpConn.PrintfLine("%d %s", NG, "UNKNOWN COMMAND")
        }
    }
}

func runClient() error {
    conn, err := net.Dial("tcp", ":8888")
    if err != nil {
        return err
    }
    defer conn.Close()

    tpConn := textproto.NewConn(conn)
    defer tpConn.Close()

    //
    // パイプラインを使って複数のリクエストを順序保証しながらバッチ処理
    //
    const numRequests = 5
    ids := make([]uint, 0, numRequests)
    for range numRequests {
        // IDを採番
        id := tpConn.Next()

        // 取得したIDを使ってリクエスト
        tpConn.StartRequest(id)
        {
            msg := fmt.Sprintf("%s %d", CmdAdd, time.Now().Nanosecond())
            err = send(conn, tpConn, msg)
            if err != nil {
                return err
            }
        }
        tpConn.EndRequest(id)

        ids = append(ids, id)
    }

    for _, id := range ids {

        // リクエスト時に採番されたIDを使って応答受信
        tpConn.StartResponse(id)
        {
            err = recv(conn, tpConn)
            if err != nil {
                return err
            }
        }
        tpConn.EndResponse(id)
    }

    // 合計を聞く
    err = send(conn, tpConn, CmdTotal)
    if err != nil {
        return err
    }

    err = recv(conn, tpConn)
    if err != nil {
        return err
    }

    // 終わり
    err = send(conn, tpConn, CmdQuit)
    if err != nil {
        return err
    }

    err = recv(conn, tpConn)
    if err != nil {
        return err
    }

    return nil
}

func send(conn net.Conn, tpConn *textproto.Conn, msg string) error {
    err := conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
    if err != nil {
        return err
    }

    fmt.Printf("< %s\n", msg)
    err = tpConn.PrintfLine("%s", msg)
    if err != nil {
        return err
    }

    return nil
}

func recv(conn net.Conn, tpConn *textproto.Conn) error {
    err := conn.SetReadDeadline(time.Now().Add(1 * time.Second))
    if err != nil {
        return err
    }

    code, msg, err := tpConn.ReadCodeLine(OK)
    if err != nil {
        return err
    }
    fmt.Printf("%d %s\n", code, msg)

    return nil
}

Taskfile.yml

# https://taskfile.dev

version: "3"

tasks:
  default:
    cmds:
      - task: build
      - task: run
  build:
    cmds:
      - go build -o app .
  run:
    cmds:
      - ./app -server &
      - sleep 1
      - ./app
  watch:
    cmds:
      - sudo tcpdump -i lo -n 'tcp port 8888' -S -X
    interactive: true

shell

実行すると以下のように出力されます。

$ task
task: [build] go build -o app .
task: [run] ./app -server &
task: [run] sleep 1
task: [run] ./app
< ADD 480635715
< ADD 480686755
< ADD 480707125
< ADD 480717295
< ADD 480732305
200 ADDED 480635715
200 ADDED 480686755
200 ADDED 480707125
200 ADDED 480717295
200 ADDED 480732305
< TOTAL
200 2403479195
< QUIT
200 BYE

ついでにパケットも眺めてみましょう。

$ task watch
task: [watch] sudo tcpdump -i lo -n 'tcp port 8888' -S -X
tcpdump: verbose output suppressed, use -v[v]... for full protocol decode
listening on lo, link-type EN10MB (Ethernet), snapshot length 262144 bytes
07:24:23.117926 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [S], seq 3699364872, win 43690, options [mss 65495,sackOK,TS val 3360604880 ecr 0,nop,wscale 7], length 0
        0x0000:  4500 003c 48a8 4000 4006 f411 7f00 0001  E..<H.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d408 0000 0000  ......".........
        0x0020:  a002 aaaa fe30 0000 0204 ffd7 0402 080a  .....0..........
        0x0030:  c84e c2d0 0000 0000 0103 0307            .N..........
07:24:23.117937 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [S.], seq 111167116, ack 3699364873, win 43690, options [mss 65495,sackOK,TS val 3360604880 ecr 3360604880,nop,wscale 7], length 0
        0x0000:  4500 003c 0000 4000 4006 3cba 7f00 0001  E..<..@.@.<.....
        0x0010:  7f00 0001 22b8 98e2 06a0 468c dc7f d409  ....".....F.....
        0x0020:  a012 aaaa fe30 0000 0204 ffd7 0402 080a  .....0..........
        0x0030:  c84e c2d0 c84e c2d0 0103 0307            .N...N......
07:24:23.117946 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [.], ack 111167117, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 0
        0x0000:  4500 0034 48a9 4000 4006 f418 7f00 0001  E..4H.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d409 06a0 468d  ......".......F.
        0x0020:  8010 0156 fe28 0000 0101 080a c84e c2d0  ...V.(.......N..
        0x0030:  c84e c2d0                                .N..
07:24:23.118084 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [P.], seq 3699364873:3699364888, ack 111167117, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 15
        0x0000:  4500 0043 48aa 4000 4006 f408 7f00 0001  E..CH.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d409 06a0 468d  ......".......F.
        0x0020:  8018 0156 fe37 0000 0101 080a c84e c2d0  ...V.7.......N..
        0x0030:  c84e c2d0 4144 4420 3131 3830 3334 3434  .N..ADD.11803444
        0x0040:  310d 0a                                  1..
07:24:23.118097 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [.], ack 3699364888, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 0
        0x0000:  4500 0034 47bb 4000 4006 f506 7f00 0001  E..4G.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 468d dc7f d418  ....".....F.....
        0x0020:  8010 0156 fe28 0000 0101 080a c84e c2d0  ...V.(.......N..
        0x0030:  c84e c2d0                                .N..
07:24:23.118124 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [P.], seq 3699364888:3699364903, ack 111167117, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 15
        0x0000:  4500 0043 48ab 4000 4006 f407 7f00 0001  E..CH.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d418 06a0 468d  ......".......F.
        0x0020:  8018 0156 fe37 0000 0101 080a c84e c2d0  ...V.7.......N..
        0x0030:  c84e c2d0 4144 4420 3131 3831 3034 3831  .N..ADD.11810481
        0x0040:  310d 0a                                  1..
07:24:23.118128 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [.], ack 3699364903, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 0
        0x0000:  4500 0034 47bc 4000 4006 f505 7f00 0001  E..4G.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 468d dc7f d427  ....".....F....'
        0x0020:  8010 0156 fe28 0000 0101 080a c84e c2d0  ...V.(.......N..
        0x0030:  c84e c2d0                                .N..
07:24:23.118142 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [P.], seq 3699364903:3699364918, ack 111167117, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 15
        0x0000:  4500 0043 48ac 4000 4006 f406 7f00 0001  E..CH.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d427 06a0 468d  ......"....'..F.
        0x0020:  8018 0156 fe37 0000 0101 080a c84e c2d0  ...V.7.......N..
        0x0030:  c84e c2d0 4144 4420 3131 3831 3332 3739  .N..ADD.11813279
        0x0040:  310d 0a                                  1..
07:24:23.118153 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [P.], seq 3699364918:3699364933, ack 111167117, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 15
        0x0000:  4500 0043 48ad 4000 4006 f405 7f00 0001  E..CH.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d436 06a0 468d  ......"....6..F.
        0x0020:  8018 0156 fe37 0000 0101 080a c84e c2d0  ...V.7.......N..
        0x0030:  c84e c2d0 4144 4420 3131 3831 3435 3039  .N..ADD.11814509
        0x0040:  310d 0a                                  1..
07:24:23.118154 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [P.], seq 111167117:111167130, ack 3699364903, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 13
        0x0000:  4500 0041 47bd 4000 4006 f4f7 7f00 0001  E..AG.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 468d dc7f d427  ....".....F....'
        0x0020:  8018 0156 fe35 0000 0101 080a c84e c2d0  ...V.5.......N..
        0x0030:  c84e c2d0 3230 3020 5355 4343 4553 530d  .N..200.SUCCESS.
        0x0040:  0a                                       .
07:24:23.118167 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [.], ack 111167130, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 0
        0x0000:  4500 0034 48ae 4000 4006 f413 7f00 0001  E..4H.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d445 06a0 469a  ......"....E..F.
        0x0020:  8010 0156 fe28 0000 0101 080a c84e c2d0  ...V.(.......N..
        0x0030:  c84e c2d0                                .N..
07:24:23.118170 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [P.], seq 3699364933:3699364948, ack 111167130, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 15
        0x0000:  4500 0043 48af 4000 4006 f403 7f00 0001  E..CH.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d445 06a0 469a  ......"....E..F.
        0x0020:  8018 0156 fe37 0000 0101 080a c84e c2d0  ...V.7.......N..
        0x0030:  c84e c2d0 4144 4420 3131 3831 3536 3438  .N..ADD.11815648
        0x0040:  310d 0a                                  1..
07:24:23.118188 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [.], ack 3699364948, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 0
        0x0000:  4500 0034 47be 4000 4006 f503 7f00 0001  E..4G.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 469a dc7f d454  ....".....F....T
        0x0020:  8010 0156 fe28 0000 0101 080a c84e c2d0  ...V.(.......N..
        0x0030:  c84e c2d0                                .N..
07:24:23.118203 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [P.], seq 111167130:111167143, ack 3699364948, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 13
        0x0000:  4500 0041 47bf 4000 4006 f4f5 7f00 0001  E..AG.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 469a dc7f d454  ....".....F....T
        0x0020:  8018 0156 fe35 0000 0101 080a c84e c2d0  ...V.5.......N..
        0x0030:  c84e c2d0 3230 3020 5355 4343 4553 530d  .N..200.SUCCESS.
        0x0040:  0a                                       .
07:24:23.118220 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [P.], seq 111167143:111167156, ack 3699364948, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 13
        0x0000:  4500 0041 47c0 4000 4006 f4f4 7f00 0001  E..AG.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 46a7 dc7f d454  ....".....F....T
        0x0020:  8018 0156 fe35 0000 0101 080a c84e c2d0  ...V.5.......N..
        0x0030:  c84e c2d0 3230 3020 5355 4343 4553 530d  .N..200.SUCCESS.
        0x0040:  0a                                       .
07:24:23.118232 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [P.], seq 111167156:111167169, ack 3699364948, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 13
        0x0000:  4500 0041 47c1 4000 4006 f4f3 7f00 0001  E..AG.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 46b4 dc7f d454  ....".....F....T
        0x0020:  8018 0156 fe35 0000 0101 080a c84e c2d0  ...V.5.......N..
        0x0030:  c84e c2d0 3230 3020 5355 4343 4553 530d  .N..200.SUCCESS.
        0x0040:  0a                                       .
07:24:23.118244 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [P.], seq 111167169:111167182, ack 3699364948, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 13
        0x0000:  4500 0041 47c2 4000 4006 f4f2 7f00 0001  E..AG.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 46c1 dc7f d454  ....".....F....T
        0x0020:  8018 0156 fe35 0000 0101 080a c84e c2d0  ...V.5.......N..
        0x0030:  c84e c2d0 3230 3020 5355 4343 4553 530d  .N..200.SUCCESS.
        0x0040:  0a                                       .
07:24:23.118257 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [.], ack 111167182, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 0
        0x0000:  4500 0034 48b0 4000 4006 f411 7f00 0001  E..4H.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d454 06a0 46ce  ......"....T..F.
        0x0020:  8010 0156 fe28 0000 0101 080a c84e c2d0  ...V.(.......N..
        0x0030:  c84e c2d0                                .N..
07:24:23.118305 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [P.], seq 3699364948:3699364955, ack 111167182, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 7
        0x0000:  4500 003b 48b1 4000 4006 f409 7f00 0001  E..;H.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d454 06a0 46ce  ......"....T..F.
        0x0020:  8018 0156 fe2f 0000 0101 080a c84e c2d0  ...V./.......N..
        0x0030:  c84e c2d0 544f 5441 4c0d 0a              .N..TOTAL..
07:24:23.118358 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [P.], seq 111167182:111167197, ack 3699364955, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 15
        0x0000:  4500 0043 47c3 4000 4006 f4ef 7f00 0001  E..CG.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 46ce dc7f d45b  ....".....F....[
        0x0020:  8018 0156 fe37 0000 0101 080a c84e c2d0  ...V.7.......N..
        0x0030:  c84e c2d0 3230 3020 3539 3035 3733 3631  .N..200.59057361
        0x0040:  350d 0a                                  5..
07:24:23.118405 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [P.], seq 3699364955:3699364961, ack 111167197, win 342, options [nop,nop,TS val 3360604880 ecr 3360604880], length 6
        0x0000:  4500 003a 48b2 4000 4006 f409 7f00 0001  E..:H.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d45b 06a0 46dd  ......"....[..F.
        0x0020:  8018 0156 fe2e 0000 0101 080a c84e c2d0  ...V.........N..
        0x0030:  c84e c2d0 5155 4954 0d0a                 .N..QUIT..
07:24:23.118435 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [P.], seq 111167197:111167206, ack 3699364961, win 342, options [nop,nop,TS val 3360604881 ecr 3360604880], length 9
        0x0000:  4500 003d 47c4 4000 4006 f4f4 7f00 0001  E..=G.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 46dd dc7f d461  ....".....F....a
        0x0020:  8018 0156 fe31 0000 0101 080a c84e c2d1  ...V.1.......N..
        0x0030:  c84e c2d0 3230 3020 4259 450d 0a         .N..200.BYE..
07:24:23.118459 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [F.], seq 111167206, ack 3699364961, win 342, options [nop,nop,TS val 3360604881 ecr 3360604880], length 0
        0x0000:  4500 0034 47c5 4000 4006 f4fc 7f00 0001  E..4G.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 46e6 dc7f d461  ....".....F....a
        0x0020:  8011 0156 fe28 0000 0101 080a c84e c2d1  ...V.(.......N..
        0x0030:  c84e c2d0                                .N..
07:24:23.118518 IP 127.0.0.1.39138 > 127.0.0.1.8888: Flags [F.], seq 3699364961, ack 111167207, win 342, options [nop,nop,TS val 3360604881 ecr 3360604881], length 0
        0x0000:  4500 0034 48b3 4000 4006 f40e 7f00 0001  E..4H.@.@.......
        0x0010:  7f00 0001 98e2 22b8 dc7f d461 06a0 46e7  ......"....a..F.
        0x0020:  8011 0156 fe28 0000 0101 080a c84e c2d1  ...V.(.......N..
        0x0030:  c84e c2d1                                .N..
07:24:23.118551 IP 127.0.0.1.8888 > 127.0.0.1.39138: Flags [.], ack 3699364962, win 342, options [nop,nop,TS val 3360604881 ecr 3360604881], length 0
        0x0000:  4500 0034 47c6 4000 4006 f4fb 7f00 0001  E..4G.@.@.......
        0x0010:  7f00 0001 22b8 98e2 06a0 46e7 dc7f d462  ....".....F....b
        0x0020:  8010 0156 fe28 0000 0101 080a c84e c2d1  ...V.(.......N..
        0x0030:  c84e c2d1                                .N..
^C
25 packets captured
50 packets received by filter
0 packets dropped by kernel

ちょっと量が多いですが、ペイロードをよーく見ると 送信・受信 じゃなくて 送信・送信 のように連続して流れている部分があったりします。

参考情報

Goのおすすめ書籍


過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。




以上の内容はhttps://devlights.hatenablog.com/entry/2025/03/12/073000より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14