diff --git a/runctl.go b/runctl.go index 234a79e..846ab10 100644 --- a/runctl.go +++ b/runctl.go @@ -131,22 +131,36 @@ func NewRunControl(cfg config.RunCtl, stdout io.Writer) (*RunControl, error) { } rc.srv = srv - port := "" - switch ip := rc.srv.Addr().(*net.TCPAddr).IP; { - case ip.To4() != nil: - port = ip.String() + ":0" + var ( + portLog string + portHB string + ) + switch addr := rc.srv.Addr().(type) { + case *net.TCPAddr: + port := "" + switch ip := addr.IP; { + case ip.To4() != nil: + port = ip.String() + ":0" + default: + // IPv6 + port = "[" + ip.String() + "]:0" + } + portLog = port + portHB = port + case *net.UnixAddr: + portLog = addr.Name + "-log" + portHB = addr.Name + "-hbeat" default: - // IPv6 - port = "[" + ip.String() + "]:0" + return nil, xerrors.Errorf("invalid net address type %T: %v", addr, addr) } - srv, err = net.Listen(cfg.Net, port) + srv, err = net.Listen(cfg.Net, portLog) if err != nil { return nil, xerrors.Errorf("could not create %s log server: %w", cfg.Net, err) } rc.log = srv - hbeat, err := net.Listen(cfg.Net, port) + hbeat, err := net.Listen(cfg.Net, portHB) if err != nil { return nil, xerrors.Errorf("could not create %s hbeat server: %w", cfg.Net, err) } diff --git a/runctl_test.go b/runctl_test.go index a83a995..7015389 100644 --- a/runctl_test.go +++ b/runctl_test.go @@ -33,104 +33,128 @@ func TestRunControlAPI(t *testing.T) { proclvl = log.LvlInfo ) - port, err := tcputil.GetTCPPort() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - rcAddr := ":" + port + for _, tc := range []struct { + network string + port func(string) (string, error) + }{ + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { // FIXME(sbinet) + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, + } { + t.Run(tc.network, func(t *testing.T) { + rcAddr, err := tc.port("runctl") + if err != nil { + t.Fatalf("could not find a port for run-ctl: %+v", err) + } - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port - - stdout := new(bytes.Buffer) - app := job.New("tcp", stdout) - defer func() { - if err != nil { - t.Logf("stdout:\n%v\n", stdout.String()) - } - }() - - app.Cfg.RunCtl = rcAddr - app.Cfg.Web = webAddr - app.Cfg.Level = rclvl - - app.Add( - func() job.Proc { - dev := new(xdaq.I64Gen) - return job.Proc{ - Dev: dev, - Level: proclvl, - Name: "data-src", - Cmds: job.CmdHandlers{ - "/config": dev.OnConfig, - "/init": dev.OnInit, - "/reset": dev.OnReset, - "/start": dev.OnStart, - "/stop": dev.OnStop, - "/quit": dev.OnQuit, - }, - Outputs: job.OutputHandlers{ - "/i64": dev.Output, - }, - Handlers: job.RunHandlers{dev.Loop}, - } - }(), - ) + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) + } - for _, i := range []int{1, 2, 3} { - name := fmt.Sprintf("data-sink-%d", i) - app.Add( - func() job.Proc { - dev := new(xdaq.I64Dumper) - return job.Proc{ - Dev: dev, - Name: name, - Inputs: job.InputHandlers{ - "/i64": dev.Input, - }, + stdout := new(bytes.Buffer) + app := job.New(tc.network, stdout) + defer func() { + if err != nil { + t.Logf("stdout:\n%v\n", stdout.String()) } - }(), - ) - } - - err = app.Start() - if err != nil { - t.Fatalf("could not start job: %+v", err) - } + }() + + app.Cfg.RunCtl = rcAddr + app.Cfg.Web = webAddr + app.Cfg.Level = rclvl + + app.Add( + func() job.Proc { + dev := new(xdaq.I64Gen) + return job.Proc{ + Dev: dev, + Level: proclvl, + Name: "data-src", + Cmds: job.CmdHandlers{ + "/config": dev.OnConfig, + "/init": dev.OnInit, + "/reset": dev.OnReset, + "/start": dev.OnStart, + "/stop": dev.OnStop, + "/quit": dev.OnQuit, + }, + Outputs: job.OutputHandlers{ + "/i64": dev.Output, + }, + Handlers: job.RunHandlers{dev.Loop}, + } + }(), + ) + + for _, i := range []int{1, 2, 3} { + name := fmt.Sprintf("data-sink-%d", i) + app.Add( + func() job.Proc { + dev := new(xdaq.I64Dumper) + return job.Proc{ + Dev: dev, + Name: name, + Inputs: job.InputHandlers{ + "/i64": dev.Input, + }, + } + }(), + ) + } - for _, tt := range []struct { - name string - cmd tdaq.CmdType - }{ - {"config", tdaq.CmdConfig}, - {"init", tdaq.CmdInit}, - {"reset", tdaq.CmdReset}, - {"config", tdaq.CmdConfig}, - {"init", tdaq.CmdInit}, - {"start", tdaq.CmdStart}, - {"stop", tdaq.CmdStop}, - {"status", tdaq.CmdStatus}, - {"start", tdaq.CmdStart}, - {"stop", tdaq.CmdStop}, - {"quit", tdaq.CmdQuit}, - } { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - func() { - defer cancel() - err = app.Do(ctx, tt.cmd) + err = app.Start() if err != nil { - t.Fatalf("could not send command %v: %+v", tt.cmd, err) + t.Fatalf("could not start job: %+v", err) } - }() - } - err = app.Wait() - if err != nil { - t.Fatalf("could not run app: %+v", err) + for _, tt := range []struct { + name string + cmd tdaq.CmdType + }{ + {"config", tdaq.CmdConfig}, + {"init", tdaq.CmdInit}, + {"reset", tdaq.CmdReset}, + {"config", tdaq.CmdConfig}, + {"init", tdaq.CmdInit}, + {"start", tdaq.CmdStart}, + {"stop", tdaq.CmdStop}, + {"status", tdaq.CmdStatus}, + {"start", tdaq.CmdStart}, + {"stop", tdaq.CmdStop}, + {"quit", tdaq.CmdQuit}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + func() { + defer cancel() + err = app.Do(ctx, tt.cmd) + if err != nil { + t.Fatalf("could not send command %v: %+v", tt.cmd, err) + } + }() + } + + err = app.Wait() + if err != nil { + t.Fatalf("could not run app: %+v", err) + } + }) } } @@ -142,121 +166,145 @@ func TestRunControlWithDuplicateProc(t *testing.T) { proclvl = log.LvlInfo ) - port, err := tcputil.GetTCPPort() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - rcAddr := ":" + port + for _, tc := range []struct { + network string + port func(string) (string, error) + }{ + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { // FIXME(sbinet) + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, + } { + t.Run(tc.network, func(t *testing.T) { + rcAddr, err := tc.port("runctl") + if err != nil { + t.Fatalf("could not find a port for run-ctl: %+v", err) + } - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) + } - stdout := iomux.NewWriter(new(bytes.Buffer)) + stdout := iomux.NewWriter(new(bytes.Buffer)) - fname, err := ioutil.TempFile("", "tdaq-") - if err != nil { - t.Fatalf("could not create a temporary log file for run-ctl log server: %+v", err) - } - fname.Close() - defer func() { - if err != nil { - raw, err := ioutil.ReadFile(fname.Name()) - if err == nil { - t.Logf("log-file:\n%v\n", string(raw)) - } - } - os.Remove(fname.Name()) - }() - - cfg := config.RunCtl{ - Name: "run-ctl", - Level: rclvl, - Net: "tcp", - RunCtl: rcAddr, - Web: webAddr, - LogFile: fname.Name(), - HBeatFreq: 50 * time.Millisecond, - } + fname, err := ioutil.TempFile("", "tdaq-") + if err != nil { + t.Fatalf("could not create a temporary log file for run-ctl log server: %+v", err) + } + fname.Close() + defer func() { + if err != nil { + raw, err := ioutil.ReadFile(fname.Name()) + if err == nil { + t.Logf("log-file:\n%v\n", string(raw)) + } + } + os.Remove(fname.Name()) + }() + + cfg := config.RunCtl{ + Name: "run-ctl", + Level: rclvl, + Net: tc.network, + RunCtl: rcAddr, + Web: webAddr, + LogFile: fname.Name(), + HBeatFreq: 50 * time.Millisecond, + } - rc, err := tdaq.NewRunControl(cfg, stdout) - if err != nil { - t.Fatalf("could not create run-ctl: %+v", err) - } + rc, err := tdaq.NewRunControl(cfg, stdout) + if err != nil { + t.Fatalf("could not create run-ctl: %+v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + grp, ctx := errgroup.WithContext(ctx) + + errc := make(chan error) + go func() { + errc <- rc.Run(ctx) + }() + + grp.Go(func() error { + dev := xdaq.I64Gen{} + + cfg := config.Process{ + Name: "proc-1", + Level: proclvl, + Net: tc.network, + RunCtl: rcAddr, + } + srv := tdaq.New(cfg, stdout) + srv.OutputHandle("/i64", dev.Output) + + srv.RunHandle(dev.Loop) + + err := srv.Run(ctx) + return err + }) + + grp.Go(func() error { + dev := xdaq.I64Dumper{} + cfg := config.Process{ + Name: "proc-1", + Level: proclvl, + Net: tc.network, + RunCtl: rcAddr, + } + srv := tdaq.New(cfg, stdout) + srv.InputHandle("/i64", dev.Input) + err := srv.Run(ctx) + return err + }) + + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + + loop: + for { + select { + case <-timeout.C: + t.Fatalf("devices did not connect") + case err := <-errc: + if err == nil { + t.Fatalf("expected an error!") + } + if !xerrors.Is(err, context.Canceled) { + t.Fatalf("expected a canceled-context error, got: %+v", err) + } + break loop + } + } - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - - grp, ctx := errgroup.WithContext(ctx) - - errc := make(chan error) - go func() { - errc <- rc.Run(ctx) - }() - - grp.Go(func() error { - dev := xdaq.I64Gen{} - - cfg := config.Process{ - Name: "proc-1", - Level: proclvl, - Net: "tcp", - RunCtl: rcAddr, - } - srv := tdaq.New(cfg, stdout) - srv.OutputHandle("/i64", dev.Output) - - srv.RunHandle(dev.Loop) - - err := srv.Run(ctx) - return err - }) - - grp.Go(func() error { - dev := xdaq.I64Dumper{} - cfg := config.Process{ - Name: "proc-1", - Level: proclvl, - Net: "tcp", - RunCtl: rcAddr, - } - srv := tdaq.New(cfg, stdout) - srv.InputHandle("/i64", dev.Input) - err := srv.Run(ctx) - return err - }) - - timeout := time.NewTimer(5 * time.Second) - defer timeout.Stop() - -loop: - for { - select { - case <-timeout.C: - t.Fatalf("devices did not connect") - case err := <-errc: + err = grp.Wait() if err == nil { t.Fatalf("expected an error!") } - if !xerrors.Is(err, context.Canceled) { - t.Fatalf("expected a canceled-context error, got: %+v", err) + want := xerrors.Errorf(`could not join run-ctl: received error /join-ack from run-ctl: duplicate tdaq process with name "proc-1"`) + if got, want := err.Error(), want.Error(); !strings.HasPrefix(got, want) { + t.Fatalf("invalid error.\ngot= %v\nwant=%v\n", got, want) } - break loop - } - } - - err = grp.Wait() - if err == nil { - t.Fatalf("expected an error!") - } - want := xerrors.Errorf(`could not join run-ctl: received error /join-ack from run-ctl: duplicate tdaq process with name "proc-1"`) - if got, want := err.Error(), want.Error(); !strings.HasPrefix(got, want) { - t.Fatalf("invalid error.\ngot= %v\nwant=%v\n", got, want) + err = nil + }) } - err = nil } func TestRunControlWithDuplicateOutput(t *testing.T) { @@ -267,148 +315,171 @@ func TestRunControlWithDuplicateOutput(t *testing.T) { proclvl = log.LvlInfo ) - port, err := tcputil.GetTCPPort() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - rcAddr := ":" + port + for _, tc := range []struct { + network string + port func(string) (string, error) + }{ + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { // FIXME(sbinet) + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, + } { + t.Run(tc.network, func(t *testing.T) { + rcAddr, err := tc.port("runctl") + if err != nil { + t.Fatalf("could not find a port for run-ctl: %+v", err) + } - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) + } - stdout := iomux.NewWriter(new(bytes.Buffer)) + stdout := iomux.NewWriter(new(bytes.Buffer)) - fname, err := ioutil.TempFile("", "tdaq-") - if err != nil { - t.Fatalf("could not create a temporary log file for run-ctl log server: %+v", err) - } - fname.Close() - defer func() { - if err != nil { - raw, err := ioutil.ReadFile(fname.Name()) - if err == nil { - t.Logf("log-file:\n%v\n", string(raw)) - } - } - os.Remove(fname.Name()) - }() - - cfg := config.RunCtl{ - Name: "run-ctl", - Level: rclvl, - Net: "tcp", - RunCtl: rcAddr, - Web: webAddr, - LogFile: fname.Name(), - HBeatFreq: 50 * time.Millisecond, - } - - rc, err := tdaq.NewRunControl(cfg, stdout) - if err != nil { - t.Fatalf("could not create run-ctl: %+v", err) - } + fname, err := ioutil.TempFile("", "tdaq-") + if err != nil { + t.Fatalf("could not create a temporary log file for run-ctl log server: %+v", err) + } + fname.Close() + defer func() { + if err != nil { + raw, err := ioutil.ReadFile(fname.Name()) + if err == nil { + t.Logf("log-file:\n%v\n", string(raw)) + } + } + os.Remove(fname.Name()) + }() + + cfg := config.RunCtl{ + Name: "run-ctl", + Level: rclvl, + Net: tc.network, + RunCtl: rcAddr, + Web: webAddr, + LogFile: fname.Name(), + HBeatFreq: 50 * time.Millisecond, + } - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - - grp, ctx := errgroup.WithContext(ctx) - - errc := make(chan error) - go func() { - errc <- rc.Run(ctx) - }() - - grp.Go(func() error { - dev := xdaq.I64Gen{} - - cfg := config.Process{ - Name: "proc-1", - Level: proclvl, - Net: "tcp", - RunCtl: rcAddr, - } - srv := tdaq.New(cfg, stdout) - srv.OutputHandle("/i64", dev.Output) - - srv.RunHandle(dev.Loop) - - err := srv.Run(ctx) - return err - }) - - grp.Go(func() error { - dev := xdaq.I64Gen{} - cfg := config.Process{ - Name: "proc-2", - Level: proclvl, - Net: "tcp", - RunCtl: rcAddr, - } - srv := tdaq.New(cfg, stdout) - srv.OutputHandle("/i64", dev.Output) - err := srv.Run(ctx) - return err - }) - - timeout := time.NewTimer(5 * time.Second) - defer timeout.Stop() -loop: - for { - select { - case <-timeout.C: - t.Fatalf("devices did not connect") - default: - n := rc.NumClients() - if n == 2 { - break loop - } - } - } + rc, err := tdaq.NewRunControl(cfg, stdout) + if err != nil { + t.Fatalf("could not create run-ctl: %+v", err) + } - for _, tt := range []struct { - name string - cmd tdaq.CmdType - err error - }{ - {"config", tdaq.CmdConfig, nil}, - {"init", tdaq.CmdInit, xerrors.Errorf(`could not create DAG of data dependencies: could not build graph for analysis: node "proc-1" already declared "/i64" as its output (dup-node="proc-2")`)}, - {"quit", tdaq.CmdQuit, nil}, - } { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - err = rc.Do(ctx, tt.cmd) - switch { - case err == nil && tt.err == nil: - // ok - case err != nil && tt.err != nil: - if got, want := err.Error(), tt.err.Error(); got != want { - t.Fatalf("sent command /%s\ngot = %+v\nwant= %+v\n", tt.cmd, err, tt.err) + + grp, ctx := errgroup.WithContext(ctx) + + errc := make(chan error) + go func() { + errc <- rc.Run(ctx) + }() + + grp.Go(func() error { + dev := xdaq.I64Gen{} + + cfg := config.Process{ + Name: "proc-1", + Level: proclvl, + Net: tc.network, + RunCtl: rcAddr, } - case err != nil && tt.err == nil: - t.Fatalf("could not send command %v: %+v", tt.cmd, err) - case err == nil && tt.err != nil: - t.Fatalf("sent command /%s. got=nil, want=%+v\n", tt.cmd, tt.err) - default: - t.Fatalf("err: %+v", err) - } - }() - } + srv := tdaq.New(cfg, stdout) + srv.OutputHandle("/i64", dev.Output) + + srv.RunHandle(dev.Loop) + + err := srv.Run(ctx) + return err + }) + + grp.Go(func() error { + dev := xdaq.I64Gen{} + cfg := config.Process{ + Name: "proc-2", + Level: proclvl, + Net: tc.network, + RunCtl: rcAddr, + } + srv := tdaq.New(cfg, stdout) + srv.OutputHandle("/i64", dev.Output) + err := srv.Run(ctx) + return err + }) + + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + loop: + for { + select { + case <-timeout.C: + t.Fatalf("devices did not connect") + default: + n := rc.NumClients() + if n == 2 { + break loop + } + } + } - err = grp.Wait() - if err != nil { - t.Fatalf("could not run device run-group: %+v", err) - } + for _, tt := range []struct { + name string + cmd tdaq.CmdType + err error + }{ + {"config", tdaq.CmdConfig, nil}, + {"init", tdaq.CmdInit, xerrors.Errorf(`could not create DAG of data dependencies: could not build graph for analysis: node "proc-1" already declared "/i64" as its output (dup-node="proc-2")`)}, + {"quit", tdaq.CmdQuit, nil}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + func() { + defer cancel() + err = rc.Do(ctx, tt.cmd) + switch { + case err == nil && tt.err == nil: + // ok + case err != nil && tt.err != nil: + if got, want := err.Error(), tt.err.Error(); got != want { + t.Fatalf("sent command /%s\ngot = %+v\nwant= %+v\n", tt.cmd, err, tt.err) + } + case err != nil && tt.err == nil: + t.Fatalf("could not send command %v: %+v", tt.cmd, err) + case err == nil && tt.err != nil: + t.Fatalf("sent command /%s. got=nil, want=%+v\n", tt.cmd, tt.err) + default: + t.Fatalf("err: %+v", err) + } + }() + } - err = <-errc - if err != nil && !xerrors.Is(err, context.Canceled) { - t.Fatalf("error shutting down run-ctl: %+v", err) - } + err = grp.Wait() + if err != nil { + t.Fatalf("could not run device run-group: %+v", err) + } + err = <-errc + if err != nil && !xerrors.Is(err, context.Canceled) { + t.Fatalf("error shutting down run-ctl: %+v", err) + } + }) + } } func TestRunControlWithMissingInput(t *testing.T) { @@ -419,129 +490,152 @@ func TestRunControlWithMissingInput(t *testing.T) { proclvl = log.LvlInfo ) - port, err := tcputil.GetTCPPort() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - rcAddr := ":" + port - - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port + for _, tc := range []struct { + network string + port func(string) (string, error) + }{ + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, + } { + t.Run(tc.network, func(t *testing.T) { + rcAddr, err := tc.port("runctl") + if err != nil { + t.Fatalf("could not find a port for run-ctl: %+v", err) + } - stdout := iomux.NewWriter(new(bytes.Buffer)) + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) + } - fname, err := ioutil.TempFile("", "tdaq-") - if err != nil { - t.Fatalf("could not create a temporary log file for run-ctl log server: %+v", err) - } - fname.Close() - defer func() { - if err != nil { - raw, err := ioutil.ReadFile(fname.Name()) - if err == nil { - t.Logf("log-file:\n%v\n", string(raw)) - } - } - os.Remove(fname.Name()) - }() - - cfg := config.RunCtl{ - Name: "run-ctl", - Level: rclvl, - Net: "tcp", - RunCtl: rcAddr, - Web: webAddr, - LogFile: fname.Name(), - HBeatFreq: 50 * time.Millisecond, - } + stdout := iomux.NewWriter(new(bytes.Buffer)) - rc, err := tdaq.NewRunControl(cfg, stdout) - if err != nil { - t.Fatalf("could not create run-ctl: %+v", err) - } + fname, err := ioutil.TempFile("", "tdaq-") + if err != nil { + t.Fatalf("could not create a temporary log file for run-ctl log server: %+v", err) + } + fname.Close() + defer func() { + if err != nil { + raw, err := ioutil.ReadFile(fname.Name()) + if err == nil { + t.Logf("log-file:\n%v\n", string(raw)) + } + } + os.Remove(fname.Name()) + }() + + cfg := config.RunCtl{ + Name: "run-ctl", + Level: rclvl, + Net: tc.network, + RunCtl: rcAddr, + Web: webAddr, + LogFile: fname.Name(), + HBeatFreq: 50 * time.Millisecond, + } - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - - grp, ctx := errgroup.WithContext(ctx) - - errc := make(chan error) - go func() { - errc <- rc.Run(ctx) - }() - - grp.Go(func() error { - dev := xdaq.I64Dumper{} - - cfg := config.Process{ - Name: "proc-1", - Level: proclvl, - Net: "tcp", - RunCtl: rcAddr, - } - srv := tdaq.New(cfg, stdout) - srv.InputHandle("/i64", dev.Input) - - err := srv.Run(ctx) - return err - }) - - timeout := time.NewTimer(5 * time.Second) - defer timeout.Stop() -loop: - for { - select { - case <-timeout.C: - t.Fatalf("devices did not connect") - default: - n := rc.NumClients() - if n == 1 { - break loop - } - } - } + rc, err := tdaq.NewRunControl(cfg, stdout) + if err != nil { + t.Fatalf("could not create run-ctl: %+v", err) + } - for _, tt := range []struct { - name string - cmd tdaq.CmdType - err error - }{ - {"config", tdaq.CmdConfig, xerrors.Errorf(`could not find a provider for input "/i64" for "proc-1"`)}, - {"quit", tdaq.CmdQuit, nil}, - } { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - err = rc.Do(ctx, tt.cmd) - switch { - case err == nil && tt.err == nil: - // ok - case err != nil && tt.err != nil: - if got, want := err.Error(), tt.err.Error(); got != want { - t.Fatalf("sent command /%s\ngot = %+v\nwant= %+v\n", tt.cmd, err, tt.err) + + grp, ctx := errgroup.WithContext(ctx) + + errc := make(chan error) + go func() { + errc <- rc.Run(ctx) + }() + + grp.Go(func() error { + dev := xdaq.I64Dumper{} + + cfg := config.Process{ + Name: "proc-1", + Level: proclvl, + Net: tc.network, + RunCtl: rcAddr, } - case err != nil && tt.err == nil: - t.Fatalf("could not send command %v: %+v", tt.cmd, err) - case err == nil && tt.err != nil: - t.Fatalf("sent command /%s. got=nil, want=%+v\n", tt.cmd, tt.err) - default: - t.Fatalf("err: %+v", err) - } - }() - } + srv := tdaq.New(cfg, stdout) + srv.InputHandle("/i64", dev.Input) + + err := srv.Run(ctx) + return err + }) + + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + loop: + for { + select { + case <-timeout.C: + t.Fatalf("devices did not connect") + default: + n := rc.NumClients() + if n == 1 { + break loop + } + } + } - err = grp.Wait() - if err != nil { - t.Fatalf("could not run device run-group: %+v", err) - } + for _, tt := range []struct { + name string + cmd tdaq.CmdType + err error + }{ + {"config", tdaq.CmdConfig, xerrors.Errorf(`could not find a provider for input "/i64" for "proc-1"`)}, + {"quit", tdaq.CmdQuit, nil}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + func() { + defer cancel() + err = rc.Do(ctx, tt.cmd) + switch { + case err == nil && tt.err == nil: + // ok + case err != nil && tt.err != nil: + if got, want := err.Error(), tt.err.Error(); got != want { + t.Fatalf("sent command /%s\ngot = %+v\nwant= %+v\n", tt.cmd, err, tt.err) + } + case err != nil && tt.err == nil: + t.Fatalf("could not send command %v: %+v", tt.cmd, err) + case err == nil && tt.err != nil: + t.Fatalf("sent command /%s. got=nil, want=%+v\n", tt.cmd, tt.err) + default: + t.Fatalf("err: %+v", err) + } + }() + } - err = <-errc - if err != nil && !xerrors.Is(err, context.Canceled) { - t.Fatalf("error shutting down run-ctl: %+v", err) - } + err = grp.Wait() + if err != nil { + t.Fatalf("could not run device run-group: %+v", err) + } + err = <-errc + if err != nil && !xerrors.Is(err, context.Canceled) { + t.Fatalf("error shutting down run-ctl: %+v", err) + } + }) + } } diff --git a/webctl_test.go b/webctl_test.go index 9ae2994..8ab51a8 100644 --- a/webctl_test.go +++ b/webctl_test.go @@ -39,227 +39,251 @@ func TestRunControlWebAPI(t *testing.T) { proclvl = log.LvlDebug ) - port, err := tcputil.GetTCPPort() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - rcAddr := ":" + port - - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port - - stdout := iomux.NewWriter(new(bytes.Buffer)) - - fname, err := ioutil.TempFile("", "tdaq-") - if err != nil { - t.Fatalf("could not create a temporary log file for run-ctl log server: %+v", err) - } - fname.Close() - defer func() { - if err != nil { - raw, err := ioutil.ReadFile(fname.Name()) - if err == nil { - t.Logf("log-file:\n%v\n", string(raw)) + for _, tc := range []struct { + network string + port func(string) (string, error) + }{ + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, + } { + t.Run(tc.network, func(t *testing.T) { + rcAddr, err := tc.port("runctl") + if err != nil { + t.Fatalf("could not find a port for run-ctl: %+v", err) } - } - os.Remove(fname.Name()) - }() - - cfg := config.RunCtl{ - Name: "run-ctl", - Level: rclvl, - Net: "tcp", - RunCtl: rcAddr, - Web: webAddr, - LogFile: fname.Name(), - HBeatFreq: 50 * time.Millisecond, - } - rc, err := tdaq.NewRunControl(cfg, stdout) - if err != nil { - t.Fatalf("could not create run-ctl: %+v", err) - } - tsrv := httptest.NewUnstartedServer(rc.Web().(*http.Server).Handler) - tsrv.Config.ReadTimeout = 5 * time.Second - tsrv.Config.WriteTimeout = 5 * time.Second - tsrv.Start() - defer tsrv.Close() - - cli := tsrv.Client() - rc.SetWebSrv(newWebSrvTest(tsrv)) - tcli := &testCli{tsrv} - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - - grp, ctx := errgroup.WithContext(ctx) - - errc := make(chan error) - go func() { - errc <- rc.Run(ctx) - }() - - grp.Go(func() error { - dev := xdaq.I64Gen{} - - cfg := config.Process{ - Name: "data-src", - Level: proclvl, - Net: "tcp", - RunCtl: rcAddr, - } - srv := tdaq.New(cfg, stdout) - srv.CmdHandle("/config", dev.OnConfig) - srv.CmdHandle("/init", dev.OnInit) - srv.CmdHandle("/reset", dev.OnReset) - srv.CmdHandle("/start", dev.OnStart) - srv.CmdHandle("/stop", dev.OnStop) - srv.CmdHandle("/quit", dev.OnQuit) - - srv.OutputHandle("/i64", dev.Output) - - srv.RunHandle(dev.Loop) - - err := srv.Run(ctx) - return err - }) - - for _, i := range []int{1, 2, 3} { - name := fmt.Sprintf("data-sink-%d", i) - grp.Go(func() error { - dev := xdaq.I64Dumper{} - - cfg := config.Process{ - Name: name, - Level: proclvl, - Net: "tcp", - RunCtl: rcAddr, + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) } - srv := tdaq.New(cfg, stdout) - srv.CmdHandle("/init", dev.OnInit) - srv.CmdHandle("/reset", dev.OnReset) - srv.CmdHandle("/stop", dev.OnStop) - srv.InputHandle("/i64", dev.Input) - - err := srv.Run(context.Background()) - return err - }) - } + stdout := iomux.NewWriter(new(bytes.Buffer)) - timeout := time.NewTimer(5 * time.Second) - defer timeout.Stop() -loop: - for { - select { - case <-timeout.C: - t.Fatalf("devices did not connect") - default: - n := rc.NumClients() - if n == 4 { - break loop + fname, err := ioutil.TempFile("", "tdaq-") + if err != nil { + t.Fatalf("could not create a temporary log file for run-ctl log server: %+v", err) + } + fname.Close() + defer func() { + if err != nil { + raw, err := ioutil.ReadFile(fname.Name()) + if err == nil { + t.Logf("log-file:\n%v\n", string(raw)) + } + } + os.Remove(fname.Name()) + }() + + cfg := config.RunCtl{ + Name: "run-ctl", + Level: rclvl, + Net: tc.network, + RunCtl: rcAddr, + Web: webAddr, + LogFile: fname.Name(), + HBeatFreq: 50 * time.Millisecond, } - } - } - - func() { - req, err := http.NewRequest(http.MethodGet, tsrv.URL+"/", nil) - if err != nil { - t.Fatalf("could not create http request for %q: %+v", "/", err) - } - resp, err := cli.Do(req) - if err != nil { - t.Fatalf("could not get /: %+v", err) - } - defer resp.Body.Close() - }() - - ws, err := newTestWS(tsrv) - if err != nil { - t.Fatalf("could not create test websocket: %+v", err) - } - defer ws.Close() - - status, err := ws.readStatus() - if err != nil { - t.Fatalf("could not get /status: %+v", err) - } - if got, want := status, fsm.UnConf.String(); got != want { - t.Fatalf("invalid status: got=%q, want=%q", got, want) - } - func() { - // test invalid command - cmd := "invalid-command" - req, err := tcli.cmd(cmd) - if err != nil { - t.Fatalf("could not prepare invalid command /%s: %+v", cmd, err) - } - resp, err := cli.Do(req) - if err != nil { - t.Fatalf("could not send invalid command /%s: %+v", cmd, err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusBadRequest { - t.Fatalf("invalid status code for invalid command: %v", resp.StatusCode) - } - }() - - for _, tt := range []struct { - name string - want string - }{ - {name: "config", want: fsm.Conf.String()}, - {name: "init", want: fsm.Init.String()}, - {name: "reset", want: fsm.UnConf.String()}, - {name: "config", want: fsm.Conf.String()}, - {name: "init", want: fsm.Init.String()}, - {name: "start", want: fsm.Running.String()}, - {name: "stop", want: fsm.Stopped.String()}, - {name: "status", want: fsm.Stopped.String()}, - {name: "start", want: fsm.Running.String()}, - {name: "stop", want: fsm.Stopped.String()}, - {name: "quit", want: fsm.Exiting.String()}, - } { - req, err := tcli.cmd(tt.name) - if err != nil { - t.Fatalf("could not prepare cmd /%s: %+v", tt.name, err) - } - func() { - var resp *http.Response - resp, err = cli.Do(req) + rc, err := tdaq.NewRunControl(cfg, stdout) if err != nil { - t.Fatalf("could not send command /%s: %+v", tt.name, err) + t.Fatalf("could not create run-ctl: %+v", err) + } + tsrv := httptest.NewUnstartedServer(rc.Web().(*http.Server).Handler) + tsrv.Config.ReadTimeout = 5 * time.Second + tsrv.Config.WriteTimeout = 5 * time.Second + tsrv.Start() + defer tsrv.Close() + + cli := tsrv.Client() + rc.SetWebSrv(newWebSrvTest(tsrv)) + tcli := &testCli{tsrv} + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + grp, ctx := errgroup.WithContext(ctx) + + errc := make(chan error) + go func() { + errc <- rc.Run(ctx) + }() + + grp.Go(func() error { + dev := xdaq.I64Gen{} + + cfg := config.Process{ + Name: "data-src", + Level: proclvl, + Net: tc.network, + RunCtl: rcAddr, + } + srv := tdaq.New(cfg, stdout) + srv.CmdHandle("/config", dev.OnConfig) + srv.CmdHandle("/init", dev.OnInit) + srv.CmdHandle("/reset", dev.OnReset) + srv.CmdHandle("/start", dev.OnStart) + srv.CmdHandle("/stop", dev.OnStop) + srv.CmdHandle("/quit", dev.OnQuit) + + srv.OutputHandle("/i64", dev.Output) + + srv.RunHandle(dev.Loop) + + err := srv.Run(ctx) + return err + }) + + for _, i := range []int{1, 2, 3} { + name := fmt.Sprintf("data-sink-%d", i) + grp.Go(func() error { + dev := xdaq.I64Dumper{} + + cfg := config.Process{ + Name: name, + Level: proclvl, + Net: tc.network, + RunCtl: rcAddr, + } + srv := tdaq.New(cfg, stdout) + srv.CmdHandle("/init", dev.OnInit) + srv.CmdHandle("/reset", dev.OnReset) + srv.CmdHandle("/stop", dev.OnStop) + + srv.InputHandle("/i64", dev.Input) + + err := srv.Run(context.Background()) + return err + }) + } + + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + loop: + for { + select { + case <-timeout.C: + t.Fatalf("devices did not connect") + default: + n := rc.NumClients() + if n == 4 { + break loop + } + } } - defer resp.Body.Close() - if tt.name == "quit" { - return + func() { + req, err := http.NewRequest(http.MethodGet, tsrv.URL+"/", nil) + if err != nil { + t.Fatalf("could not create http request for %q: %+v", "/", err) + } + resp, err := cli.Do(req) + if err != nil { + t.Fatalf("could not get /: %+v", err) + } + defer resp.Body.Close() + }() + + ws, err := newTestWS(tsrv) + if err != nil { + t.Fatalf("could not create test websocket: %+v", err) } + defer ws.Close() status, err := ws.readStatus() if err != nil { - t.Fatalf("could not get /status after /%s: %+v", tt.name, err) + t.Fatalf("could not get /status: %+v", err) + } + if got, want := status, fsm.UnConf.String(); got != want { + t.Fatalf("invalid status: got=%q, want=%q", got, want) } - if got, want := status, tt.want; got != want { - t.Fatalf("invalid status after /%s: got=%q, want=%q", tt.name, got, want) + func() { + // test invalid command + cmd := "invalid-command" + req, err := tcli.cmd(cmd) + if err != nil { + t.Fatalf("could not prepare invalid command /%s: %+v", cmd, err) + } + resp, err := cli.Do(req) + if err != nil { + t.Fatalf("could not send invalid command /%s: %+v", cmd, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("invalid status code for invalid command: %v", resp.StatusCode) + } + }() + + for _, tt := range []struct { + name string + want string + }{ + {name: "config", want: fsm.Conf.String()}, + {name: "init", want: fsm.Init.String()}, + {name: "reset", want: fsm.UnConf.String()}, + {name: "config", want: fsm.Conf.String()}, + {name: "init", want: fsm.Init.String()}, + {name: "start", want: fsm.Running.String()}, + {name: "stop", want: fsm.Stopped.String()}, + {name: "status", want: fsm.Stopped.String()}, + {name: "start", want: fsm.Running.String()}, + {name: "stop", want: fsm.Stopped.String()}, + {name: "quit", want: fsm.Exiting.String()}, + } { + req, err := tcli.cmd(tt.name) + if err != nil { + t.Fatalf("could not prepare cmd /%s: %+v", tt.name, err) + } + func() { + var resp *http.Response + resp, err = cli.Do(req) + if err != nil { + t.Fatalf("could not send command /%s: %+v", tt.name, err) + } + defer resp.Body.Close() + + if tt.name == "quit" { + return + } + + status, err := ws.readStatus() + if err != nil { + t.Fatalf("could not get /status after /%s: %+v", tt.name, err) + } + + if got, want := status, tt.want; got != want { + t.Fatalf("invalid status after /%s: got=%q, want=%q", tt.name, got, want) + } + }() } - }() - } - err = grp.Wait() - if err != nil { - t.Fatalf("could not run device run-group: %+v", err) - } + err = grp.Wait() + if err != nil { + t.Fatalf("could not run device run-group: %+v", err) + } - err = <-errc - if err != nil && !xerrors.Is(err, context.Canceled) { - t.Fatalf("error shutting down run-ctl: %+v", err) + err = <-errc + if err != nil && !xerrors.Is(err, context.Canceled) { + t.Fatalf("error shutting down run-ctl: %+v", err) + } + }) } } diff --git a/xdaq/xdaq_test.go b/xdaq/xdaq_test.go index 2246a79..8968338 100644 --- a/xdaq/xdaq_test.go +++ b/xdaq/xdaq_test.go @@ -7,6 +7,8 @@ package xdaq_test // import "github.com/go-daq/tdaq/xdaq" import ( "bytes" "context" + "io/ioutil" + "os" "testing" "time" @@ -32,145 +34,169 @@ func TestSequence(t *testing.T) { proclvl = log.LvlDebug ) - port, err := tcputil.GetTCPPort() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - rcAddr := ":" + port - - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port - - stdout := new(bytes.Buffer) - app := job.New("tcp", stdout) - defer func() { - if err != nil { - t.Logf("stdout:\n%v\n", stdout.String()) - } - }() - - app.Cfg.RunCtl = rcAddr - app.Cfg.Web = webAddr - app.Cfg.Level = rclvl - - var ( - proc1 = pstate{name: "proc-1"} - proc2 = pstate{name: "proc-2"} - proc3 = pstate{name: "proc-3.1"} - proc4 = pstate{name: "proc-3.2"} - ) - - app.Add( - func() job.Proc { - dev := new(xdaq.I64Gen) - proc1.v = &dev.N - return job.Proc{ - Dev: dev, - Name: proc1.name, - Level: proclvl, - Outputs: job.OutputHandlers{ - "/i64-1": dev.Output, - }, - Handlers: job.RunHandlers{dev.Loop}, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Processor) - proc2.v = &dev.V - return job.Proc{ - Dev: dev, - Name: proc2.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-1": dev.Input, - }, - Outputs: job.OutputHandlers{ - "/i64-2": dev.Output, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc3.v = &dev.V - return job.Proc{ - Dev: dev, - Name: proc3.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-2": dev.Input, - }, + for _, tc := range []struct { + network string + port func(string) (string, error) + }{ + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { // FIXME(sbinet) + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, + } { + t.Run(tc.network, func(t *testing.T) { + rcAddr, err := tc.port("runctl") + if err != nil { + t.Fatalf("could not find a port for run-ctl: %+v", err) } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc4.v = &dev.V - return job.Proc{ - Dev: dev, - Name: proc4.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-2": dev.Input, - }, + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) } - }(), - ) + stdout := new(bytes.Buffer) + app := job.New(tc.network, stdout) + defer func() { + if err != nil { + t.Logf("stdout:\n%v\n", stdout.String()) + } + }() + + app.Cfg.RunCtl = rcAddr + app.Cfg.Web = webAddr + app.Cfg.Level = rclvl + + var ( + proc1 = pstate{name: "proc-1"} + proc2 = pstate{name: "proc-2"} + proc3 = pstate{name: "proc-3.1"} + proc4 = pstate{name: "proc-3.2"} + ) + + app.Add( + func() job.Proc { + dev := new(xdaq.I64Gen) + proc1.v = &dev.N + return job.Proc{ + Dev: dev, + Name: proc1.name, + Level: proclvl, + Outputs: job.OutputHandlers{ + "/i64-1": dev.Output, + }, + Handlers: job.RunHandlers{dev.Loop}, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Processor) + proc2.v = &dev.V + return job.Proc{ + Dev: dev, + Name: proc2.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-1": dev.Input, + }, + Outputs: job.OutputHandlers{ + "/i64-2": dev.Output, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc3.v = &dev.V + return job.Proc{ + Dev: dev, + Name: proc3.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-2": dev.Input, + }, + } - err = app.Start() - if err != nil { - t.Fatalf("could not start tdaq app: %+v", err) - } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc4.v = &dev.V + return job.Proc{ + Dev: dev, + Name: proc4.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-2": dev.Input, + }, + } - for _, tt := range []struct { - name string - cmd tdaq.CmdType - }{ - {"/config", tdaq.CmdConfig}, - {"/init", tdaq.CmdInit}, - {"/reset", tdaq.CmdReset}, - {"/config", tdaq.CmdConfig}, - {"/init", tdaq.CmdInit}, - {"/start", tdaq.CmdStart}, - {"/stop", tdaq.CmdStop}, - {"/status", tdaq.CmdStatus}, - {"/start", tdaq.CmdStart}, - {"/stop", tdaq.CmdStop}, - {"/quit", tdaq.CmdQuit}, - } { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - func() { - defer cancel() - err = app.Do(ctx, tt.cmd) + }(), + ) + + err = app.Start() if err != nil { - t.Fatalf("could not send command %v: %+v", tt.cmd, err) - } - if tt.name == "/start" { - time.Sleep(100 * time.Millisecond) - } - if tt.name == "/stop" { - switch { - case *proc1.v-1 != *proc2.v: - err = xerrors.Errorf("stage-1 error") - t.Fatalf("stage-1 error: %q:%v, %q:%v", proc1.name, *proc1.v, proc2.name, *proc2.v) - case *proc2.v*2 != *proc3.v: - err = xerrors.Errorf("stage-2 error") - t.Fatalf("stage-2 error: %q:%v, %q:%v", proc2.name, *proc2.v, proc3.name, *proc3.v) - case *proc3.v != *proc4.v: - err = xerrors.Errorf("stage-3 error") - t.Fatalf("stage-3 error: %q:%v, %q:%v", proc3.name, *proc3.v, proc4.name, *proc4.v) - } + t.Fatalf("could not start tdaq app: %+v", err) + } + + for _, tt := range []struct { + name string + cmd tdaq.CmdType + }{ + {"/config", tdaq.CmdConfig}, + {"/init", tdaq.CmdInit}, + {"/reset", tdaq.CmdReset}, + {"/config", tdaq.CmdConfig}, + {"/init", tdaq.CmdInit}, + {"/start", tdaq.CmdStart}, + {"/stop", tdaq.CmdStop}, + {"/status", tdaq.CmdStatus}, + {"/start", tdaq.CmdStart}, + {"/stop", tdaq.CmdStop}, + {"/quit", tdaq.CmdQuit}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + func() { + defer cancel() + err = app.Do(ctx, tt.cmd) + if err != nil { + t.Fatalf("could not send command %v: %+v", tt.cmd, err) + } + if tt.name == "/start" { + time.Sleep(100 * time.Millisecond) + } + if tt.name == "/stop" { + switch { + case *proc1.v-1 != *proc2.v: + err = xerrors.Errorf("stage-1 error") + t.Fatalf("stage-1 error: %q:%v, %q:%v", proc1.name, *proc1.v, proc2.name, *proc2.v) + case *proc2.v*2 != *proc3.v: + err = xerrors.Errorf("stage-2 error") + t.Fatalf("stage-2 error: %q:%v, %q:%v", proc2.name, *proc2.v, proc3.name, *proc3.v) + case *proc3.v != *proc4.v: + err = xerrors.Errorf("stage-3 error") + t.Fatalf("stage-3 error: %q:%v, %q:%v", proc3.name, *proc3.v, proc4.name, *proc4.v) + } + } + }() } - }() - } - err = app.Wait() - if err != nil { - t.Fatalf("error shutting down tdaq app: %+v", err) + err = app.Wait() + if err != nil { + t.Fatalf("error shutting down tdaq app: %+v", err) + } + }) } } @@ -182,141 +208,166 @@ func TestAdder(t *testing.T) { proclvl = log.LvlDebug ) - port, err := tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) - } - - rcAddr := ":" + port - - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port - - stdout := new(bytes.Buffer) - app := job.New("tcp", stdout) - defer func() { - if err != nil { - t.Logf("stdout:\n%v\n", stdout.String()) - } - }() - - app.Cfg.RunCtl = rcAddr - app.Cfg.Web = webAddr - app.Cfg.Level = rclvl - - var ( - proc1 = pstate{name: "gen-1"} - proc2 = pstate{name: "gen-2"} - proc3 = pstate{name: "adder"} - proc4 = pstate{name: "dumper"} - ) - - app.Add( - func() job.Proc { - dev := new(xdaq.I64Gen) - proc1.v = &dev.N - return job.Proc{ - Dev: dev, - Name: proc1.name, - Level: proclvl, - Outputs: job.OutputHandlers{ - "/i64-1": dev.Output, - }, - Handlers: job.RunHandlers{dev.Loop}, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Gen) - proc2.v = &dev.N - return job.Proc{ - Dev: dev, - Name: proc2.name, - Level: proclvl, - Outputs: job.OutputHandlers{ - "/i64-2": dev.Output, - }, - Handlers: job.RunHandlers{dev.Loop}, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Adder) - proc3.n = &dev.N - proc3.v = &dev.V - return job.Proc{ - Dev: dev, - Name: proc3.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-1": dev.Left, - "/i64-2": dev.Right, - }, - Outputs: job.OutputHandlers{ - "/sum": dev.Output, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc4.v = &dev.V - return job.Proc{ - Dev: dev, - Name: proc4.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/sum": dev.Input, - }, - } - }(), - ) - - err = app.Start() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not start tdaq app: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - for _, tt := range []struct { - name string - cmd tdaq.CmdType + for _, tc := range []struct { + network string + port func(string) (string, error) }{ - {"/config", tdaq.CmdConfig}, - {"/init", tdaq.CmdInit}, - {"/reset", tdaq.CmdReset}, - {"/config", tdaq.CmdConfig}, - {"/init", tdaq.CmdInit}, - {"/start", tdaq.CmdStart}, - {"/status", tdaq.CmdStatus}, - {"/stop", tdaq.CmdStop}, - {"/quit", tdaq.CmdQuit}, + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { // FIXME(sbinet) + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, } { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - func() { - defer cancel() - err = app.Do(ctx, tt.cmd) + t.Run(tc.network, func(t *testing.T) { + + rcAddr, err := tc.port("runctl") if err != nil { - t.Fatalf("could not send command %v: %+v", tt.cmd, err) + t.Fatalf("could not find a port for run-ctl: %+v", err) } - if tt.name == "/start" { - time.Sleep(100 * time.Millisecond) + + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) } - if tt.name == "/stop" { - switch { - case *proc3.v != 2*(*proc3.n-1): - err = xerrors.Errorf("stage-2 error") - t.Fatalf("stage-2 error: %q:%v, %q:%v", proc3.name+"-sum", *proc3.v, proc3.name+"-n", *proc3.n) - case *proc3.v != *proc4.v: - err = xerrors.Errorf("stage-3 error") - t.Fatalf("stage-3 error: %q:%v, %q:%v", proc3.name, *proc3.v, proc4.name, *proc4.v) + + stdout := new(bytes.Buffer) + app := job.New(tc.network, stdout) + defer func() { + if err != nil { + t.Logf("stdout:\n%v\n", stdout.String()) } + }() + + app.Cfg.RunCtl = rcAddr + app.Cfg.Web = webAddr + app.Cfg.Level = rclvl + + var ( + proc1 = pstate{name: "gen-1"} + proc2 = pstate{name: "gen-2"} + proc3 = pstate{name: "adder"} + proc4 = pstate{name: "dumper"} + ) + + app.Add( + func() job.Proc { + dev := new(xdaq.I64Gen) + proc1.v = &dev.N + return job.Proc{ + Dev: dev, + Name: proc1.name, + Level: proclvl, + Outputs: job.OutputHandlers{ + "/i64-1": dev.Output, + }, + Handlers: job.RunHandlers{dev.Loop}, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Gen) + proc2.v = &dev.N + return job.Proc{ + Dev: dev, + Name: proc2.name, + Level: proclvl, + Outputs: job.OutputHandlers{ + "/i64-2": dev.Output, + }, + Handlers: job.RunHandlers{dev.Loop}, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Adder) + proc3.n = &dev.N + proc3.v = &dev.V + return job.Proc{ + Dev: dev, + Name: proc3.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-1": dev.Left, + "/i64-2": dev.Right, + }, + Outputs: job.OutputHandlers{ + "/sum": dev.Output, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc4.v = &dev.V + return job.Proc{ + Dev: dev, + Name: proc4.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/sum": dev.Input, + }, + } + }(), + ) + + err = app.Start() + if err != nil { + t.Fatalf("could not start tdaq app: %+v", err) + } + + for _, tt := range []struct { + name string + cmd tdaq.CmdType + }{ + {"/config", tdaq.CmdConfig}, + {"/init", tdaq.CmdInit}, + {"/reset", tdaq.CmdReset}, + {"/config", tdaq.CmdConfig}, + {"/init", tdaq.CmdInit}, + {"/start", tdaq.CmdStart}, + {"/status", tdaq.CmdStatus}, + {"/stop", tdaq.CmdStop}, + {"/quit", tdaq.CmdQuit}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + func() { + defer cancel() + err = app.Do(ctx, tt.cmd) + if err != nil { + t.Fatalf("could not send command %v: %+v", tt.cmd, err) + } + if tt.name == "/start" { + time.Sleep(100 * time.Millisecond) + } + if tt.name == "/stop" { + switch { + case *proc3.v != 2*(*proc3.n-1): + err = xerrors.Errorf("stage-2 error") + t.Fatalf("stage-2 error: %q:%v, %q:%v", proc3.name+"-sum", *proc3.v, proc3.name+"-n", *proc3.n) + case *proc3.v != *proc4.v: + err = xerrors.Errorf("stage-3 error") + t.Fatalf("stage-3 error: %q:%v, %q:%v", proc3.name, *proc3.v, proc4.name, *proc4.v) + } + } + }() } - }() - } - err = app.Wait() - if err != nil { - t.Fatalf("error shutting down tdaq app: %+v", err) + err = app.Wait() + if err != nil { + t.Fatalf("error shutting down tdaq app: %+v", err) + } + }) } } @@ -328,172 +379,197 @@ func TestScaler(t *testing.T) { proclvl = log.LvlDebug ) - port, err := tcputil.GetTCPPort() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - rcAddr := ":" + port - - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port - - stdout := new(bytes.Buffer) - app := job.New("tcp", stdout) - defer func() { - if err != nil { - t.Logf("stdout:\n%v\n", stdout.String()) - } - }() - - app.Cfg.RunCtl = rcAddr - app.Cfg.Web = webAddr - app.Cfg.Level = rclvl - - var ( - proc1 = pstate{name: "proc-1"} - proc2 = pstate{name: "proc-2"} - proc3 = pstate{name: "proc-3"} - proc4 = pstate{name: "proc-4"} - proc5 = pstate{name: "proc-5"} - ) + for _, tc := range []struct { + network string + port func(string) (string, error) + }{ + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, + } { + t.Run(tc.network, func(t *testing.T) { - app.Add( - func() job.Proc { - dev := new(xdaq.I64Gen) - proc1.n = &dev.N - return job.Proc{ - Dev: dev, - Name: proc1.name, - Level: proclvl, - Outputs: job.OutputHandlers{ - "/i64-1": dev.Output, - }, - Handlers: job.RunHandlers{dev.Loop}, + rcAddr, err := tc.port("runctl") + if err != nil { + t.Fatalf("could not find a port for run-ctl: %+v", err) } - }(), - func() job.Proc { - var n int64 - dev := &xdaq.Scaler{ - Accept: func() bool { - n++ - return n%2 == 0 - }, + + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) } - proc2.n = &n - return job.Proc{ - Dev: dev, - Name: proc2.name, - Level: proclvl, - Cmds: job.CmdHandlers{ - "/reset": func(ctx tdaq.Context, resp *tdaq.Frame, req tdaq.Frame) error { - n = 0 - dev.Accept = func() bool { + + stdout := new(bytes.Buffer) + app := job.New(tc.network, stdout) + defer func() { + if err != nil { + t.Logf("stdout:\n%v\n", stdout.String()) + } + }() + + app.Cfg.RunCtl = rcAddr + app.Cfg.Web = webAddr + app.Cfg.Level = rclvl + + var ( + proc1 = pstate{name: "proc-1"} + proc2 = pstate{name: "proc-2"} + proc3 = pstate{name: "proc-3"} + proc4 = pstate{name: "proc-4"} + proc5 = pstate{name: "proc-5"} + ) + + app.Add( + func() job.Proc { + dev := new(xdaq.I64Gen) + proc1.n = &dev.N + return job.Proc{ + Dev: dev, + Name: proc1.name, + Level: proclvl, + Outputs: job.OutputHandlers{ + "/i64-1": dev.Output, + }, + Handlers: job.RunHandlers{dev.Loop}, + } + }(), + func() job.Proc { + var n int64 + dev := &xdaq.Scaler{ + Accept: func() bool { n++ return n%2 == 0 + }, + } + proc2.n = &n + return job.Proc{ + Dev: dev, + Name: proc2.name, + Level: proclvl, + Cmds: job.CmdHandlers{ + "/reset": func(ctx tdaq.Context, resp *tdaq.Frame, req tdaq.Frame) error { + n = 0 + dev.Accept = func() bool { + n++ + return n%2 == 0 + } + return dev.OnReset(ctx, resp, req) + }, + }, + Inputs: job.InputHandlers{ + "/i64-1": dev.Input, + }, + Outputs: job.OutputHandlers{ + "/i64-2": dev.Output, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc3.n = &dev.N + return job.Proc{ + Dev: dev, + Name: proc3.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-2": dev.Input, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.Scaler) + return job.Proc{ + Dev: dev, + Name: proc4.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-1": dev.Input, + }, + Outputs: job.OutputHandlers{ + "/i64-3": dev.Output, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc5.n = &dev.N + return job.Proc{ + Dev: dev, + Name: proc5.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-3": dev.Input, + }, + } + }(), + ) + + err = app.Start() + if err != nil { + t.Fatalf("could not start tdaq app: %+v", err) + } + + for _, tt := range []struct { + name string + cmd tdaq.CmdType + }{ + {"/config", tdaq.CmdConfig}, + {"/init", tdaq.CmdInit}, + {"/reset", tdaq.CmdReset}, + {"/config", tdaq.CmdConfig}, + {"/init", tdaq.CmdInit}, + {"/start", tdaq.CmdStart}, + {"/status", tdaq.CmdStatus}, + {"/stop", tdaq.CmdStop}, + {"/quit", tdaq.CmdQuit}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + func() { + defer cancel() + err = app.Do(ctx, tt.cmd) + if err != nil { + t.Fatalf("could not send command %v: %+v", tt.cmd, err) + } + if tt.name == "/start" { + time.Sleep(100 * time.Millisecond) + } + if tt.name == "/stop" { + switch { + case *proc1.n != *proc2.n: + err = xerrors.Errorf("stage-1 error") + t.Fatalf("stage-1: error: %q:%v, %q:%v", proc1.name, *proc1.n, proc2.name, *proc2.n) + case int64(float64(*proc2.n)*0.5) != int64(float64(*proc3.n)): + err = xerrors.Errorf("stage-2 error") + t.Fatalf("stage-2: error: %q:%v, %q:%v", proc2.name, *proc2.n, proc3.name, *proc3.n) + case *proc5.n != 0: + err = xerrors.Errorf("stage-3 error") + t.Fatalf("stage-3: error: %q:%v, %q:%v", proc1.name, *proc1.n, proc5.name, *proc5.n) } - return dev.OnReset(ctx, resp, req) - }, - }, - Inputs: job.InputHandlers{ - "/i64-1": dev.Input, - }, - Outputs: job.OutputHandlers{ - "/i64-2": dev.Output, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc3.n = &dev.N - return job.Proc{ - Dev: dev, - Name: proc3.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-2": dev.Input, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.Scaler) - return job.Proc{ - Dev: dev, - Name: proc4.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-1": dev.Input, - }, - Outputs: job.OutputHandlers{ - "/i64-3": dev.Output, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc5.n = &dev.N - return job.Proc{ - Dev: dev, - Name: proc5.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-3": dev.Input, - }, + } + }() } - }(), - ) - err = app.Start() - if err != nil { - t.Fatalf("could not start tdaq app: %+v", err) - } - - for _, tt := range []struct { - name string - cmd tdaq.CmdType - }{ - {"/config", tdaq.CmdConfig}, - {"/init", tdaq.CmdInit}, - {"/reset", tdaq.CmdReset}, - {"/config", tdaq.CmdConfig}, - {"/init", tdaq.CmdInit}, - {"/start", tdaq.CmdStart}, - {"/status", tdaq.CmdStatus}, - {"/stop", tdaq.CmdStop}, - {"/quit", tdaq.CmdQuit}, - } { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - func() { - defer cancel() - err = app.Do(ctx, tt.cmd) + err = app.Wait() if err != nil { - t.Fatalf("could not send command %v: %+v", tt.cmd, err) - } - if tt.name == "/start" { - time.Sleep(100 * time.Millisecond) - } - if tt.name == "/stop" { - switch { - case *proc1.n != *proc2.n: - err = xerrors.Errorf("stage-1 error") - t.Fatalf("stage-1: error: %q:%v, %q:%v", proc1.name, *proc1.n, proc2.name, *proc2.n) - case int64(float64(*proc2.n)*0.5) != int64(float64(*proc3.n)): - err = xerrors.Errorf("stage-2 error") - t.Fatalf("stage-2: error: %q:%v, %q:%v", proc2.name, *proc2.n, proc3.name, *proc3.n) - case *proc5.n != 0: - err = xerrors.Errorf("stage-3 error") - t.Fatalf("stage-3: error: %q:%v, %q:%v", proc1.name, *proc1.n, proc5.name, *proc5.n) - } + t.Fatalf("error shutting down tdaq app: %+v", err) } - }() - } - - err = app.Wait() - if err != nil { - t.Fatalf("error shutting down tdaq app: %+v", err) + }) } } @@ -505,77 +581,81 @@ func TestSplitter(t *testing.T) { proclvl = log.LvlDebug ) - port, err := tcputil.GetTCPPort() + tmpdir, err := ioutil.TempDir("", "go-tdaq-") if err != nil { - t.Fatalf("could not find a tcp port for run-ctl: %+v", err) + t.Fatal(err) } + defer os.RemoveAll(tmpdir) - rcAddr := ":" + port - - port, err = tcputil.GetTCPPort() - if err != nil { - t.Fatalf("could not find a tcp port for run-ctl web server: %+v", err) - } - webAddr := ":" + port - - stdout := new(bytes.Buffer) - app := job.New("tcp", stdout) - defer func() { - if err != nil { - t.Logf("stdout:\n%v\n", stdout.String()) - } - }() - - app.Cfg.RunCtl = rcAddr - app.Cfg.Web = webAddr - app.Cfg.Level = rclvl - - var ( - proc1 = pstate{name: "proc-1"} - proc2 = pstate{name: "proc-2"} - proc3 = pstate{name: "proc-3"} - proc4 = pstate{name: "proc-4"} - proc5 = pstate{name: "proc-5"} - proc6 = pstate{name: "proc-6"} - proc7 = pstate{name: "proc-7"} - ) - - app.Add( - func() job.Proc { - dev := new(xdaq.I64Gen) - proc1.n = &dev.N - return job.Proc{ - Dev: dev, - Name: proc1.name, - Level: proclvl, - Outputs: job.OutputHandlers{ - "/i64": dev.Output, - }, - Handlers: job.RunHandlers{dev.Loop}, + for _, tc := range []struct { + network string + port func(string) (string, error) + }{ + { + network: "tcp", + port: func(string) (string, error) { + p, err := tcputil.GetTCPPort() + return ":" + p, err + }, + }, + // { // FIXME(sbinet) + // network: "unix", + // port: func(n string) (string, error) { + // return filepath.Join(tmpdir, n), nil + // }, + // }, + } { + t.Run(tc.network, func(t *testing.T) { + rcAddr, err := tc.port("runctl") + if err != nil { + t.Fatalf("could not find a port for run-ctl: %+v", err) } - }(), - func() job.Proc { - var n int64 - dev := &xdaq.Splitter{ - Fct: func() int { - n++ - switch { - case n%2 == 0: - return -1 - default: - return +1 - } - }, + + webAddr, err := tc.port("web") + if err != nil { + t.Fatalf("could not find a port for run-ctl web server: %+v", err) } - proc2.n = &n - return job.Proc{ - Dev: dev, - Name: proc2.name, - Level: proclvl, - Cmds: job.CmdHandlers{ - "/reset": func(ctx tdaq.Context, resp *tdaq.Frame, req tdaq.Frame) error { - n = 0 - dev.Fct = func() int { + + stdout := new(bytes.Buffer) + app := job.New(tc.network, stdout) + defer func() { + if err != nil { + t.Logf("stdout:\n%v\n", stdout.String()) + } + }() + + app.Cfg.RunCtl = rcAddr + app.Cfg.Web = webAddr + app.Cfg.Level = rclvl + + var ( + proc1 = pstate{name: "proc-1"} + proc2 = pstate{name: "proc-2"} + proc3 = pstate{name: "proc-3"} + proc4 = pstate{name: "proc-4"} + proc5 = pstate{name: "proc-5"} + proc6 = pstate{name: "proc-6"} + proc7 = pstate{name: "proc-7"} + ) + + app.Add( + func() job.Proc { + dev := new(xdaq.I64Gen) + proc1.n = &dev.N + return job.Proc{ + Dev: dev, + Name: proc1.name, + Level: proclvl, + Outputs: job.OutputHandlers{ + "/i64": dev.Output, + }, + Handlers: job.RunHandlers{dev.Loop}, + } + }(), + func() job.Proc { + var n int64 + dev := &xdaq.Splitter{ + Fct: func() int { n++ switch { case n%2 == 0: @@ -583,136 +663,156 @@ func TestSplitter(t *testing.T) { default: return +1 } + }, + } + proc2.n = &n + return job.Proc{ + Dev: dev, + Name: proc2.name, + Level: proclvl, + Cmds: job.CmdHandlers{ + "/reset": func(ctx tdaq.Context, resp *tdaq.Frame, req tdaq.Frame) error { + n = 0 + dev.Fct = func() int { + n++ + switch { + case n%2 == 0: + return -1 + default: + return +1 + } + } + return dev.OnReset(ctx, resp, req) + }, + }, + Inputs: job.InputHandlers{ + "/i64": dev.Input, + }, + Outputs: job.OutputHandlers{ + "/i64-1-left": dev.Left, + "/i64-1-right": dev.Right, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc3.n = &dev.N + return job.Proc{ + Dev: dev, + Name: proc3.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-1-left": dev.Input, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc4.n = &dev.N + return job.Proc{ + Dev: dev, + Name: proc4.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-1-right": dev.Input, + }, + } + }(), + func() job.Proc { + dev := &xdaq.Splitter{} + return job.Proc{ + Dev: dev, + Name: proc5.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64": dev.Input, + }, + Outputs: job.OutputHandlers{ + "/i64-2-left": dev.Left, + "/i64-2-right": dev.Right, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc6.n = &dev.N + return job.Proc{ + Dev: dev, + Name: proc6.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-2-left": dev.Input, + }, + } + }(), + func() job.Proc { + dev := new(xdaq.I64Dumper) + proc7.n = &dev.N + return job.Proc{ + Dev: dev, + Name: proc7.name, + Level: proclvl, + Inputs: job.InputHandlers{ + "/i64-2-right": dev.Input, + }, + } + }(), + ) + + err = app.Start() + if err != nil { + t.Fatalf("could not start tdaq app: %+v", err) + } + + for _, tt := range []struct { + name string + cmd tdaq.CmdType + }{ + {"/config", tdaq.CmdConfig}, + {"/init", tdaq.CmdInit}, + {"/reset", tdaq.CmdReset}, + {"/config", tdaq.CmdConfig}, + {"/init", tdaq.CmdInit}, + {"/start", tdaq.CmdStart}, + {"/stop", tdaq.CmdStop}, + {"/status", tdaq.CmdStatus}, + {"/start", tdaq.CmdStart}, + {"/stop", tdaq.CmdStop}, + {"/quit", tdaq.CmdQuit}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + func() { + defer cancel() + err = app.Do(ctx, tt.cmd) + if err != nil { + t.Fatalf("could not send command %v: %+v", tt.cmd, err) + } + if tt.name == "/start" { + time.Sleep(100 * time.Millisecond) + } + if tt.name == "/stop" { + switch { + case *proc1.n != *proc2.n: + err = xerrors.Errorf("stage-1 error") + t.Fatalf("stage-1: error: %q:%v, %q:%v", proc1.name, *proc1.n, proc2.name, *proc2.n) + case *proc2.n != *proc3.n+*proc4.n: + err = xerrors.Errorf("stage-2 error") + t.Fatalf("stage-2: error: %q:%v, %q:%v %q:%v", proc2.name, *proc2.n, proc3.name, *proc3.n, proc4.name, *proc4.n) + case *proc1.n != *proc6.n+*proc7.n: + err = xerrors.Errorf("stage-3 error") + t.Fatalf("stage-3: error: %q:%v, %q:%v %q:%v", proc1.name, *proc1.n, proc6.name, *proc6.n, proc7.name, *proc7.n) + case *proc7.n != 0: + err = xerrors.Errorf("stage-4 error") + t.Fatalf("stage-4: error: %q:%v, %q:%v", proc1.name, *proc1.n, proc7.name, *proc7.n) } - return dev.OnReset(ctx, resp, req) - }, - }, - Inputs: job.InputHandlers{ - "/i64": dev.Input, - }, - Outputs: job.OutputHandlers{ - "/i64-1-left": dev.Left, - "/i64-1-right": dev.Right, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc3.n = &dev.N - return job.Proc{ - Dev: dev, - Name: proc3.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-1-left": dev.Input, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc4.n = &dev.N - return job.Proc{ - Dev: dev, - Name: proc4.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-1-right": dev.Input, - }, - } - }(), - func() job.Proc { - dev := &xdaq.Splitter{} - return job.Proc{ - Dev: dev, - Name: proc5.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64": dev.Input, - }, - Outputs: job.OutputHandlers{ - "/i64-2-left": dev.Left, - "/i64-2-right": dev.Right, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc6.n = &dev.N - return job.Proc{ - Dev: dev, - Name: proc6.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-2-left": dev.Input, - }, - } - }(), - func() job.Proc { - dev := new(xdaq.I64Dumper) - proc7.n = &dev.N - return job.Proc{ - Dev: dev, - Name: proc7.name, - Level: proclvl, - Inputs: job.InputHandlers{ - "/i64-2-right": dev.Input, - }, + } + }() } - }(), - ) - - err = app.Start() - if err != nil { - t.Fatalf("could not start tdaq app: %+v", err) - } - for _, tt := range []struct { - name string - cmd tdaq.CmdType - }{ - {"/config", tdaq.CmdConfig}, - {"/init", tdaq.CmdInit}, - {"/reset", tdaq.CmdReset}, - {"/config", tdaq.CmdConfig}, - {"/init", tdaq.CmdInit}, - {"/start", tdaq.CmdStart}, - {"/stop", tdaq.CmdStop}, - {"/status", tdaq.CmdStatus}, - {"/start", tdaq.CmdStart}, - {"/stop", tdaq.CmdStop}, - {"/quit", tdaq.CmdQuit}, - } { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - func() { - defer cancel() - err = app.Do(ctx, tt.cmd) + err = app.Wait() if err != nil { - t.Fatalf("could not send command %v: %+v", tt.cmd, err) + t.Fatalf("error shutting down tdaq app: %+v", err) } - if tt.name == "/start" { - time.Sleep(100 * time.Millisecond) - } - if tt.name == "/stop" { - switch { - case *proc1.n != *proc2.n: - err = xerrors.Errorf("stage-1 error") - t.Fatalf("stage-1: error: %q:%v, %q:%v", proc1.name, *proc1.n, proc2.name, *proc2.n) - case *proc2.n != *proc3.n+*proc4.n: - err = xerrors.Errorf("stage-2 error") - t.Fatalf("stage-2: error: %q:%v, %q:%v %q:%v", proc2.name, *proc2.n, proc3.name, *proc3.n, proc4.name, *proc4.n) - case *proc1.n != *proc6.n+*proc7.n: - err = xerrors.Errorf("stage-3 error") - t.Fatalf("stage-3: error: %q:%v, %q:%v %q:%v", proc1.name, *proc1.n, proc6.name, *proc6.n, proc7.name, *proc7.n) - case *proc7.n != 0: - err = xerrors.Errorf("stage-4 error") - t.Fatalf("stage-4: error: %q:%v, %q:%v", proc1.name, *proc1.n, proc7.name, *proc7.n) - } - } - }() - } - - err = app.Wait() - if err != nil { - t.Fatalf("error shutting down tdaq app: %+v", err) + }) } }