diff --git a/.circleci/config.yml b/.circleci/config.yml
index d9d10891..95b3ffaf 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -1,10 +1,14 @@
-version: 2
+version: 2.1
-jobs:
- build:
+executors:
+ go-executor:
docker:
- image: cimg/go:1.23
- working_directory: ~/task-tools
+ working_directory: ~/task-tools
+
+jobs:
+ test:
+ executor: go-executor
steps:
- checkout
- run: go install github.com/jstemmer/go-junit-report/v2@latest
@@ -23,4 +27,36 @@ jobs:
- store_artifacts:
path: ~/task-tools/junit
- store_artifacts:
- path: tests.out
\ No newline at end of file
+ path: tests.out
+
+ build:
+ executor: go-executor
+ steps:
+ - checkout
+ - setup_remote_docker:
+ docker_layer_caching: false
+ - run:
+ name: "docker login"
+ command: echo ${DOCKERHUB_TOKEN} | docker login -u ${DOCKERHUB_USERNAME} --password-stdin
+ - run:
+ name: "Push Docker Image"
+ command: make docker
+
+workflows:
+ version: 2
+ test_and_build:
+ jobs:
+ - test:
+ filters:
+ tags:
+ only:
+ - /.*/
+ - build:
+ requires:
+ - test
+ context:
+ - DOCKER
+ filters:
+ tags:
+ only:
+ - /.*/
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 6f6f2342..c5ce8a25 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,5 +18,7 @@ apps/utils/file-watcher/file-watcher
*/stats/stats
apps/workers/sql-load/sql-load
build
+tasks.db
+*_preview.html
coverage
diff --git a/README.md b/README.md
index b53d4f12..871beff8 100644
--- a/README.md
+++ b/README.md
@@ -88,9 +88,16 @@ func (tm *taskMaster) Run(ctx context.Context) error {
## Pre-built Apps
### **Flowlord**
-an all-purpose TaskMaster that should be used with workflow files to schedule when tasks should run and the task hierarchy. It can retry failed jobs, alert when tasks fail and has an API that can be used to backload/schedule jobs and give a recap of recent jobs run.
+Production-ready task orchestration engine for managing complex workflow dependencies with intelligent scheduling, automatic retries, and real-time monitoring. Features include:
-See Additional [docs](apps/flowlord/README.md).
+- **Workflow Management** - Multi-phase workflows with parent-child task dependencies
+- **Intelligent Scheduling** - Cron-based scheduling with template-based task generation
+- **Optional SQLite Cache** - Task history, alerts, and file tracking for troubleshooting (non-critical, stateless operation)
+- **Web Dashboard** - Real-time monitoring UI with filtering, pagination, and date navigation
+- **Batch Processing** - Generate multiple tasks from date ranges, metadata arrays, or data files
+- **RESTful API** - Comprehensive API for backloading, monitoring, and workflow management
+
+See detailed [documentation](apps/flowlord/README.md).
### Workers
diff --git a/apps/flowlord/README.md b/apps/flowlord/README.md
index a909c396..f4136ee1 100644
--- a/apps/flowlord/README.md
+++ b/apps/flowlord/README.md
@@ -1,11 +1,64 @@
# flowlord taskmaster
-flowlord schedules and coordinates task dependency across workflows. Flowlord reads tasks from the done topic, failed tasks can be configured to retry a set number of times before being sent to slack and/or a retry_failed topic. Successful tasks will start children tasks.
-
+
+
+**Flowlord** is a production-ready task orchestration engine that manages complex workflow dependencies with intelligent scheduling, automatic retries, and real-time monitoring. Built on the [task](https://github.com/pcelvng/task) ecosystem, it coordinates distributed workers through message bus communication while providing visibility into task execution through a web dashboard.
+**Key Features:**
+- **Workflow Management** - Define multi-phase workflows with parent-child task dependencies
+- **Intelligent Scheduling** - Cron-based scheduling with template-based task generation
+- **Automatic Retries** - Configurable retry logic with exponential backoff and jitter to prevent thundering herd
+- **File Watching** - Trigger tasks automatically when files are written to specified paths
+- **Batch Processing** - Generate multiple tasks from date ranges, metadata arrays, or data files
+- **Alerting** - Slack notifications for failed tasks and incomplete jobs with smart frequency management
+- **RESTful API** - Web UI and API for monitoring workflows, viewing task history, and managing alerts
[](https://github.com/pcelvng/task-tools/wiki/Flowlord-API)
+
+
+## Overview
+
+
+
+## Monitoring & Troubleshooting
+
+SQLite is used to store phases and provides troubleshooting convenience by recording task history. As flowlord is a stateless task management system, cache persistence is not required for flowlord to work, but it is convenient for reviewing historical tasks and alerts. It is recommended to back up the task and alert logs for long-term storage.
+
+- **Task Records** - Full execution lifecycle with timing metrics
+- **Alert History** - Failed task tracking for debugging
+- **File Processing Audit** - Which files triggered which tasks
+- **Workflow State** - Phase configuration and dependencies
+
+The database is optional and non-critical. If deleted, Flowlord continues normally and rebuilds fresh data. Features:
+- Automatic backup/restore from remote paths (S3/GCS)
+- 90-day default retention with automatic cleanup
+- WAL mode for concurrent access
+
+**Configuration:**
+```toml
+[cache]
+ local_path = "./tasks.db" # required local cache
+ backup_path = "gs://bucket/tasks.db" # Optional backup location
+ retention = "2160h" # 90 days
+ task_ttl = "4h" # Alert deadline: maximum time allowed from task creation to completion
+```
+
+## Web Dashboard
+
+Built-in web UI for monitoring workflows and troubleshooting. Uses Go templates to render HTML dashboards with:
+- Task execution history with filtering and pagination
+- Alert summaries grouped by task type
+- File processing history
+- Workflow phase visualization
+- System statistics
+
+Access at `http://localhost:8080/` (or configured port)
+
+| Files View | Tasks View | Alerts View | Workflow View |
+|:----------:|:----------:|:-----------:|:-------------:|
+| [](../../internal/docs/img/flowlord_files.png) | [](../../internal/docs/img/flowlord_tasks.png) | [](../../internal/docs/img/flowlord_alerts.png) | [](../../internal/docs/img/flowlord_workflow.png) |
+
## workflow
A workflow consists of one or more phases as a way to define of how a set of task is to be scheduled and run and the dependencies between them.
diff --git a/apps/flowlord/cache/cache.go b/apps/flowlord/cache/cache.go
deleted file mode 100644
index 39d2cc20..00000000
--- a/apps/flowlord/cache/cache.go
+++ /dev/null
@@ -1,146 +0,0 @@
-package cache
-
-import (
- "net/url"
- "strings"
- "sync"
- "time"
-
- "github.com/pcelvng/task"
- "github.com/pcelvng/task/bus"
-)
-
-type Cache interface {
- Add(task.Task)
- Get(id string) TaskJob
-
- // todo: listener for cache expiry?
-}
-
-func NewMemory(ttl time.Duration) *Memory {
- if ttl < time.Hour {
- ttl = time.Hour
- }
- return &Memory{
- ttl: ttl,
- cache: make(map[string]TaskJob),
- }
-
-}
-
-type Memory struct {
- ttl time.Duration
- cache map[string]TaskJob
- mu sync.RWMutex
-}
-
-// todo: name to describe info about completed tasks that are within the cache
-type TaskJob struct {
- LastUpdate time.Time // time since the last event with id
- Completed bool
- count int
- Events []task.Task
-}
-
-type Stat struct {
- Count int
- Removed int
- ProcessTime time.Duration
- Unfinished []task.Task
-}
-
-// Recycle iterates through the cache
-// clearing all tasks that have been completed within the cache window
-// it returns a list of tasks that have not been completed but have expired
-func (c *Memory) Recycle() Stat {
- tasks := make([]task.Task, 0)
- t := time.Now()
- total := len(c.cache)
- c.mu.Lock()
- for k, v := range c.cache {
- // remove expired items
- if t.Sub(v.LastUpdate) > c.ttl {
- if !v.Completed {
- tasks = append(tasks, v.Events[len(v.Events)-1])
- }
- delete(c.cache, k)
- }
- }
- c.mu.Unlock()
- return Stat{
- Count: len(c.cache),
- Removed: total - len(c.cache),
- ProcessTime: time.Since(t),
- Unfinished: tasks,
- }
-
-}
-
-// Add a task to the cache
-// the task must have an id to be added.
-func (c *Memory) Add(t task.Task) {
- if t.ID == "" || c == nil {
- return
- }
- c.mu.Lock()
- job := c.cache[t.ID]
- job.Events = append(job.Events, t)
- if t.Result != "" {
- job.Completed = true
- t, _ := time.Parse(time.RFC3339, t.Ended)
- job.LastUpdate = t
- } else {
- job.Completed = false
- t, _ := time.Parse(time.RFC3339, t.Created)
- job.LastUpdate = t
- }
-
- c.cache[t.ID] = job
- c.mu.Unlock()
-}
-
-func (c *Memory) Recap() map[string]*Stats {
- data := map[string]*Stats{}
- if c == nil {
- return data
- }
- c.mu.RLock()
- for _, v := range c.cache {
- for _, t := range v.Events {
- job := t.Job
- if job == "" {
- v, _ := url.ParseQuery(t.Meta)
- job = v.Get("job")
- }
- key := strings.TrimRight(t.Type+":"+job, ":")
- stat, found := data[key]
- if !found {
- stat = &Stats{
- CompletedTimes: make([]time.Time, 0),
- ErrorTimes: make([]time.Time, 0),
- ExecTimes: &DurationStats{},
- }
- data[key] = stat
- }
- stat.Add(t)
- }
- }
- c.mu.RUnlock()
- return data
-}
-
-// Get the TaskJob info with the given id.
-// If the id isn't found a _ is returned
-func (c *Memory) Get(id string) TaskJob {
- c.mu.RLock()
- defer c.mu.RUnlock()
- return c.cache[id]
-}
-
-// SendFunc extends the given producers send function by adding any task sent to the cache.
-func (m *Memory) SendFunc(p bus.Producer) func(string, *task.Task) error {
- return func(topic string, tsk *task.Task) error {
- m.Add(*tsk)
- return p.Send(topic, tsk.JSONBytes())
- }
-}
diff --git a/apps/flowlord/cache/cache_test.go b/apps/flowlord/cache/cache_test.go
deleted file mode 100644
index 7c615123..00000000
--- a/apps/flowlord/cache/cache_test.go
+++ /dev/null
@@ -1,227 +0,0 @@
-package cache
-
-import (
- "testing"
- "time"
-
- "github.com/hydronica/trial"
- "github.com/pcelvng/task"
-)
-
-func TestAdd(t *testing.T) {
- fn := func(tasks []task.Task) (map[string]TaskJob, error) {
- cache := &Memory{cache: make(map[string]TaskJob)}
- for _, t := range tasks {
- cache.Add(t)
- }
- for k, v := range cache.cache {
- v.count = len(v.Events)
- v.Events = nil
- cache.cache[k] = v
- }
- return cache.cache, nil
- }
- cases := trial.Cases[[]task.Task, map[string]TaskJob]{
- "no id": {
- Input: []task.Task{
- {Type: "test"},
- },
- },
- "created": {
- Input: []task.Task{
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z"},
- },
- Expected: map[string]TaskJob{
- "id1": {
- LastUpdate: trial.TimeDay("2023-01-01"),
- count: 1,
- },
- },
- },
- "completed": {
- Input: []task.Task{
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z"},
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:01Z", Result: task.CompleteResult},
- },
- Expected: map[string]TaskJob{
- "id1": {
- LastUpdate: trial.Time(time.RFC3339, "2023-01-01T00:00:01Z"),
- Completed: true,
- count: 2,
- },
- },
- },
- "failed": {
- Input: []task.Task{
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z"},
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:01Z", Result: task.ErrResult, Msg: "Error with pull from X"},
- },
- Expected: map[string]TaskJob{
- "id1": {
- LastUpdate: trial.Time(time.RFC3339, "2023-01-01T00:00:01Z"),
- Completed: true,
- count: 2,
- },
- },
- },
- "retry": {
- Input: []task.Task{
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z"},
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:01Z", Result: task.ErrResult, Msg: "Error with pull from X"},
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:01:00Z", Meta: "retry=1"},
- },
- Expected: map[string]TaskJob{
- "id1": {
- LastUpdate: trial.Time(time.RFC3339, "2023-01-01T00:01:00Z"),
- Completed: false,
- count: 3,
- },
- },
- },
- "child": {
- Input: []task.Task{
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z"},
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Created: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:01Z", Result: task.CompleteResult},
- {Type: "transform", ID: "id1", Info: "/product/2023-01-01/data.txt", Created: "2023-01-01T00:02:00Z"},
- },
- Expected: map[string]TaskJob{
- "id1": {
- LastUpdate: trial.Time(time.RFC3339, "2023-01-01T00:02:00Z"),
- Completed: false,
- count: 3,
- },
- },
- },
- "multi-child": {
- Input: []task.Task{
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Started: "2023-01-01T00:00:00Z"},
- {Type: "pull", ID: "id1", Info: "?date=2023-01-01", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:01Z", Result: task.CompleteResult},
- {Type: "transform", ID: "id1", Info: "/product/2023-01-01/data.txt", Started: "2023-01-01T00:02:00Z"},
- {Type: "transform", ID: "id1", Info: "/product/2023-01-01/data.txt", Started: "2023-01-01T00:02:00Z", Ended: "2023-01-01T00:02:15Z", Result: task.CompleteResult},
- {Type: "load", ID: "id1", Info: "/product/2023-01-01/data.txt?table=schema.product", Started: "2023-01-01T00:04:00Z"},
- {Type: "load", ID: "id1", Info: "/product/2023-01-01/data.txt?table=schema.product", Started: "2023-01-01T00:04:00Z", Ended: "2023-01-01T00:05:12Z", Result: task.CompleteResult},
- },
- Expected: map[string]TaskJob{
- "id1": {
- LastUpdate: trial.Time(time.RFC3339, "2023-01-01T00:05:12Z"),
- Completed: true,
- count: 6,
- },
- },
- },
- }
- trial.New(fn, cases).SubTest(t)
-}
-
-func TestRecycle(t *testing.T) {
- now := time.Now()
- cache := Memory{
- ttl: time.Hour,
- cache: map[string]TaskJob{
- "keep": {
- Completed: false,
- LastUpdate: now.Add(-30 * time.Minute),
- Events: []task.Task{{Type: "test1"}},
- },
- "expire": {
- Completed: true,
- LastUpdate: now.Add(-90 * time.Minute),
- },
- "not-completed": {
- Completed: false,
- LastUpdate: now.Add(-90 * time.Minute),
- Events: []task.Task{
- {Type: "test1", Created: now.String()},
- {Type: "test1", Created: now.String(), Result: task.CompleteResult},
- {Type: "test2", Created: now.String()},
- },
- },
- },
- }
-
- stat := cache.Recycle()
- stat.ProcessTime = 0
- expected := Stat{
- Count: 1,
- Removed: 2,
- Unfinished: []task.Task{
- {Type: "test2", Created: now.String()},
- }}
- if eq, diff := trial.Equal(stat, expected); !eq {
- t.Logf(diff)
- }
-}
-
-func TestRecap(t *testing.T) {
- fn := func(in []task.Task) (map[string]string, error) {
- c := &Memory{cache: map[string]TaskJob{}}
- for _, t := range in {
- c.Add(t)
- }
- result := map[string]string{}
- for k, v := range c.Recap() {
- result[k] = v.String()
- }
- return result, nil
- }
- cases := trial.Cases[[]task.Task, map[string]string]{
- "task no job": {
- Input: []task.Task{{ID: "abc", Type: "test1", Info: "?date=2020-01-02", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:10Z"}},
- Expected: map[string]string{
- "test1": "min: 10s max: 10s avg: 10s\n\tComplete: 1 2020/01/02\n",
- },
- },
- "task:job": {
- Input: []task.Task{
- {ID: "abc", Type: "test1", Job: "job1", Info: "?day=2020-01-01", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:10Z"},
- {ID: "abc", Type: "test1", Job: "job1", Info: "?day=2020-01-02", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:15Z"},
- {ID: "abc", Type: "test1", Job: "job1", Info: "?day=2020-01-03", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:05Z"},
- },
- Expected: map[string]string{
- "test1:job1": "min: 5s max: 15s avg: 10s\n\tComplete: 3 2020/01/01-2020/01/03\n",
- },
- },
- "with errors": {
- Input: []task.Task{
- {ID: "abc", Type: "test1", Job: "job1", Info: "?day=2020-01-01", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:10Z"},
- {ID: "abc", Type: "test1", Job: "job1", Info: "?day=2020-01-02", Result: "error", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:15Z"},
- {ID: "abc", Type: "test1", Job: "job1", Info: "?day=2020-01-03", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:05Z"},
- },
- Expected: map[string]string{
- "test1:job1": "min: 5s max: 10s avg: 7.5s\n\tComplete: 2 2020/01/01,2020/01/03\n\tError: 1 2020/01/02\n",
- },
- },
- "hourly": {
- Input: []task.Task{
- {ID: "abc", Type: "proc", Job: "hour", Info: "?hour=2020-01-01T05", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:10Z"},
- {ID: "abc", Type: "proc", Job: "hour", Info: "?hour_utc=2020-01-01T06", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:15Z"},
- {ID: "abc", Type: "proc", Job: "hour", Info: "?hour=2020-01-01T07", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:05Z"},
- {ID: "abc", Type: "proc", Job: "hour", Info: "?hour=2020-01-01T08", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:47Z"},
- {ID: "abc", Type: "proc", Job: "hour", Info: "?hour=2020-01-01T09", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:01:33Z"},
- },
- Expected: map[string]string{
- "proc:hour": "min: 5s max: 1m33s avg: 34s\n\tComplete: 5 2020/01/01T05-2020/01/01T09\n",
- },
- },
- "monthly": {
- Input: []task.Task{
- {ID: "abc", Type: "month", Info: "?day=2020-01-01", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:10Z"},
- {ID: "abc", Type: "month", Info: "?day=2020-02-01", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:15Z"},
- },
- Expected: map[string]string{
- "month": "min: 10s max: 15s avg: 12.5s\n\tComplete: 2 2020/01/01,2020/02/01\n",
- },
- },
- "meta_job": {
- Input: []task.Task{
- {ID: "abc", Type: "test1", Info: "?day=2020-01-01", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:10Z", Meta: "job=job1"},
- {ID: "abc", Type: "test1", Info: "?day=2020-01-02", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:15Z", Meta: "job=job1"},
- {ID: "abc", Type: "test1", Info: "?day=2020-01-03", Result: "complete", Started: "2023-01-01T00:00:00Z", Ended: "2023-01-01T00:00:05Z", Meta: "job=job1"},
- },
- Expected: map[string]string{
- "test1:job1": "min: 5s max: 15s avg: 10s\n\tComplete: 3 2020/01/01-2020/01/03\n",
- },
- },
- }
- trial.New(fn, cases).SubTest(t)
-}
diff --git a/apps/flowlord/cache/stats.go b/apps/flowlord/cache/stats.go
deleted file mode 100644
index 1fde69c5..00000000
--- a/apps/flowlord/cache/stats.go
+++ /dev/null
@@ -1,125 +0,0 @@
-package cache
-
-import (
- "encoding/json"
- "fmt"
- "time"
-
- gtools "github.com/jbsmith7741/go-tools"
- "github.com/pcelvng/task"
-
- "github.com/pcelvng/task-tools/tmpl"
-)
-
-const (
- precision = 10 * time.Millisecond
-)
-
-type Stats struct {
- CompletedCount int
- CompletedTimes []time.Time
-
- ErrorCount int
- ErrorTimes []time.Time
-
- ExecTimes *DurationStats
-}
-
-func (s *Stats) MarshalJSON() ([]byte, error) {
- type count struct {
- Count int
- Times string
- }
-
- v := struct {
- Min string `json:"min"`
- Max string `json:"max"`
- Average string `json:"avg"`
- Complete count `json:"complete"`
- Error count `json:"error"`
- }{
- Min: gtools.PrintDuration(s.ExecTimes.Min),
- Max: gtools.PrintDuration(s.ExecTimes.Max),
- Average: gtools.PrintDuration(s.ExecTimes.Average()),
- Complete: count{
- Count: s.CompletedCount,
- Times: tmpl.PrintDates(s.CompletedTimes),
- },
- Error: count{
- Count: s.ErrorCount,
- Times: tmpl.PrintDates(s.ErrorTimes),
- },
- }
- return json.Marshal(v)
-}
-
-func (s Stats) String() string {
- r := s.ExecTimes.String()
- if s.CompletedCount > 0 {
- r += fmt.Sprintf("\n\tComplete: %d %v", s.CompletedCount, tmpl.PrintDates(s.CompletedTimes))
- }
- if s.ErrorCount > 0 {
- r += fmt.Sprintf("\n\tError: %d %v", s.ErrorCount, tmpl.PrintDates(s.ErrorTimes))
- }
-
- return r + "\n"
-}
-
-type DurationStats struct {
- Min time.Duration
- Max time.Duration
- sum int64
- count int64
-}
-
-func (s *DurationStats) Add(d time.Duration) {
- if s.count == 0 {
- s.Min = d
- s.Max = d
- }
-
- if d > s.Max {
- s.Max = d
- } else if d < s.Min {
- s.Min = d
- }
- // truncate times to milliseconds to preserve space
- s.sum += int64(d / precision)
- s.count++
-}
-
-func (s *DurationStats) Average() time.Duration {
- if s.count == 0 {
- return 0
- }
- return time.Duration(s.sum/s.count) * precision
-}
-
-func (s *DurationStats) String() string {
- return fmt.Sprintf("min: %v max: %v avg: %v",
- s.Min, s.Max, s.Average())
-}
-
-func (stats *Stats) Add(tsk task.Task) {
- tm := tmpl.TaskTime(tsk)
- if tsk.Result == task.ErrResult {
- stats.ErrorCount++
- stats.ErrorTimes = append(stats.ErrorTimes, tm)
- return
- }
-
- stats.CompletedCount++
- stats.CompletedTimes = append(stats.CompletedTimes, tm)
-
- end, _ := time.Parse(time.RFC3339, tsk.Ended)
- start, _ := time.Parse(time.RFC3339, tsk.Started)
- stats.ExecTimes.Add(end.Sub(start))
-}
-
-type pathTime time.Time
-
-func (p *pathTime) UnmarshalText(b []byte) error {
- t := tmpl.PathTime(string(b))
- *p = pathTime(t)
- return nil
-}
diff --git a/apps/flowlord/files.go b/apps/flowlord/files.go
index bd6d5de0..d736072f 100644
--- a/apps/flowlord/files.go
+++ b/apps/flowlord/files.go
@@ -9,6 +9,7 @@ import (
"time"
"github.com/pcelvng/task"
+
"github.com/pcelvng/task-tools/file/stat"
"github.com/pcelvng/task-tools/tmpl"
"github.com/pcelvng/task-tools/workflow"
@@ -93,6 +94,9 @@ func unmarshalStat(b []byte) (sts stat.Stats) {
// if a match is found it will create a task and send it out
func (tm *taskMaster) matchFile(sts stat.Stats) error {
matches := 0
+ var taskIDs []string
+ var taskNames []string
+
for _, f := range tm.files {
if isMatch, _ := filepath.Match(f.SrcPattern, sts.Path); !isMatch {
continue
@@ -106,19 +110,33 @@ func (tm *taskMaster) matchFile(sts stat.Stats) error {
meta.Set("file", sts.Path)
meta.Set("filename", filepath.Base(sts.Path))
meta.Set("workflow", f.workflowFile)
- // todo: add job if provided in task name ex -> task:job
// populate the info string
info := tmpl.Parse(f.Template, t)
info, _ = tmpl.Meta(info, meta)
tsk := task.New(f.Topic(), info)
+ tsk.Job = f.Job()
tsk.Meta, _ = url.QueryUnescape(meta.Encode())
+ // Collect task information for storage
+ taskIDs = append(taskIDs, tsk.ID)
+ taskName := tsk.Type
+ if tsk.Job != "" {
+ taskName += ":" + tsk.Job
+ }
+ taskNames = append(taskNames, taskName)
+
if err := tm.producer.Send(tsk.Type, tsk.JSONBytes()); err != nil {
return err
}
}
+
+ // Store file message in database
+ if tm.taskCache != nil {
+ tm.taskCache.AddFileMessage(sts, taskIDs, taskNames)
+ }
+
if matches == 0 {
return fmt.Errorf("no match found for %q", sts.Path)
}
diff --git a/apps/flowlord/files_test.go b/apps/flowlord/files_test.go
index 6fc19361..62451817 100644
--- a/apps/flowlord/files_test.go
+++ b/apps/flowlord/files_test.go
@@ -128,7 +128,7 @@ func TestTaskMaster_MatchFile(t *testing.T) {
Input: stat.Stats{Path: "gs://bucket/group/data.txt"},
Expected: []task.Task{
{Type: "basic", Meta: "file=gs://bucket/group/data.txt&filename=data.txt&workflow=basic.toml"},
- {Type: "data", Meta: "file=gs://bucket/group/data.txt&filename=data.txt&job=1&workflow=data.toml"},
+ {Type: "data", Job: "1", Meta: "file=gs://bucket/group/data.txt&filename=data.txt&job=1&workflow=data.toml"},
},
},
}
diff --git a/apps/flowlord/handler.go b/apps/flowlord/handler.go
index b35ce859..85ad3c9e 100644
--- a/apps/flowlord/handler.go
+++ b/apps/flowlord/handler.go
@@ -1,34 +1,108 @@
package main
import (
+ "bytes"
+ "embed"
"encoding/json"
"errors"
+ "html/template"
"io"
+ "io/fs"
"log"
"net/http"
- "path"
"path/filepath"
"strconv"
"strings"
"time"
- "github.com/jbsmith7741/uri"
-
- "github.com/pcelvng/task-tools/slack"
-
+ "github.com/dustin/go-humanize"
"github.com/go-chi/chi/v5"
+ "github.com/go-chi/chi/v5/middleware"
gtools "github.com/jbsmith7741/go-tools"
"github.com/jbsmith7741/go-tools/appenderr"
+ "github.com/jbsmith7741/uri"
"github.com/pcelvng/task"
tools "github.com/pcelvng/task-tools"
+ "github.com/pcelvng/task-tools/apps/flowlord/sqlite"
"github.com/pcelvng/task-tools/file"
- "github.com/pcelvng/task-tools/workflow"
+ "github.com/pcelvng/task-tools/slack"
)
+//go:embed handler/alert.tmpl
+var AlertTemplate string
+
+//go:embed handler/files.tmpl
+var FilesTemplate string
+
+//go:embed handler/task.tmpl
+var TaskTemplate string
+
+//go:embed handler/workflow.tmpl
+var WorkflowTemplate string
+
+//go:embed handler/header.tmpl
+var HeaderTemplate string
+
+//go:embed handler/about.tmpl
+var AboutTemplate string
+
+//go:embed handler/static/*
+var StaticFiles embed.FS
+
+var isLocal = false
+
+// getBaseFuncMap returns a template.FuncMap with all common template functions
+func getBaseFuncMap() template.FuncMap {
+ return template.FuncMap{
+ // Time formatting functions
+ "formatFullDate": func(t time.Time) string {
+ return t.Format(time.RFC3339)
+ },
+ "formatTimeHour": func(t time.Time) string {
+ return t.Format("2006-01-02T15")
+ },
+ // Duration formatting
+ "formatDuration": gtools.PrintDuration,
+ // Size formatting
+ "formatBytes": func(bytes int64) string {
+ if bytes < 0 {
+ return "0 B"
+ }
+ return humanize.Bytes(uint64(bytes))
+ },
+ // String manipulation
+ "slice": func(s string, start, end int) string {
+ if start >= len(s) {
+ return ""
+ }
+ if end > len(s) {
+ end = len(s)
+ }
+ return s[start:end]
+ },
+ // Math functions
+ "add": func(a, b int) int {
+ return a + b
+ },
+ }
+}
+
func (tm *taskMaster) StartHandler() {
router := chi.NewRouter()
- router.Get("/", tm.Info)
+
+ // Enable gzip compression for all responses
+ router.Use(middleware.Compress(5))
+
+ // Static file serving - serve embedded static files
+ // Create a sub-filesystem that strips the "handler/" prefix
+ staticFS, err := fs.Sub(StaticFiles, "handler/static")
+ if err != nil {
+ log.Fatal("Failed to create static filesystem:", err)
+ }
+ router.Handle("/static/*", http.StripPrefix("/static/", http.FileServer(http.FS(staticFS))))
+
+ router.Get("/", tm.htmlAbout)
router.Get("/info", tm.Info)
router.Get("/refresh", tm.refreshHandler)
router.Post("/backload", tm.Backloader)
@@ -50,6 +124,11 @@ func (tm *taskMaster) StartHandler() {
})
router.Get("/task/{id}", tm.taskHandler)
router.Get("/recap", tm.recapHandler)
+ router.Get("/web/alert", tm.htmlAlert)
+ router.Get("/web/files", tm.htmlFiles)
+ router.Get("/web/task", tm.htmlTask)
+ router.Get("/web/workflow", tm.htmlWorkflow)
+ router.Get("/web/about", tm.htmlAbout)
if tm.port == 0 {
log.Println("flowlord router disabled")
@@ -71,13 +150,18 @@ func (tm *taskMaster) Info(w http.ResponseWriter, r *http.Request) {
}
// create a copy of all workflows
- wCache := make(map[string]map[string]workflow.Phase) // [file][task:job]Phase
- for key, w := range tm.Cache.Workflows {
- phases := make(map[string]workflow.Phase)
- for _, j := range w.Phases {
- phases[pName(j.Topic(), j.Job())] = j
+ wCache := make(map[string]map[string]sqlite.Phase) // [file][task:job]Phase
+ workflowFiles := tm.taskCache.GetWorkflowFiles()
+ for _, filePath := range workflowFiles {
+ phases, err := tm.taskCache.GetPhasesForWorkflow(filePath)
+ if err != nil {
+ continue
}
- wCache[key] = phases
+ phaseMap := make(map[string]sqlite.Phase)
+ for _, j := range phases {
+ phaseMap[pName(j.Phase.Topic(), j.Phase.Job())] = j.Phase
+ }
+ wCache[filePath] = phaseMap
}
entries := tm.cron.Entries()
for i := 0; i < len(entries); i++ {
@@ -150,27 +234,27 @@ func (tm *taskMaster) Info(w http.ResponseWriter, r *http.Request) {
// Add non cron based tasks
for f, w := range wCache {
- for _, v := range w {
- k := pName(v.Topic(), v.Job())
+ for _, ph := range w {
+ k := pName(ph.Topic(), ph.Job())
// check for parents
- for v.DependsOn != "" {
- if t, found := wCache[f][v.DependsOn]; found {
- k = v.DependsOn
- v = t
+ for ph.DependsOn != "" {
+ if t, found := wCache[f][ph.DependsOn]; found {
+ k = ph.DependsOn
+ ph = t
} else {
break
}
}
- children := tm.getAllChildren(v.Topic(), f, v.Job())
- // todo: remove children from Cache
+ children := tm.getAllChildren(ph.Topic(), f, ph.Job())
+ // todo: remove children from SQLite
if _, found := sts.Workflow[f]; !found {
sts.Workflow[f] = make(map[string]cEntry)
}
- warning := validatePhase(v)
- if v.DependsOn != "" {
- warning += "parent task not found: " + v.DependsOn
+ warning := ph.Validate()
+ if ph.DependsOn != "" {
+ warning += "parent task not found: " + ph.DependsOn
}
sts.Workflow[f][k] = cEntry{
Schedule: make([]string, 0),
@@ -206,7 +290,7 @@ func (tm *taskMaster) refreshHandler(w http.ResponseWriter, _ *http.Request) {
func (tm *taskMaster) taskHandler(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
- v := tm.taskCache.Get(id)
+ v := tm.taskCache.GetTask(id)
b, _ := json.Marshal(v)
w.Header().Add("Content-Type", "application/json")
w.Write(b)
@@ -214,7 +298,7 @@ func (tm *taskMaster) taskHandler(w http.ResponseWriter, r *http.Request) {
func (tm *taskMaster) recapHandler(w http.ResponseWriter, r *http.Request) {
- data := tm.taskCache.Recap()
+ data := tm.taskCache.Recap(time.Now().UTC())
if r.Header.Get("Accept") == "application/json" {
b, err := json.Marshal(data)
@@ -242,13 +326,10 @@ func (tm *taskMaster) workflowFiles(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
return
}
- var pth string
+ pth := tm.path
// support directory and single file for workflow path lookup.
- if _, f := path.Split(tm.path); f == "" {
- pth = tm.path + "/" + fName
- } else {
- // for single file show the file regardless of the file param
- pth = tm.path
+ if tm.taskCache.IsDir() {
+ pth += "/" + fName
}
sts, err := file.Stat(pth, tm.fOpts)
@@ -258,12 +339,12 @@ func (tm *taskMaster) workflowFiles(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Content-Type", "text/plain")
if sts.IsDir {
+ w.WriteHeader(http.StatusOK)
files, _ := file.List(pth, tm.fOpts)
for _, f := range files {
b, a, _ := strings.Cut(f.Path, tm.path)
w.Write([]byte(b + a + "\n"))
}
- w.WriteHeader(http.StatusOK)
return
}
reader, err := file.NewReader(pth, tm.fOpts)
@@ -279,13 +360,387 @@ func (tm *taskMaster) workflowFiles(w http.ResponseWriter, r *http.Request) {
case "json":
w.Header().Set("Content-Type", "application/json")
case "yaml", "yml":
- w.Header().Set("Context-Type", "text/x-yaml")
+ w.Header().Set("Content-Type", "text/x-yaml")
}
- b, _ := io.ReadAll(reader)
w.WriteHeader(http.StatusOK)
+ b, _ := io.ReadAll(reader)
w.Write(b)
}
+func (tm *taskMaster) htmlAlert(w http.ResponseWriter, r *http.Request) {
+
+ dt, _ := time.Parse("2006-01-02", r.URL.Query().Get("date"))
+ if dt.IsZero() {
+ dt = time.Now()
+ }
+ alerts, err := tm.taskCache.GetAlertsByDate(dt)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ w.Write([]byte(err.Error()))
+ return
+ }
+
+ // Get dates with alerts for calendar highlighting
+ datesWithData, _ := tm.taskCache.DatesByType("alerts")
+
+ w.Header().Set("Content-Type", "text/html")
+ w.WriteHeader(http.StatusOK)
+ w.Write(alertHTML(alerts, dt, datesWithData))
+}
+
+// htmlFiles handles GET /web/files - displays file messages for a specific date
+func (tm *taskMaster) htmlFiles(w http.ResponseWriter, r *http.Request) {
+ dt, _ := time.Parse("2006-01-02", r.URL.Query().Get("date"))
+ if dt.IsZero() {
+ dt = time.Now()
+ }
+
+ files, err := tm.taskCache.GetFileMessagesByDate(dt)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ w.Write([]byte(err.Error()))
+ return
+ }
+
+ // Get dates with file messages for calendar highlighting
+ datesWithData, _ := tm.taskCache.DatesByType("files")
+
+ w.Header().Set("Content-Type", "text/html")
+ w.WriteHeader(http.StatusOK)
+ w.Write(filesHTML(files, dt, datesWithData))
+}
+
+// htmlTask handles GET /web/task - displays task summary and table for a specific date
+func (tm *taskMaster) htmlTask(w http.ResponseWriter, r *http.Request) {
+ dt, _ := time.Parse("2006-01-02", r.URL.Query().Get("date"))
+ if dt.IsZero() {
+ dt = time.Now()
+ }
+
+ // Get filter parameters from query string
+ page := 1
+ if pageStr := r.URL.Query().Get("page"); pageStr != "" {
+ if p, err := strconv.Atoi(pageStr); err == nil && p > 0 {
+ page = p
+ }
+ }
+
+ filter := &sqlite.TaskFilter{
+ ID: r.URL.Query().Get("id"),
+ Type: r.URL.Query().Get("type"),
+ Job: r.URL.Query().Get("job"),
+ Result: r.URL.Query().Get("result"),
+ Page: page,
+ Limit: sqlite.DefaultPageSize,
+ }
+
+ // Get task summary statistics for the date
+ summaryStart := time.Now()
+ taskStats, err := tm.taskCache.GetTaskSummaryByDate(dt)
+ summaryTime := time.Since(summaryStart)
+ if err != nil {
+ log.Printf("Error getting task summary: %v", err)
+ taskStats = sqlite.TaskStats{}
+ }
+
+ // Get filtered and paginated tasks
+ queryStart := time.Now()
+ tasks, totalCount, err := tm.taskCache.GetTasksByDate(dt, filter)
+ queryTime := time.Since(queryStart)
+ if err != nil {
+ log.Printf("Error getting tasks: %v", err)
+ tasks = []sqlite.TaskView{}
+ totalCount = 0
+ }
+
+ // Get dates with tasks for calendar highlighting
+ datesWithData, _ := tm.taskCache.DatesByType("tasks")
+
+ w.Header().Set("Content-Type", "text/html")
+ w.WriteHeader(http.StatusOK)
+ htmlBytes := taskHTML(tasks, taskStats, totalCount, dt, filter, datesWithData, summaryTime+queryTime)
+ w.Write(htmlBytes)
+}
+
+// htmlWorkflow handles GET /web/workflow - displays workflow phases from database
+func (tm *taskMaster) htmlWorkflow(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/html")
+ w.WriteHeader(http.StatusOK)
+ w.Write(workflowHTML(tm.taskCache))
+}
+
+// htmlAbout handles GET /web/about - displays system information and cache statistics
+func (tm *taskMaster) htmlAbout(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/html")
+ w.WriteHeader(http.StatusOK)
+ w.Write(tm.aboutHTML())
+}
+
+// filesHTML renders the file messages HTML page
+func filesHTML(files []sqlite.FileMessage, date time.Time, datesWithData []string) []byte {
+ // Calculate statistics
+ totalFiles := len(files)
+ matchedFiles := 0
+ totalTasks := 0
+
+ for _, file := range files {
+ if len(file.TaskNames) > 0 {
+ matchedFiles++
+ totalTasks += len(file.TaskNames)
+ }
+ }
+
+ unmatchedFiles := totalFiles - matchedFiles
+
+ // Calculate navigation dates
+ prevDate := date.AddDate(0, 0, -1)
+ nextDate := date.AddDate(0, 0, 1)
+
+ data := map[string]interface{}{
+ "Date": date.Format("Monday, January 2, 2006"),
+ "DateValue": date.Format("2006-01-02"),
+ "PrevDate": prevDate.Format("2006-01-02"),
+ "NextDate": nextDate.Format("2006-01-02"),
+ "Files": files,
+ "TotalFiles": totalFiles,
+ "MatchedFiles": matchedFiles,
+ "UnmatchedFiles": unmatchedFiles,
+ "TotalTasks": totalTasks,
+ "CurrentPage": "files",
+ "PageTitle": "File Messages",
+ "isLocal": isLocal,
+ "DatesWithData": datesWithData,
+ }
+
+ // Parse and execute template using the shared funcMap
+ tmpl, err := template.New("files").Funcs(getBaseFuncMap()).Parse(HeaderTemplate + FilesTemplate)
+ if err != nil {
+ return []byte(err.Error())
+ }
+
+ var buf bytes.Buffer
+ if err := tmpl.Execute(&buf, data); err != nil {
+ return []byte(err.Error())
+ }
+
+ return buf.Bytes()
+}
+
+// taskHTML renders the task summary and table HTML page
+func taskHTML(tasks []sqlite.TaskView, taskStats sqlite.TaskStats, totalCount int, date time.Time, filter *sqlite.TaskFilter, datesWithData []string, queryTime time.Duration) []byte {
+ renderStart := time.Now()
+
+ // Calculate navigation dates
+ prevDate := date.AddDate(0, 0, -1)
+ nextDate := date.AddDate(0, 0, 1)
+
+ // Get aggregate counts from TaskStats
+ counts := taskStats.TotalCounts()
+
+ // Get unique types and jobs from TaskStats for filter dropdowns
+ types := taskStats.UniqueTypes()
+ jobsByType := taskStats.JobsByType()
+
+ // Calculate pagination info
+ totalPages := (totalCount + filter.Limit - 1) / filter.Limit
+ if totalPages == 0 {
+ totalPages = 1
+ }
+
+ // Calculate display indices
+ startIdx := (filter.Page-1)*filter.Limit + 1
+ endIdx := startIdx + len(tasks) - 1
+ if len(tasks) == 0 {
+ startIdx = 0
+ endIdx = 0
+ }
+
+ data := map[string]interface{}{
+ "Date": date.Format("Monday, January 2, 2006"),
+ "DateValue": date.Format("2006-01-02"),
+ "PrevDate": prevDate.Format("2006-01-02"),
+ "NextDate": nextDate.Format("2006-01-02"),
+ "Tasks": tasks,
+ "Counts": counts,
+ "Filter": filter,
+ "CurrentPage": "task",
+ "PageTitle": "Task Dashboard",
+ "isLocal": isLocal,
+ "DatesWithData": datesWithData,
+ "UniqueTypes": types,
+ "JobsByType": jobsByType,
+ // Pagination info
+ "Page": filter.Page,
+ "PageSize": filter.Limit,
+ "TotalPages": totalPages,
+ "StartIndex": startIdx,
+ "EndIndex": endIdx,
+ "FilteredCount": totalCount,
+ }
+
+ // Parse and execute template using base funcMap
+ tmpl, err := template.New("task").Funcs(getBaseFuncMap()).Parse(HeaderTemplate + TaskTemplate)
+ if err != nil {
+ return []byte(err.Error())
+ }
+
+ var buf bytes.Buffer
+ if err := tmpl.Execute(&buf, data); err != nil {
+ return []byte(err.Error())
+ }
+
+ htmlSize := buf.Len()
+ renderTime := time.Since(renderStart)
+
+ // Single consolidated log with all metrics
+ log.Printf("Task page: date=%s filters=[id=%q type=%q job=%q result=%q] total=%d filtered=%d page=%d/%d query=%v render=%v size=%.2fMB",
+ date.Format("2006-01-02"), filter.ID, filter.Type, filter.Job, filter.Result,
+ counts.Total, totalCount, filter.Page, totalPages,
+ queryTime, renderTime, float64(htmlSize)/(1024*1024))
+
+ return buf.Bytes()
+}
+
+// workflowHTML renders the workflow phases HTML page
+func workflowHTML(tCache *sqlite.SQLite) []byte {
+ // Get all workflow files and their phases
+ workflowFiles := tCache.GetWorkflowFiles()
+
+ workflowFileSummary := make(map[string]int)
+ allPhases := make([]sqlite.PhaseDB, 0)
+
+ for _, filePath := range workflowFiles {
+ phases, err := tCache.GetPhasesForWorkflow(filePath)
+ if err != nil {
+ continue
+ }
+
+ workflowFileSummary[filePath] = len(phases)
+ allPhases = append(allPhases, phases...)
+ }
+
+ data := map[string]interface{}{
+ "Phases": allPhases,
+ "WorkflowFileSummary": workflowFileSummary,
+ "CurrentPage": "workflow",
+ "PageTitle": "Workflow Dashboard",
+ "isLocal": isLocal,
+ "DatesWithData": []string{}, // Workflow page doesn't use date picker with highlights
+ }
+
+ // Parse and execute template using the shared funcMap
+ tmpl, err := template.New("workflow").Funcs(getBaseFuncMap()).Parse(HeaderTemplate + WorkflowTemplate)
+ if err != nil {
+ return []byte("Error:" + err.Error())
+ }
+
+ var buf bytes.Buffer
+ if err := tmpl.Execute(&buf, data); err != nil {
+ return []byte("Error:" + err.Error())
+ }
+
+ return buf.Bytes()
+}
+
+// aboutHTML renders the about page HTML
+func (tm *taskMaster) aboutHTML() []byte {
+ // Get basic system information
+ sts := stats{
+ AppName: "flowlord",
+ Version: tools.Version,
+ RunTime: gtools.PrintDuration(time.Since(tm.initTime)),
+ NextUpdate: tm.nextUpdate.Format("2006-01-02T15:04:05"),
+ LastUpdate: tm.lastUpdate.Format("2006-01-02T15:04:05"),
+ }
+
+ // Get database size information
+ dbSize, err := tm.taskCache.GetDBSize()
+ if err != nil {
+ return []byte("Error getting database size: " + err.Error())
+ }
+
+ // Get table statistics
+ tableStats, err := tm.taskCache.GetTableStats()
+ if err != nil {
+ return []byte("Error getting table statistics: " + err.Error())
+ }
+
+ // Create data structure for template
+ data := map[string]interface{}{
+ "AppName": sts.AppName,
+ "Version": sts.Version,
+ "RunTime": sts.RunTime,
+ "LastUpdate": sts.LastUpdate,
+ "NextUpdate": sts.NextUpdate,
+ "TotalDBSize": dbSize.TotalSize,
+ "PageCount": dbSize.PageCount,
+ "PageSize": dbSize.PageSize,
+ "DBPath": dbSize.DBPath,
+ "TableStats": tableStats,
+ "SchemaVersion": tm.taskCache.GetSchemaVersion(),
+ "Retention": gtools.PrintDuration(tm.taskCache.Retention),
+ "TaskTTL": gtools.PrintDuration(tm.taskCache.TaskTTL),
+ "MinFrequency": gtools.PrintDuration(tm.slack.MinFrequency),
+ "MaxFrequency": gtools.PrintDuration(tm.slack.MaxFrequency),
+ "CurrentFrequency": gtools.PrintDuration(tm.slack.GetCurrentDuration()),
+ "CurrentPage": "about",
+ "DateValue": "", // About page doesn't need date
+ "PageTitle": "System Information",
+ "isLocal": isLocal,
+ "DatesWithData": []string{}, // About page doesn't use date picker with highlights
+ }
+
+ // Parse and execute template using the shared funcMap
+ tmpl, err := template.New("about").Funcs(getBaseFuncMap()).Parse(HeaderTemplate + AboutTemplate)
+ if err != nil {
+ return []byte("Error parsing template: " + err.Error())
+ }
+
+ var buf bytes.Buffer
+ if err := tmpl.Execute(&buf, data); err != nil {
+ return []byte("Error executing template: " + err.Error())
+ }
+
+ return buf.Bytes()
+}
+
+// AlertData holds both the alerts and summary data for the template
+type AlertData struct {
+ Alerts []sqlite.AlertRecord
+ Summary []sqlite.SummaryLine
+}
+
+// alertHTML renders the given alert records as an HTML page that makes it easy to see what is going on.
+func alertHTML(tasks []sqlite.AlertRecord, date time.Time, datesWithData []string) []byte {
+ // Generate summary data using BuildCompactSummary
+ summary := sqlite.BuildCompactSummary(tasks)
+
+ // Create data structure for template
+ data := map[string]interface{}{
+ "Alerts": tasks,
+ "Summary": summary,
+ "CurrentPage": "alert",
+ "DateValue": date.Format("2006-01-02"),
+ "Date": date.Format("Monday, January 2, 2006"),
+ "PageTitle": "Task Alerts",
+ "isLocal": isLocal,
+ "DatesWithData": datesWithData,
+ }
+
+ // Parse and execute template using the shared funcMap
+ tmpl, err := template.New("alert").Funcs(getBaseFuncMap()).Parse(HeaderTemplate + AlertTemplate)
+ if err != nil {
+ return []byte(err.Error())
+ }
+
+ var buf bytes.Buffer
+ if err := tmpl.Execute(&buf, data); err != nil {
+ return []byte(err.Error())
+ }
+
+ return buf.Bytes()
+}
+
type request struct {
From string // start
To string // end
@@ -370,12 +825,12 @@ func (tm *taskMaster) backload(req request) response {
start = at
end = at
}
-
- workflowPath, phase := tm.Cache.Search(req.Task, req.Job)
- if workflowPath != "" {
- msg = append(msg, "phase found in "+workflowPath)
+
+ phase := tm.taskCache.Search(req.Task, req.Job)
+ if phase.FilePath != "" {
+ msg = append(msg, "phase found in "+phase.FilePath)
req.Template = phase.Template
- req.Workflow = workflowPath
+ req.Workflow = phase.FilePath
}
if req.Template == "" {
name := req.Task
diff --git a/apps/flowlord/handler/about.tmpl b/apps/flowlord/handler/about.tmpl
new file mode 100644
index 00000000..ae77ba28
--- /dev/null
+++ b/apps/flowlord/handler/about.tmpl
@@ -0,0 +1,138 @@
+
+
+
+
+
+ Flowlord: About
+
+
+
+
+ {{template "header" .}}
+
+
+
+
+
+
Application
+
+ App Name
+ {{.AppName}}
+
+
+ Version
+ {{.Version}}
+
+
+ Runtime
+ {{.RunTime}}
+
+
+
+
+
Cache Status
+
+ Last Update
+ {{.LastUpdate}}
+
+
+ Next Update
+ {{.NextUpdate}}
+
+
+ Database
+ {{.DBPath}}
+
+
+ Schema Version
+ {{.SchemaVersion}}
+
+
+
+
+
Database Size
+
+ Total Size
+ {{.TotalDBSize}}
+
+
+ Page Count
+ {{.PageCount}}
+
+
+ Page Size
+ {{.PageSize}}
+
+
+
+
+
Cache Settings
+
+ Retention Period
+ {{.Retention}}
+
+
+ Task TTL
+ {{.TaskTTL}}
+
+
+
+
+
Notification Settings
+
+ Current Frequency
+ {{.CurrentFrequency}}
+
+
+ Min Frequency
+ {{.MinFrequency}}
+
+
+ Max Frequency
+ {{.MaxFrequency}}
+
+
+
+
+
+
Table Breakdown
+
+
+
+
+ | Table Name |
+ Row Count |
+ Table Size |
+ Index Size |
+ Total Size |
+ Percentage |
+
+
+
+ {{range .TableStats}}
+
+ | {{.Name}} |
+ {{.RowCount}} |
+ {{.TableHuman}} |
+ {{.IndexHuman}} |
+ {{.TotalHuman}} |
+ {{printf "%.1f" .Percentage}}% |
+
+ {{end}}
+
+
+
+
+
+
+
+
+
+
diff --git a/apps/flowlord/handler/alert.tmpl b/apps/flowlord/handler/alert.tmpl
new file mode 100644
index 00000000..ae7f3ee5
--- /dev/null
+++ b/apps/flowlord/handler/alert.tmpl
@@ -0,0 +1,337 @@
+
+
+
+
+
+ Flowlord: Alerts
+
+
+
+
+ {{template "header" .}}
+
+
+
Alert Summary
+
+ {{range .Summary}}
+
+ {{end}}
+
+
+
+
+
+
+ | ID |
+ Task Type |
+ Job |
+ Message |
+ Alerted At |
+ Task Time |
+
+
+
+ {{range .Alerts}}
+
+ |
+ {{.TaskID}}
+ |
+ {{.Type}} |
+ {{.Job}} |
+
+ {{.Msg}}
+ |
+ {{.CreatedAt.Format "2006-01-02T15:04:05Z"}} |
+ {{if .TaskTime.IsZero}}N/A{{else}}{{.TaskTime.Format "2006-01-02T15"}}{{end}} |
+
+ {{end}}
+
+
+
+
+ Total Alerts: {{len .Alerts}}
+
+
+
+
+
+
+