From 4d4fda17ca74cd23361358cf1c7afd9a9ee68df8 Mon Sep 17 00:00:00 2001 From: allanice001 Date: Tue, 23 Sep 2025 13:40:20 +0100 Subject: [PATCH] fix: jobs api --- internal/handlers/jobs/jobs.go | 41 ++++++++++++++++++++++++--------- internal/handlers/jobs/kpi.go | 20 ++++++++++------ internal/handlers/jobs/queue.go | 27 ++++++++++++++++++---- 3 files changed, 65 insertions(+), 23 deletions(-) diff --git a/internal/handlers/jobs/jobs.go b/internal/handlers/jobs/jobs.go index b25dd9b..447d13a 100644 --- a/internal/handlers/jobs/jobs.go +++ b/internal/handlers/jobs/jobs.go @@ -17,15 +17,20 @@ import ( ) type JobListItem struct { - ID string `json:"id"` - QueueName string `json:"queue_name"` - Status string `json:"status"` - RetryCount int `json:"retry_count"` - MaxRetry int `json:"max_retry"` - ScheduledAt time.Time `json:"scheduled_at"` - StartedAt *time.Time `json:"started_at,omitempty"` - UpdatedAt time.Time `json:"updated_at"` - LastError *string `json:"last_error,omitempty"` + ID string `json:"id"` + QueueName string `json:"queue_name"` + Status string `json:"status"` + RetryCount int `json:"retry_count"` + MaxRetry int `json:"max_retry"` + ScheduledAt time.Time `json:"scheduled_at"` + StartedAt *time.Time `json:"started_at,omitempty"` + UpdatedAt time.Time `json:"updated_at"` + LastError *string `json:"last_error,omitempty"` + ResultStatus string `json:"result_status"` + Processed int `json:"processed"` + Ready int `json:"ready"` + Failed int `json:"failed"` + ElapsedMs int `json:"elapsed_ms"` } type EnqueueReq struct { @@ -127,7 +132,14 @@ func GetActive(w http.ResponseWriter, r *http.Request) { var rows []JobListItem err := db.DB.Model(&models.Job{}). - Select("id, queue_name, status, retry_count, max_retry, scheduled_at, started_at, updated_at, last_error"). + Select(` + id, queue_name, status, retry_count, max_retry, scheduled_at, started_at, updated_at, last_error, + COALESCE(result->>'status','') AS result_status, + COALESCE((result->>'processed')::int, 0) AS processed, + COALESCE((result->>'ready')::int, 0) AS ready, + COALESCE((result->>'failed')::int, 0) AS failed, + COALESCE((result->>'elapsed_ms')::int, 0) AS elapsed_ms + `). Where("status = ?", "running"). Order("started_at DESC NULLS LAST, updated_at DESC"). Limit(limit). @@ -160,7 +172,14 @@ func GetFailures(w http.ResponseWriter, r *http.Request) { var rows []JobListItem err := db.DB.Model(&models.Job{}). - Select("id, queue_name, status, retry_count, max_retry, scheduled_at, started_at, updated_at, last_error"). + Select(` + id, queue_name, status, retry_count, max_retry, scheduled_at, started_at, updated_at, last_error, + COALESCE(result->>'status','') AS result_status, + COALESCE((result->>'processed')::int, 0) AS processed, + COALESCE((result->>'ready')::int, 0) AS ready, + COALESCE((result->>'failed')::int, 0) AS failed, + COALESCE((result->>'elapsed_ms')::int, 0) AS elapsed_ms + `). Where("status = ?", "failed"). Order("updated_at DESC"). Limit(limit). diff --git a/internal/handlers/jobs/kpi.go b/internal/handlers/jobs/kpi.go index 4afe134..36b9012 100644 --- a/internal/handlers/jobs/kpi.go +++ b/internal/handlers/jobs/kpi.go @@ -21,12 +21,14 @@ func LoadKPI(db *gorm.DB) (KPI, error) { now := time.Now() dayAgo := now.Add(-24 * time.Hour) + // Running now if err := db.Model(&models.Job{}). Where("status = ?", "running"). Count(&k.RunningNow).Error; err != nil { return k, err } + // Scheduled in the future if err := db.Model(&models.Job{}). Where("status IN ?", []string{"queued", "scheduled", "pending"}). Where("scheduled_at > ?", now). @@ -34,6 +36,7 @@ func LoadKPI(db *gorm.DB) (KPI, error) { return k, err } + // Due now (queued/scheduled/pending with scheduled_at <= now) if err := db.Model(&models.Job{}). Where("status IN ?", []string{"queued", "scheduled", "pending"}). Where("scheduled_at <= ?", now). @@ -41,22 +44,25 @@ func LoadKPI(db *gorm.DB) (KPI, error) { return k, err } + // Sum of 'ready' over successful jobs in last 24h if err := db.Model(&models.Job{}). - Where("status = ?", "success"). - Where("updated_at >= ?", dayAgo). - Count(&k.Succeeded24h).Error; err != nil { + Select("COALESCE(SUM((result->>'ready')::int), 0)"). + Where("status = 'success' AND updated_at >= ?", dayAgo). + Scan(&k.Succeeded24h).Error; err != nil { return k, err } + // Sum of 'failed' over successful jobs in last 24h (failures within a “success” run) if err := db.Model(&models.Job{}). - Where("status = ?", "failed"). - Where("updated_at >= ?", dayAgo). - Count(&k.Failed24h).Error; err != nil { + Select("COALESCE(SUM((result->>'failed')::int), 0)"). + Where("status = 'success' AND updated_at >= ?", dayAgo). + Scan(&k.Failed24h).Error; err != nil { return k, err } + // Retryable failed job rows (same as before) if err := db.Model(&models.Job{}). - Where("status = ?", "failed"). + Where("status = 'failed'"). Where("retry_count < max_retry"). Count(&k.Retryable).Error; err != nil { return k, err diff --git a/internal/handlers/jobs/queue.go b/internal/handlers/jobs/queue.go index c8583f8..4096549 100644 --- a/internal/handlers/jobs/queue.go +++ b/internal/handlers/jobs/queue.go @@ -31,11 +31,28 @@ func LoadPerQueue(db *gorm.DB) ([]QueueRollup, error) { var rr, qd, qf, s24, f24 int64 var avgDur *float64 - _ = db.Model(&models.Job{}).Where("queue_name = ? AND status = 'running'", q).Count(&rr).Error - _ = db.Model(&models.Job{}).Where("queue_name = ? AND status IN ('queued','scheduled','pending') AND scheduled_at <= ?", q, now).Count(&qd).Error - _ = db.Model(&models.Job{}).Where("queue_name = ? AND status IN ('queued','scheduled','pending') AND scheduled_at > ?", q, now).Count(&qf).Error - _ = db.Model(&models.Job{}).Where("queue_name = ? AND status = 'success' AND updated_at >= ?", q, dayAgo).Count(&s24).Error - _ = db.Model(&models.Job{}).Where("queue_name = ? AND status = 'failed' AND updated_at >= ?", q, dayAgo).Count(&f24).Error + _ = db.Model(&models.Job{}). + Where("queue_name = ? AND status = 'running'", q). + Count(&rr).Error + + _ = db.Model(&models.Job{}). + Where("queue_name = ? AND status IN ('queued','scheduled','pending') AND scheduled_at <= ?", q, now). + Count(&qd).Error + + _ = db.Model(&models.Job{}). + Where("queue_name = ? AND status IN ('queued','scheduled','pending') AND scheduled_at > ?", q, now). + Count(&qf).Error + + // Sum result.ready / result.failed over successes in last 24h + _ = db.Model(&models.Job{}). + Select("COALESCE(SUM((result->>'ready')::int), 0)"). + Where("queue_name = ? AND status = 'success' AND updated_at >= ?", q, dayAgo). + Scan(&s24).Error + + _ = db.Model(&models.Job{}). + Select("COALESCE(SUM((result->>'failed')::int), 0)"). + Where("queue_name = ? AND status = 'success' AND updated_at >= ?", q, dayAgo). + Scan(&f24).Error _ = db. Model(&models.Job{}).