mirror of
https://github.com/GlueOps/autoglue.git
synced 2026-02-13 12:50:05 +01:00
Refactor routing logic (Chi can be a pain when you're managing large sets of routes, but it's one of the better options when considering a potential gRPC future)
Upgrade API Generation to fully support OAS3.1
Update swagger interface to RapiDoc - the old swagger interface doesn't support OAS3.1 yet
Docs are now embedded as part of the UI - once logged in they pick up the cookies and org id from what gets set by the UI, but you can override it
Other updates include better portability of the db-studio
Signed-off-by: allanice001 <allanice001@gmail.com>
387 lines
12 KiB
Go
387 lines
12 KiB
Go
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/dyaksa/archer"
|
|
"github.com/glueops/autoglue/internal/bg"
|
|
"github.com/glueops/autoglue/internal/handlers/dto"
|
|
"github.com/glueops/autoglue/internal/models"
|
|
"github.com/glueops/autoglue/internal/utils"
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/google/uuid"
|
|
"gorm.io/gorm"
|
|
"gorm.io/gorm/clause"
|
|
)
|
|
|
|
// AdminListArcherJobs godoc
|
|
//
|
|
// @ID AdminListArcherJobs
|
|
// @Summary List Archer jobs (admin)
|
|
// @Description Paginated background jobs with optional filters. Search `q` may match id, type, error, payload (implementation-dependent).
|
|
// @Tags ArcherAdmin
|
|
// @Produce json
|
|
// @Param status query string false "Filter by status" Enums(queued,running,succeeded,failed,canceled,retrying,scheduled)
|
|
// @Param queue query string false "Filter by queue name / worker name"
|
|
// @Param q query string false "Free-text search"
|
|
// @Param page query int false "Page number" default(1)
|
|
// @Param page_size query int false "Items per page" minimum(1) maximum(100) default(25)
|
|
// @Success 200 {object} dto.PageJob
|
|
// @Failure 401 {string} string "Unauthorized"
|
|
// @Failure 403 {string} string "forbidden"
|
|
// @Failure 500 {string} string "internal error"
|
|
// @Router /admin/archer/jobs [get]
|
|
// @Security BearerAuth
|
|
func AdminListArcherJobs(db *gorm.DB) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
status := strings.TrimSpace(r.URL.Query().Get("status"))
|
|
queue := strings.TrimSpace(r.URL.Query().Get("queue"))
|
|
q := strings.TrimSpace(r.URL.Query().Get("q"))
|
|
page := atoiDefault(r.URL.Query().Get("page"), 1)
|
|
size := clamp(atoiDefault(r.URL.Query().Get("page_size"), 25), 1, 100)
|
|
|
|
base := db.Model(&models.Job{})
|
|
if status != "" {
|
|
base = base.Where("status = ?", status)
|
|
}
|
|
if queue != "" {
|
|
base = base.Where("queue_name = ?", queue)
|
|
}
|
|
if q != "" {
|
|
like := "%" + q + "%"
|
|
base = base.Where(
|
|
db.Where("id ILIKE ?", like).
|
|
Or("queue_name ILIKE ?", like).
|
|
Or("COALESCE(last_error,'') ILIKE ?", like).
|
|
Or("CAST(arguments AS TEXT) ILIKE ?", like),
|
|
)
|
|
}
|
|
|
|
var total int64
|
|
if err := base.Count(&total).Error; err != nil {
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
|
|
var rows []models.Job
|
|
offset := (page - 1) * size
|
|
if err := base.Order("created_at DESC").Limit(size).Offset(offset).Find(&rows).Error; err != nil {
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
|
|
items := make([]dto.Job, 0, len(rows))
|
|
for _, m := range rows {
|
|
items = append(items, mapModelJobToDTO(m))
|
|
}
|
|
|
|
utils.WriteJSON(w, http.StatusOK, dto.PageJob{
|
|
Items: items,
|
|
Total: int(total),
|
|
Page: page,
|
|
PageSize: size,
|
|
})
|
|
}
|
|
}
|
|
|
|
// AdminEnqueueArcherJob godoc
|
|
//
|
|
// @ID AdminEnqueueArcherJob
|
|
// @Summary Enqueue a new Archer job (admin)
|
|
// @Description Create a job immediately or schedule it for the future via `run_at`.
|
|
// @Tags ArcherAdmin
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param body body dto.EnqueueRequest true "Job parameters"
|
|
// @Success 200 {object} dto.Job
|
|
// @Failure 400 {string} string "invalid json or missing fields"
|
|
// @Failure 401 {string} string "Unauthorized"
|
|
// @Failure 403 {string} string "forbidden"
|
|
// @Failure 500 {string} string "internal error"
|
|
// @Router /admin/archer/jobs [post]
|
|
// @Security BearerAuth
|
|
func AdminEnqueueArcherJob(db *gorm.DB, jobs *bg.Jobs) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
var in dto.EnqueueRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
|
|
utils.WriteError(w, http.StatusBadRequest, "bad_request", "invalid json")
|
|
return
|
|
}
|
|
in.Queue = strings.TrimSpace(in.Queue)
|
|
in.Type = strings.TrimSpace(in.Type)
|
|
if in.Queue == "" || in.Type == "" {
|
|
utils.WriteError(w, http.StatusBadRequest, "bad_request", "queue and type are required")
|
|
return
|
|
}
|
|
|
|
// Parse payload into generic 'args' for Archer.
|
|
var args any
|
|
if len(in.Payload) > 0 && string(in.Payload) != "null" {
|
|
if err := json.Unmarshal(in.Payload, &args); err != nil {
|
|
utils.WriteError(w, http.StatusBadRequest, "bad_request", "payload must be valid JSON")
|
|
return
|
|
}
|
|
}
|
|
|
|
id := uuid.NewString()
|
|
|
|
opts := []archer.FnOptions{
|
|
archer.WithMaxRetries(0), // adjust or expose in request if needed
|
|
}
|
|
if in.RunAt != nil {
|
|
opts = append(opts, archer.WithScheduleTime(*in.RunAt))
|
|
}
|
|
|
|
// Schedule with Archer (queue == worker name).
|
|
if _, err := jobs.Enqueue(context.Background(), id, in.Queue, args, opts...); err != nil {
|
|
utils.WriteError(w, http.StatusInternalServerError, "enqueue_failed", err.Error())
|
|
return
|
|
}
|
|
|
|
// Read back the just-created row.
|
|
var m models.Job
|
|
if err := db.First(&m, "id = ?", id).Error; err != nil {
|
|
// Fallback: return a synthesized job if row not visible yet.
|
|
now := time.Now()
|
|
out := dto.Job{
|
|
ID: id,
|
|
Type: in.Type,
|
|
Queue: in.Queue,
|
|
Status: dto.StatusQueued,
|
|
Attempts: 0,
|
|
MaxAttempts: 0,
|
|
CreatedAt: now,
|
|
UpdatedAt: &now,
|
|
RunAt: in.RunAt,
|
|
Payload: args,
|
|
}
|
|
utils.WriteJSON(w, http.StatusOK, out)
|
|
return
|
|
}
|
|
|
|
utils.WriteJSON(w, http.StatusOK, mapModelJobToDTO(m))
|
|
}
|
|
}
|
|
|
|
// AdminRetryArcherJob godoc
|
|
//
|
|
// @ID AdminRetryArcherJob
|
|
// @Summary Retry a failed/canceled Archer job (admin)
|
|
// @Description Marks the job retriable (DB flip). Swap this for an Archer admin call if you expose one.
|
|
// @Tags ArcherAdmin
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param id path string true "Job ID"
|
|
// @Success 200 {object} dto.Job
|
|
// @Failure 400 {string} string "invalid job or not eligible"
|
|
// @Failure 401 {string} string "Unauthorized"
|
|
// @Failure 403 {string} string "forbidden"
|
|
// @Failure 404 {string} string "not found"
|
|
// @Router /admin/archer/jobs/{id}/retry [post]
|
|
// @Security BearerAuth
|
|
func AdminRetryArcherJob(db *gorm.DB) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
id := chi.URLParam(r, "id")
|
|
var m models.Job
|
|
if err := db.Clauses(clause.Locking{Strength: "UPDATE"}).First(&m, "id = ?", id).Error; err != nil {
|
|
if err == gorm.ErrRecordNotFound {
|
|
utils.WriteError(w, http.StatusNotFound, "not_found", "job not found")
|
|
return
|
|
}
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
|
|
// Only allow retry from failed/canceled (adjust as you see fit).
|
|
if m.Status != string(dto.StatusFailed) && m.Status != string(dto.StatusCanceled) {
|
|
utils.WriteError(w, http.StatusBadRequest, "not_eligible", "job is not failed/canceled")
|
|
return
|
|
}
|
|
|
|
// Reset to queued; clear started_at; bump updated_at.
|
|
now := time.Now()
|
|
if err := db.Model(&m).Updates(map[string]any{
|
|
"status": string(dto.StatusQueued),
|
|
"started_at": nil,
|
|
"updated_at": now,
|
|
}).Error; err != nil {
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
|
|
// Re-read and return.
|
|
if err := db.First(&m, "id = ?", id).Error; err != nil {
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
utils.WriteJSON(w, http.StatusOK, mapModelJobToDTO(m))
|
|
}
|
|
}
|
|
|
|
// AdminCancelArcherJob godoc
|
|
//
|
|
// @ID AdminCancelArcherJob
|
|
// @Summary Cancel an Archer job (admin)
|
|
// @Description Set job status to canceled if cancellable. For running jobs, this only affects future picks; wire to Archer if you need active kill.
|
|
// @Tags ArcherAdmin
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param id path string true "Job ID"
|
|
// @Success 200 {object} dto.Job
|
|
// @Failure 400 {string} string "invalid job or not cancellable"
|
|
// @Failure 401 {string} string "Unauthorized"
|
|
// @Failure 403 {string} string "forbidden"
|
|
// @Failure 404 {string} string "not found"
|
|
// @Router /admin/archer/jobs/{id}/cancel [post]
|
|
// @Security BearerAuth
|
|
func AdminCancelArcherJob(db *gorm.DB) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
id := chi.URLParam(r, "id")
|
|
var m models.Job
|
|
if err := db.Clauses(clause.Locking{Strength: "UPDATE"}).First(&m, "id = ?", id).Error; err != nil {
|
|
if err == gorm.ErrRecordNotFound {
|
|
utils.WriteError(w, http.StatusNotFound, "not_found", "job not found")
|
|
return
|
|
}
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
|
|
// If already finished, bail.
|
|
switch m.Status {
|
|
case string(dto.StatusSucceeded), string(dto.StatusCanceled):
|
|
utils.WriteError(w, http.StatusBadRequest, "not_cancellable", "job already finished")
|
|
return
|
|
}
|
|
|
|
now := time.Now()
|
|
if err := db.Model(&m).Updates(map[string]any{
|
|
"status": string(dto.StatusCanceled),
|
|
"updated_at": now,
|
|
}).Error; err != nil {
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
|
|
if err := db.First(&m, "id = ?", id).Error; err != nil {
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
utils.WriteJSON(w, http.StatusOK, mapModelJobToDTO(m))
|
|
}
|
|
}
|
|
|
|
// AdminListArcherQueues godoc
|
|
//
|
|
// @ID AdminListArcherQueues
|
|
// @Summary List Archer queues (admin)
|
|
// @Description Summary metrics per queue (pending, running, failed, scheduled).
|
|
// @Tags ArcherAdmin
|
|
// @Produce json
|
|
// @Success 200 {array} dto.QueueInfo
|
|
// @Failure 401 {string} string "Unauthorized"
|
|
// @Failure 403 {string} string "forbidden"
|
|
// @Failure 500 {string} string "internal error"
|
|
// @Router /admin/archer/queues [get]
|
|
// @Security BearerAuth
|
|
func AdminListArcherQueues(db *gorm.DB) http.HandlerFunc {
|
|
type row struct {
|
|
QueueName string
|
|
Pending int
|
|
Running int
|
|
Failed int
|
|
Scheduled int
|
|
}
|
|
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
var rows []row
|
|
// Use filtered aggregate; adjust status values if your Archer differs.
|
|
if err := db.
|
|
Raw(`
|
|
SELECT
|
|
queue_name,
|
|
COUNT(*) FILTER (WHERE status = 'queued') AS pending,
|
|
COUNT(*) FILTER (WHERE status = 'running') AS running,
|
|
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
|
|
COUNT(*) FILTER (WHERE status = 'scheduled') AS scheduled
|
|
FROM jobs
|
|
GROUP BY queue_name
|
|
ORDER BY queue_name ASC
|
|
`).Scan(&rows).Error; err != nil {
|
|
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
|
return
|
|
}
|
|
|
|
out := make([]dto.QueueInfo, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, dto.QueueInfo{
|
|
Name: r.QueueName,
|
|
Pending: r.Pending,
|
|
Running: r.Running,
|
|
Failed: r.Failed,
|
|
Scheduled: r.Scheduled,
|
|
})
|
|
}
|
|
|
|
utils.WriteJSON(w, http.StatusOK, out)
|
|
}
|
|
}
|
|
|
|
// Helpers
|
|
// atoiDefault parses s as a base-10 int and returns it. If s is empty or is
// not a valid integer (strconv.Atoi reports an error), def is returned instead.
func atoiDefault(s string, def int) int {
	// strconv.Atoi already rejects the empty string, so a single error check
	// covers both the "missing" and the "malformed" case.
	parsed, err := strconv.Atoi(s)
	if err != nil {
		return def
	}
	return parsed
}
|
|
// clamp limits n to the inclusive range [lo, hi]: values below lo become lo,
// values above hi become hi, and everything else is returned unchanged. The
// lower bound is checked first.
func clamp(n, lo, hi int) int {
	switch {
	case n < lo:
		return lo
	case n > hi:
		return hi
	default:
		return n
	}
}
|
|
|
|
func mapModelJobToDTO(m models.Job) dto.Job {
|
|
var payload any
|
|
if len(m.Arguments) > 0 {
|
|
_ = json.Unmarshal([]byte(m.Arguments), &payload)
|
|
}
|
|
|
|
var updated *time.Time
|
|
if !m.UpdatedAt.IsZero() {
|
|
updated = &m.UpdatedAt
|
|
}
|
|
|
|
var runAt *time.Time
|
|
if !m.ScheduledAt.IsZero() {
|
|
rt := m.ScheduledAt
|
|
runAt = &rt
|
|
}
|
|
|
|
return dto.Job{
|
|
ID: m.ID,
|
|
// If you distinguish between queue and type elsewhere, set Type accordingly.
|
|
Type: m.QueueName,
|
|
Queue: m.QueueName,
|
|
Status: dto.JobStatus(m.Status),
|
|
Attempts: m.RetryCount,
|
|
MaxAttempts: m.MaxRetry,
|
|
CreatedAt: m.CreatedAt,
|
|
UpdatedAt: updated,
|
|
LastError: m.LastError,
|
|
RunAt: runAt,
|
|
Payload: payload,
|
|
}
|
|
}
|