mirror of
https://github.com/GlueOps/autoglue.git
synced 2026-02-13 04:40:05 +01:00
feat: adding background jobs ui page and apis - requires user is_admin to be set to true
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/glueops/autoglue/internal/api/httpmiddleware"
|
||||
"github.com/glueops/autoglue/internal/common"
|
||||
"github.com/glueops/autoglue/internal/handlers/dto"
|
||||
"github.com/glueops/autoglue/internal/models"
|
||||
"github.com/glueops/autoglue/internal/utils"
|
||||
@@ -71,7 +73,6 @@ func ListAnnotations(db *gorm.DB) http.HandlerFunc {
|
||||
// @Produce json
|
||||
// @Param X-Org-ID header string false "Organization UUID"
|
||||
// @Param id path string true "Annotation ID (UUID)"
|
||||
// @Param include query string false "Optional: node_pools"
|
||||
// @Success 200 {object} dto.AnnotationResponse
|
||||
// @Failure 400 {string} string "invalid id"
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
@@ -108,3 +109,174 @@ func GetAnnotation(db *gorm.DB) http.HandlerFunc {
|
||||
utils.WriteJSON(w, http.StatusOK, out)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateAnnotation godoc
|
||||
// @ID CreateAnnotation
|
||||
// @Summary Create annotation (org scoped)
|
||||
// @Description Creates an annotation.
|
||||
// @Tags Annotations
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param X-Org-ID header string false "Organization UUID"
|
||||
// @Param body body dto.CreateAnnotationRequest true "Annotation payload"
|
||||
// @Success 201 {object} dto.AnnotationResponse
|
||||
// @Failure 400 {string} string "invalid json / missing fields"
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
// @Failure 403 {string} string "organization required"
|
||||
// @Failure 500 {string} string "create failed"
|
||||
// @Router /annotations [post]
|
||||
// @Security BearerAuth
|
||||
// @Security OrgKeyAuth
|
||||
// @Security OrgSecretAuth
|
||||
func CreateAnnotation(db *gorm.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
orgID, ok := httpmiddleware.OrgIDFrom(r.Context())
|
||||
if !ok {
|
||||
utils.WriteError(w, http.StatusForbidden, "org_required", "specify X-Org-ID")
|
||||
return
|
||||
}
|
||||
|
||||
var req dto.CreateAnnotationRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
utils.WriteError(w, http.StatusBadRequest, "bad_request", "bad request")
|
||||
return
|
||||
}
|
||||
|
||||
req.Key = strings.TrimSpace(req.Key)
|
||||
req.Value = strings.TrimSpace(req.Value)
|
||||
|
||||
if req.Key == "" || req.Value == "" {
|
||||
utils.WriteError(w, http.StatusBadRequest, "bad_request", "missing key/value")
|
||||
return
|
||||
}
|
||||
|
||||
a := models.Annotation{
|
||||
AuditFields: common.AuditFields{OrganizationID: orgID},
|
||||
Key: req.Key,
|
||||
Value: req.Value,
|
||||
}
|
||||
|
||||
if err := db.Create(&a).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", "db error")
|
||||
return
|
||||
}
|
||||
|
||||
out := dto.AnnotationResponse{
|
||||
AuditFields: a.AuditFields,
|
||||
Key: a.Key,
|
||||
Value: a.Value,
|
||||
}
|
||||
utils.WriteJSON(w, http.StatusCreated, out)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateAnnotation godoc
|
||||
// @ID UpdateAnnotation
|
||||
// @Summary Update annotation (org scoped)
|
||||
// @Description Partially update annotation fields.
|
||||
// @Tags Annotations
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param X-Org-ID header string false "Organization UUID"
|
||||
// @Param id path string true "Annotation ID (UUID)"
|
||||
// @Param body body dto.UpdateAnnotationRequest true "Fields to update"
|
||||
// @Success 200 {object} dto.AnnotationResponse
|
||||
// @Failure 400 {string} string "invalid id / invalid json"
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
// @Failure 403 {string} string "organization required"
|
||||
// @Failure 404 {string} string "not found"
|
||||
// @Failure 500 {string} string "update failed"
|
||||
// @Router /annotations/{id} [patch]
|
||||
// @Security BearerAuth
|
||||
// @Security OrgKeyAuth
|
||||
// @Security OrgSecretAuth
|
||||
func UpdateAnnotation(db *gorm.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
orgID, ok := httpmiddleware.OrgIDFrom(r.Context())
|
||||
if !ok {
|
||||
utils.WriteError(w, http.StatusForbidden, "org_required", "specify X-Org-ID")
|
||||
return
|
||||
}
|
||||
|
||||
id, err := uuid.Parse(chi.URLParam(r, "id"))
|
||||
if err != nil {
|
||||
utils.WriteError(w, http.StatusBadRequest, "bad_request", "bad request")
|
||||
return
|
||||
}
|
||||
|
||||
var a models.Annotation
|
||||
if err := db.Where("id = ? AND organization_id = ?", id, orgID).First(&a).Error; err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
utils.WriteError(w, http.StatusNotFound, "not_found", "not_found")
|
||||
return
|
||||
}
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", "db error")
|
||||
return
|
||||
}
|
||||
|
||||
var req dto.UpdateAnnotationRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
utils.WriteError(w, http.StatusBadRequest, "bad_request", "bad request")
|
||||
return
|
||||
}
|
||||
|
||||
if req.Key != nil {
|
||||
a.Key = strings.TrimSpace(*req.Key)
|
||||
}
|
||||
if req.Value != nil {
|
||||
a.Value = strings.TrimSpace(*req.Value)
|
||||
}
|
||||
|
||||
if err := db.Save(&a).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", "db error")
|
||||
return
|
||||
}
|
||||
|
||||
out := dto.AnnotationResponse{
|
||||
AuditFields: a.AuditFields,
|
||||
Key: a.Key,
|
||||
Value: a.Value,
|
||||
}
|
||||
utils.WriteJSON(w, http.StatusOK, out)
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteAnnotation godoc
|
||||
// @ID DeleteAnnotation
|
||||
// @Summary Delete annotation (org scoped)
|
||||
// @Description Permanently deletes the annotation.
|
||||
// @Tags Annotations
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param X-Org-ID header string false "Organization UUID"
|
||||
// @Param id path string true "Annotation ID (UUID)"
|
||||
// @Success 204 {string} string "No Content"
|
||||
// @Failure 400 {string} string "invalid id"
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
// @Failure 403 {string} string "organization required"
|
||||
// @Failure 500 {string} string "delete failed"
|
||||
// @Router /annotations/{id} [delete]
|
||||
// @Security BearerAuth
|
||||
// @Security OrgKeyAuth
|
||||
// @Security OrgSecretAuth
|
||||
func DeleteAnnotation(db *gorm.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
orgID, ok := httpmiddleware.OrgIDFrom(r.Context())
|
||||
if !ok {
|
||||
utils.WriteError(w, http.StatusForbidden, "org_required", "specify X-Org-ID")
|
||||
return
|
||||
}
|
||||
|
||||
id, err := uuid.Parse(chi.URLParam(r, "id"))
|
||||
if err != nil {
|
||||
utils.WriteError(w, http.StatusBadRequest, "bad_request", "bad request")
|
||||
return
|
||||
}
|
||||
|
||||
if err := db.Where("id = ? AND organization_id = ?", id, orgID).Delete(&models.Annotation{}).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", "db error")
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
|
||||
62
internal/handlers/dto/jobs.go
Normal file
62
internal/handlers/dto/jobs.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package dto
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
type JobStatus string
|
||||
|
||||
const (
|
||||
StatusQueued JobStatus = "queued"
|
||||
StatusRunning JobStatus = "running"
|
||||
StatusSucceeded JobStatus = "succeeded"
|
||||
StatusFailed JobStatus = "failed"
|
||||
StatusCanceled JobStatus = "canceled"
|
||||
StatusRetrying JobStatus = "retrying"
|
||||
StatusScheduled JobStatus = "scheduled"
|
||||
)
|
||||
|
||||
// Job represents a background job managed by Archer.
|
||||
// swagger:model Job
|
||||
type Job struct {
|
||||
ID string `json:"id" example:"01HF7SZK8Z8WG1M3J7S2Z8M2N6"`
|
||||
Type string `json:"type" example:"email.send"`
|
||||
Queue string `json:"queue" example:"default"`
|
||||
Status JobStatus `json:"status" example:"queued" enums:"queued|running|succeeded|failed|canceled|retrying|scheduled"`
|
||||
Attempts int `json:"attempts" example:"0"`
|
||||
MaxAttempts int `json:"max_attempts,omitempty" example:"3"`
|
||||
CreatedAt time.Time `json:"created_at" example:"2025-11-04T09:30:00Z"`
|
||||
UpdatedAt *time.Time `json:"updated_at,omitempty" example:"2025-11-04T09:30:00Z"`
|
||||
LastError *string `json:"last_error,omitempty" example:"error message"`
|
||||
RunAt *time.Time `json:"run_at,omitempty" example:"2025-11-04T09:30:00Z"`
|
||||
Payload any `json:"payload,omitempty"`
|
||||
}
|
||||
|
||||
// QueueInfo holds queue-level counts.
|
||||
// swagger:model QueueInfo
|
||||
type QueueInfo struct {
|
||||
Name string `json:"name" example:"default"`
|
||||
Pending int `json:"pending" example:"42"`
|
||||
Running int `json:"running" example:"3"`
|
||||
Failed int `json:"failed" example:"5"`
|
||||
Scheduled int `json:"scheduled" example:"7"`
|
||||
}
|
||||
|
||||
// PageJob is a concrete paginated response for Job (generics not supported by swag).
|
||||
// swagger:model PageJob
|
||||
type PageJob struct {
|
||||
Items []Job `json:"items"`
|
||||
Total int `json:"total" example:"120"`
|
||||
Page int `json:"page" example:"1"`
|
||||
PageSize int `json:"page_size" example:"25"`
|
||||
}
|
||||
|
||||
// EnqueueRequest is the POST body for creating a job.
|
||||
// swagger:model EnqueueRequest
|
||||
type EnqueueRequest struct {
|
||||
Queue string `json:"queue" example:"default"`
|
||||
Type string `json:"type" example:"email.send"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
RunAt *time.Time `json:"run_at" example:"2025-11-05T08:00:00Z"`
|
||||
}
|
||||
383
internal/handlers/jobs.go
Normal file
383
internal/handlers/jobs.go
Normal file
@@ -0,0 +1,383 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/dyaksa/archer"
|
||||
"github.com/glueops/autoglue/internal/bg"
|
||||
"github.com/glueops/autoglue/internal/handlers/dto"
|
||||
"github.com/glueops/autoglue/internal/models"
|
||||
"github.com/glueops/autoglue/internal/utils"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
)
|
||||
|
||||
// AdminListArcherJobs godoc
|
||||
// @ID AdminListArcherJobs
|
||||
// @Summary List Archer jobs (admin)
|
||||
// @Description Paginated background jobs with optional filters. Search `q` may match id, type, error, payload (implementation-dependent).
|
||||
// @Tags ArcherAdmin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param status query string false "Filter by status" Enums(queued,running,succeeded,failed,canceled,retrying,scheduled)
|
||||
// @Param queue query string false "Filter by queue name / worker name"
|
||||
// @Param q query string false "Free-text search"
|
||||
// @Param page query int false "Page number" default(1)
|
||||
// @Param page_size query int false "Items per page" minimum(1) maximum(100) default(25)
|
||||
// @Success 200 {object} dto.PageJob
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
// @Failure 403 {string} string "forbidden"
|
||||
// @Failure 500 {string} string "internal error"
|
||||
// @Router /admin/archer/jobs [get]
|
||||
// @Security BearerAuth
|
||||
func AdminListArcherJobs(db *gorm.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
status := strings.TrimSpace(r.URL.Query().Get("status"))
|
||||
queue := strings.TrimSpace(r.URL.Query().Get("queue"))
|
||||
q := strings.TrimSpace(r.URL.Query().Get("q"))
|
||||
page := atoiDefault(r.URL.Query().Get("page"), 1)
|
||||
size := clamp(atoiDefault(r.URL.Query().Get("page_size"), 25), 1, 100)
|
||||
|
||||
base := db.Model(&models.Job{})
|
||||
if status != "" {
|
||||
base = base.Where("status = ?", status)
|
||||
}
|
||||
if queue != "" {
|
||||
base = base.Where("queue_name = ?", queue)
|
||||
}
|
||||
if q != "" {
|
||||
like := "%" + q + "%"
|
||||
base = base.Where(
|
||||
db.Where("id ILIKE ?", like).
|
||||
Or("queue_name ILIKE ?", like).
|
||||
Or("COALESCE(last_error,'') ILIKE ?", like).
|
||||
Or("CAST(arguments AS TEXT) ILIKE ?", like),
|
||||
)
|
||||
}
|
||||
|
||||
var total int64
|
||||
if err := base.Count(&total).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var rows []models.Job
|
||||
offset := (page - 1) * size
|
||||
if err := base.Order("created_at DESC").Limit(size).Offset(offset).Find(&rows).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
items := make([]dto.Job, 0, len(rows))
|
||||
for _, m := range rows {
|
||||
items = append(items, mapModelJobToDTO(m))
|
||||
}
|
||||
|
||||
utils.WriteJSON(w, http.StatusOK, dto.PageJob{
|
||||
Items: items,
|
||||
Total: int(total),
|
||||
Page: page,
|
||||
PageSize: size,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// AdminEnqueueArcherJob godoc
|
||||
// @ID AdminEnqueueArcherJob
|
||||
// @Summary Enqueue a new Archer job (admin)
|
||||
// @Description Create a job immediately or schedule it for the future via `run_at`.
|
||||
// @Tags ArcherAdmin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param body body dto.EnqueueRequest true "Job parameters"
|
||||
// @Success 200 {object} dto.Job
|
||||
// @Failure 400 {string} string "invalid json or missing fields"
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
// @Failure 403 {string} string "forbidden"
|
||||
// @Failure 500 {string} string "internal error"
|
||||
// @Router /admin/archer/jobs [post]
|
||||
// @Security BearerAuth
|
||||
func AdminEnqueueArcherJob(db *gorm.DB, jobs *bg.Jobs) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
var in dto.EnqueueRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
|
||||
utils.WriteError(w, http.StatusBadRequest, "bad_request", "invalid json")
|
||||
return
|
||||
}
|
||||
in.Queue = strings.TrimSpace(in.Queue)
|
||||
in.Type = strings.TrimSpace(in.Type)
|
||||
if in.Queue == "" || in.Type == "" {
|
||||
utils.WriteError(w, http.StatusBadRequest, "bad_request", "queue and type are required")
|
||||
return
|
||||
}
|
||||
|
||||
// Parse payload into generic 'args' for Archer.
|
||||
var args any
|
||||
if len(in.Payload) > 0 && string(in.Payload) != "null" {
|
||||
if err := json.Unmarshal(in.Payload, &args); err != nil {
|
||||
utils.WriteError(w, http.StatusBadRequest, "bad_request", "payload must be valid JSON")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
id := uuid.NewString()
|
||||
|
||||
opts := []archer.FnOptions{
|
||||
archer.WithMaxRetries(0), // adjust or expose in request if needed
|
||||
}
|
||||
if in.RunAt != nil {
|
||||
opts = append(opts, archer.WithScheduleTime(*in.RunAt))
|
||||
}
|
||||
|
||||
// Schedule with Archer (queue == worker name).
|
||||
if _, err := jobs.Enqueue(context.Background(), id, in.Queue, args, opts...); err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "enqueue_failed", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Read back the just-created row.
|
||||
var m models.Job
|
||||
if err := db.First(&m, "id = ?", id).Error; err != nil {
|
||||
// Fallback: return a synthesized job if row not visible yet.
|
||||
now := time.Now()
|
||||
out := dto.Job{
|
||||
ID: id,
|
||||
Type: in.Type,
|
||||
Queue: in.Queue,
|
||||
Status: dto.StatusQueued,
|
||||
Attempts: 0,
|
||||
MaxAttempts: 0,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: &now,
|
||||
RunAt: in.RunAt,
|
||||
Payload: args,
|
||||
}
|
||||
utils.WriteJSON(w, http.StatusOK, out)
|
||||
return
|
||||
}
|
||||
|
||||
utils.WriteJSON(w, http.StatusOK, mapModelJobToDTO(m))
|
||||
}
|
||||
}
|
||||
|
||||
// AdminRetryArcherJob godoc
|
||||
// @ID AdminRetryArcherJob
|
||||
// @Summary Retry a failed/canceled Archer job (admin)
|
||||
// @Description Marks the job retriable (DB flip). Swap this for an Archer admin call if you expose one.
|
||||
// @Tags ArcherAdmin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param id path string true "Job ID"
|
||||
// @Success 200 {object} dto.Job
|
||||
// @Failure 400 {string} string "invalid job or not eligible"
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
// @Failure 403 {string} string "forbidden"
|
||||
// @Failure 404 {string} string "not found"
|
||||
// @Router /admin/archer/jobs/{id}/retry [post]
|
||||
// @Security BearerAuth
|
||||
func AdminRetryArcherJob(db *gorm.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
var m models.Job
|
||||
if err := db.Clauses(clause.Locking{Strength: "UPDATE"}).First(&m, "id = ?", id).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
utils.WriteError(w, http.StatusNotFound, "not_found", "job not found")
|
||||
return
|
||||
}
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Only allow retry from failed/canceled (adjust as you see fit).
|
||||
if m.Status != string(dto.StatusFailed) && m.Status != string(dto.StatusCanceled) {
|
||||
utils.WriteError(w, http.StatusBadRequest, "not_eligible", "job is not failed/canceled")
|
||||
return
|
||||
}
|
||||
|
||||
// Reset to queued; clear started_at; bump updated_at.
|
||||
now := time.Now()
|
||||
if err := db.Model(&m).Updates(map[string]any{
|
||||
"status": string(dto.StatusQueued),
|
||||
"started_at": nil,
|
||||
"updated_at": now,
|
||||
}).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Re-read and return.
|
||||
if err := db.First(&m, "id = ?", id).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
utils.WriteJSON(w, http.StatusOK, mapModelJobToDTO(m))
|
||||
}
|
||||
}
|
||||
|
||||
// AdminCancelArcherJob godoc
|
||||
// @ID AdminCancelArcherJob
|
||||
// @Summary Cancel an Archer job (admin)
|
||||
// @Description Set job status to canceled if cancellable. For running jobs, this only affects future picks; wire to Archer if you need active kill.
|
||||
// @Tags ArcherAdmin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param id path string true "Job ID"
|
||||
// @Success 200 {object} dto.Job
|
||||
// @Failure 400 {string} string "invalid job or not cancellable"
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
// @Failure 403 {string} string "forbidden"
|
||||
// @Failure 404 {string} string "not found"
|
||||
// @Router /admin/archer/jobs/{id}/cancel [post]
|
||||
// @Security BearerAuth
|
||||
func AdminCancelArcherJob(db *gorm.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
var m models.Job
|
||||
if err := db.Clauses(clause.Locking{Strength: "UPDATE"}).First(&m, "id = ?", id).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
utils.WriteError(w, http.StatusNotFound, "not_found", "job not found")
|
||||
return
|
||||
}
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// If already finished, bail.
|
||||
switch m.Status {
|
||||
case string(dto.StatusSucceeded), string(dto.StatusCanceled):
|
||||
utils.WriteError(w, http.StatusBadRequest, "not_cancellable", "job already finished")
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if err := db.Model(&m).Updates(map[string]any{
|
||||
"status": string(dto.StatusCanceled),
|
||||
"updated_at": now,
|
||||
}).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if err := db.First(&m, "id = ?", id).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
utils.WriteJSON(w, http.StatusOK, mapModelJobToDTO(m))
|
||||
}
|
||||
}
|
||||
|
||||
// AdminListArcherQueues godoc
|
||||
// @ID AdminListArcherQueues
|
||||
// @Summary List Archer queues (admin)
|
||||
// @Description Summary metrics per queue (pending, running, failed, scheduled).
|
||||
// @Tags ArcherAdmin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Success 200 {array} dto.QueueInfo
|
||||
// @Failure 401 {string} string "Unauthorized"
|
||||
// @Failure 403 {string} string "forbidden"
|
||||
// @Failure 500 {string} string "internal error"
|
||||
// @Router /admin/archer/queues [get]
|
||||
// @Security BearerAuth
|
||||
func AdminListArcherQueues(db *gorm.DB) http.HandlerFunc {
|
||||
type row struct {
|
||||
QueueName string
|
||||
Pending int
|
||||
Running int
|
||||
Failed int
|
||||
Scheduled int
|
||||
}
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
var rows []row
|
||||
// Use filtered aggregate; adjust status values if your Archer differs.
|
||||
if err := db.
|
||||
Raw(`
|
||||
SELECT
|
||||
queue_name,
|
||||
COUNT(*) FILTER (WHERE status = 'queued') AS pending,
|
||||
COUNT(*) FILTER (WHERE status = 'running') AS running,
|
||||
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
|
||||
COUNT(*) FILTER (WHERE status = 'scheduled') AS scheduled
|
||||
FROM jobs
|
||||
GROUP BY queue_name
|
||||
ORDER BY queue_name ASC
|
||||
`).Scan(&rows).Error; err != nil {
|
||||
utils.WriteError(w, http.StatusInternalServerError, "db_error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
out := make([]dto.QueueInfo, 0, len(rows))
|
||||
for _, r := range rows {
|
||||
out = append(out, dto.QueueInfo{
|
||||
Name: r.QueueName,
|
||||
Pending: r.Pending,
|
||||
Running: r.Running,
|
||||
Failed: r.Failed,
|
||||
Scheduled: r.Scheduled,
|
||||
})
|
||||
}
|
||||
|
||||
utils.WriteJSON(w, http.StatusOK, out)
|
||||
}
|
||||
}
|
||||
|
||||
// Helpers
|
||||
func atoiDefault(s string, def int) int {
|
||||
if s == "" {
|
||||
return def
|
||||
}
|
||||
if n, err := strconv.Atoi(s); err == nil {
|
||||
return n
|
||||
}
|
||||
return def
|
||||
}
|
||||
func clamp(n, lo, hi int) int {
|
||||
if n < lo {
|
||||
return lo
|
||||
}
|
||||
if n > hi {
|
||||
return hi
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func mapModelJobToDTO(m models.Job) dto.Job {
|
||||
var payload any
|
||||
if len(m.Arguments) > 0 {
|
||||
_ = json.Unmarshal([]byte(m.Arguments), &payload)
|
||||
}
|
||||
|
||||
var updated *time.Time
|
||||
if !m.UpdatedAt.IsZero() {
|
||||
updated = &m.UpdatedAt
|
||||
}
|
||||
|
||||
var runAt *time.Time
|
||||
if !m.ScheduledAt.IsZero() {
|
||||
rt := m.ScheduledAt
|
||||
runAt = &rt
|
||||
}
|
||||
|
||||
return dto.Job{
|
||||
ID: m.ID,
|
||||
// If you distinguish between queue and type elsewhere, set Type accordingly.
|
||||
Type: m.QueueName,
|
||||
Queue: m.QueueName,
|
||||
Status: dto.JobStatus(m.Status),
|
||||
Attempts: m.RetryCount,
|
||||
MaxAttempts: m.MaxRetry,
|
||||
CreatedAt: m.CreatedAt,
|
||||
UpdatedAt: updated,
|
||||
LastError: m.LastError,
|
||||
RunAt: runAt,
|
||||
Payload: payload,
|
||||
}
|
||||
}
|
||||
@@ -40,7 +40,7 @@ func GetMe(db *gorm.DB) http.HandlerFunc {
|
||||
}
|
||||
|
||||
var emails []models.UserEmail
|
||||
_ = db.Where("user_id = ?", user.ID).Order("is_primary desc, created_at asc").Find(&emails).Error
|
||||
_ = db.Preload("User").Where("user_id = ?", user.ID).Order("is_primary desc, created_at asc").Find(&emails).Error
|
||||
|
||||
var orgs []models.Organization
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user