feat: adding background jobs, Dockerfile

allanice001
2025-11-04 16:32:54 +00:00
parent 2170b9a945
commit 19d5cf7aab
34 changed files with 1269 additions and 148 deletions


@@ -0,0 +1,53 @@
package bg
import (
"context"
"time"
"github.com/dyaksa/archer"
"github.com/dyaksa/archer/job"
"github.com/google/uuid"
"gorm.io/gorm"
)
type CleanupArgs struct {
RetainDays int `json:"retain_days"`
Table string `json:"table"`
}
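// JobRow is a minimal projection of archer's jobs table, mapping only the columns the cleanup predicate needs.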
type JobRow struct {
ID string `gorm:"primaryKey"`
Status string
UpdatedAt time.Time
}
func (JobRow) TableName() string { return "jobs" }
func CleanupWorker(gdb *gorm.DB, jobs *Jobs, retainDays int) archer.WorkerFn {
return func(ctx context.Context, j job.Job) (any, error) {
if err := CleanupJobs(gdb, retainDays); err != nil {
return nil, err
}
// schedule the next run for tomorrow at 03:30 UTC (Truncate(24h) aligns to UTC midnight, not local)
next := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).Add(3*time.Hour + 30*time.Minute)
_, _ = jobs.Enqueue(
ctx,
uuid.NewString(),
"archer_cleanup",
CleanupArgs{RetainDays: retainDays, Table: "jobs"},
archer.WithScheduleTime(next),
archer.WithMaxRetries(1),
)
return nil, nil
}
}
func CleanupJobs(db *gorm.DB, retainDays int) error {
cutoff := time.Now().AddDate(0, 0, -retainDays)
return db.
Where("status IN ?", []string{"success", "failed", "cancelled"}).
Where("updated_at < ?", cutoff).
Delete(&JobRow{}).Error
}
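
CleanupWorker re-enqueues itself after each run, so the chain has to be seeded exactly once. A minimal sketch of a one-time seed, assuming it sits in package bg alongside the worker (seedCleanup is a hypothetical name, not part of this commit):

// hypothetical one-time seed for the self-rescheduling cleanup chain
func seedCleanup(ctx context.Context, jobs *Jobs, retainDays int) error {
	_, err := jobs.Enqueue(
		ctx,
		uuid.NewString(),
		"archer_cleanup",
		CleanupArgs{RetainDays: retainDays, Table: "jobs"},
		archer.WithMaxRetries(1),
	)
	return err
}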

internal/bg/bastion.go

@@ -0,0 +1,274 @@
package bg
import (
"context"
"fmt"
"log"
"net"
"strings"
"time"
"github.com/dyaksa/archer"
"github.com/dyaksa/archer/job"
"github.com/glueops/autoglue/internal/models"
"github.com/glueops/autoglue/internal/utils"
"github.com/google/uuid"
"golang.org/x/crypto/ssh"
"gorm.io/gorm"
)
// ----- Public types -----
type BastionBootstrapArgs struct{}
type BastionBootstrapFailure struct {
ID uuid.UUID `json:"id"`
Step string `json:"step"`
Reason string `json:"reason"`
}
type BastionBootstrapResult struct {
Status string `json:"status"`
Processed int `json:"processed"`
Ready int `json:"ready"`
Failed int `json:"failed"`
ElapsedMs int `json:"elapsed_ms"`
FailedServerIDs []uuid.UUID `json:"failed_server_ids"`
Failures []BastionBootstrapFailure `json:"failures"`
}
// ----- Worker -----
func BastionBootstrapWorker(db *gorm.DB) archer.WorkerFn {
return func(ctx context.Context, j job.Job) (any, error) {
jobID := j.ID
start := time.Now()
var servers []models.Server
if err := db.
Preload("SshKey").
Where("role = ? AND status = ?", "bastion", "pending").
Find(&servers).Error; err != nil {
log.Printf("[bastion] level=ERROR job=%s step=query msg=%q", jobID, err)
return nil, err
}
log.Printf("[bastion] level=INFO job=%s step=start count=%d", jobID, len(servers))
proc, ok, fail := 0, 0, 0
var failedIDs []uuid.UUID
var failures []BastionBootstrapFailure
perHostTimeout := 8 * time.Minute
for i := range servers {
s := &servers[i]
hostStart := time.Now()
proc++
// 1) Defensive IP check
if s.PublicIPAddress == nil || *s.PublicIPAddress == "" {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "ip_check", Reason: "missing public ip"})
logHostErr(jobID, s, "ip_check", fmt.Errorf("missing public ip"))
_ = setServerStatus(db, s.ID, "failed")
continue
}
// 2) Move to provisioning
if err := setServerStatus(db, s.ID, "provisioning"); err != nil {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "set_provisioning", Reason: err.Error()})
logHostErr(jobID, s, "set_provisioning", err)
continue
}
// 3) Decrypt private key for org
privKey, err := utils.DecryptForOrg(
s.OrganizationID,
s.SshKey.EncryptedPrivateKey,
s.SshKey.PrivateIV,
s.SshKey.PrivateTag,
db,
)
if err != nil {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "decrypt_key", Reason: err.Error()})
logHostErr(jobID, s, "decrypt_key", err)
_ = setServerStatus(db, s.ID, "failed")
continue
}
// 4) SSH + install docker
host := net.JoinHostPort(*s.PublicIPAddress, "22")
runCtx, cancel := context.WithTimeout(ctx, perHostTimeout)
out, err := sshInstallDockerWithOutput(runCtx, host, s.SSHUser, []byte(privKey))
cancel()
if err != nil {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "ssh_install", Reason: err.Error()})
// include a short tail of output to speed debugging without flooding logs
tail := out
if len(tail) > 800 {
tail = tail[len(tail)-800:]
}
logHostErr(jobID, s, "ssh_install", fmt.Errorf("%v | tail=%q", err, tail))
_ = setServerStatus(db, s.ID, "failed")
continue
}
// 5) Mark ready
if err := setServerStatus(db, s.ID, "ready"); err != nil {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "set_ready", Reason: err.Error()})
logHostErr(jobID, s, "set_ready", err)
_ = setServerStatus(db, s.ID, "failed")
continue
}
ok++
logHostInfo(jobID, s, "done", "host completed",
"elapsed_ms", time.Since(hostStart).Milliseconds())
}
res := BastionBootstrapResult{
Status: "ok",
Processed: proc,
Ready: ok,
Failed: fail,
ElapsedMs: int(time.Since(start).Milliseconds()),
FailedServerIDs: failedIDs,
Failures: failures,
}
log.Printf("[bastion] level=INFO job=%s step=finish processed=%d ready=%d failed=%d elapsed_ms=%d",
jobID, proc, ok, fail, res.ElapsedMs)
return res, nil
}
}
// ----- Helpers -----
func setServerStatus(db *gorm.DB, id uuid.UUID, status string) error {
return db.Model(&models.Server{}).
Where("id = ?", id).
Updates(map[string]any{
"status": status,
"updated_at": time.Now(),
}).Error
}
// uniform log helpers for consistent, greppable output
func logHostErr(jobID string, s *models.Server, step string, err error) {
ip := ""
if s.PublicIPAddress != nil {
ip = *s.PublicIPAddress
}
log.Printf("[bastion] level=ERROR job=%s server_id=%s host=%s step=%s msg=%q",
jobID, s.ID, ip, step, err)
}
func logHostInfo(jobID string, s *models.Server, step, msg string, kv ...any) {
ip := ""
if s.PublicIPAddress != nil {
ip = *s.PublicIPAddress
}
log.Printf("[bastion] level=INFO job=%s server_id=%s host=%s step=%s %s kv=%v",
jobID, s.ID, ip, step, msg, kv)
}
// ----- SSH & command execution -----
// returns combined stdout/stderr so the caller can log it on error
func sshInstallDockerWithOutput(ctx context.Context, host, user string, privateKeyPEM []byte) (string, error) {
signer, err := ssh.ParsePrivateKey(privateKeyPEM)
if err != nil {
return "", fmt.Errorf("parse private key: %w", err)
}
config := &ssh.ClientConfig{
User: user,
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(), // TODO: known_hosts verification
Timeout: 30 * time.Second,
}
// context-aware dial
dialer := &net.Dialer{}
conn, err := dialer.DialContext(ctx, "tcp", host)
if err != nil {
return "", fmt.Errorf("dial: %w", err)
}
defer conn.Close()
c, chans, reqs, err := ssh.NewClientConn(conn, host, config)
if err != nil {
return "", fmt.Errorf("ssh handshake: %w", err)
}
client := ssh.NewClient(c, chans, reqs)
defer client.Close()
// ctx so far only bounded the dial; close the client when ctx expires so a
// hung remote command cannot stall the worker past its per-host timeout
go func() {
<-ctx.Done()
_ = client.Close()
}()
sess, err := client.NewSession()
if err != nil {
return "", fmt.Errorf("session: %w", err)
}
defer sess.Close()
// --- script to run remotely (no extra quoting) ---
script := `
set -euxo pipefail
if ! command -v docker >/dev/null 2>&1; then
curl -fsSL https://get.docker.com | sh
fi
# try to enable/start (handles distros with systemd)
if command -v systemctl >/dev/null 2>&1; then
sudo systemctl enable --now docker || true
fi
# add current ssh user to docker group if exists
if getent group docker >/dev/null 2>&1; then
sudo usermod -aG docker "$(id -un)" || true
fi
`
// Send script via stdin to avoid quoting/escaping issues
sess.Stdin = strings.NewReader(script)
// Capture combined stdout+stderr
out, runErr := sess.CombinedOutput("bash -s")
return string(out), wrapSSHError(runErr, string(out))
}
// annotate common SSH/remote failure modes to speed triage
func wrapSSHError(err error, output string) error {
if err == nil {
return nil
}
switch {
case strings.Contains(output, "Could not resolve host"):
return fmt.Errorf("remote run: name resolution failed: %w", err)
case strings.Contains(output, "Permission denied"):
return fmt.Errorf("remote run: permission denied (check user/key/authorized_keys): %w", err)
case strings.Contains(output, "apt-get"):
return fmt.Errorf("remote run: apt failed: %w", err)
case strings.Contains(output, "yum"):
return fmt.Errorf("remote run: yum failed: %w", err)
default:
return fmt.Errorf("remote run: %w", err)
}
}
// sshEscape is currently unused; %q applies Go string quoting, which is not a substitute for proper shell escaping
func sshEscape(s string) string {
return fmt.Sprintf("%q", s)
}
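
Nothing in this file enqueues the job itself; a caller has to put work on the bootstrap_bastion queue registered in bg.go. A minimal trigger sketch, assuming package bg (TriggerBastionBootstrap is a hypothetical helper, e.g. for an HTTP handler or CLI command):

// hypothetical trigger for the bastion bootstrap worker
func TriggerBastionBootstrap(ctx context.Context, jobs *Jobs) error {
	_, err := jobs.Enqueue(
		ctx,
		uuid.NewString(),
		"bootstrap_bastion",
		BastionBootstrapArgs{},
		archer.WithMaxRetries(1),
	)
	return err
}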

internal/bg/bg.go

@@ -0,0 +1,105 @@
package bg
import (
"context"
"net"
"net/url"
"strings"
"time"
"github.com/dyaksa/archer"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"gorm.io/gorm"
)
type Jobs struct{ Client *archer.Client }
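// archerOptionsFromDSN maps a Postgres URL onto archer connection options.
// For example, a hypothetical DSN "postgres://app:secret@db/autoglue?sslmode=disable"
// yields Addr "db:5432" (default port appended), User "app", Password "secret",
// DBName "autoglue", SSL "disable".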
func archerOptionsFromDSN(dsn string) (*archer.Options, error) {
u, err := url.Parse(dsn)
if err != nil {
return nil, err
}
var user, pass string
if u.User != nil {
user = u.User.Username()
pass, _ = u.User.Password()
}
host := u.Host
if !strings.Contains(host, ":") {
host = net.JoinHostPort(host, "5432")
}
return &archer.Options{
Addr: host,
User: user,
Password: pass,
DBName: strings.TrimPrefix(u.Path, "/"),
SSL: u.Query().Get("sslmode"), // forward sslmode
}, nil
}
func NewJobs(gdb *gorm.DB, dbUrl string) (*Jobs, error) {
opts, err := archerOptionsFromDSN(dbUrl)
if err != nil {
return nil, err
}
instances := viper.GetInt("archer.instances")
if instances <= 0 {
instances = 1
}
timeoutSec := viper.GetInt("archer.timeoutSec")
if timeoutSec <= 0 {
timeoutSec = 60
}
retainDays := viper.GetInt("archer.cleanup_retain_days")
if retainDays <= 0 {
retainDays = 7
}
c := archer.NewClient(
opts,
archer.WithSetTableName("jobs"), // <- ensure correct table
archer.WithSleepInterval(1*time.Second), // fast poll; consider a longer interval in production
archer.WithErrHandler(func(err error) { // surface worker/SQL errors at error level
log.Error().Err(err).Msg("archer worker error")
}),
)
jobs := &Jobs{Client: c}
c.Register(
"bootstrap_bastion",
BastionBootstrapWorker(gdb),
archer.WithInstances(instances),
archer.WithTimeout(time.Duration(timeoutSec)*time.Second),
)
c.Register(
"archer_cleanup",
CleanupWorker(gdb, jobs, retainDays),
archer.WithInstances(1),
archer.WithTimeout(5*time.Minute),
)
c.Register(
"tokens_cleanup",
TokensCleanupWorker(gdb, jobs),
archer.WithInstances(1),
archer.WithTimeout(5*time.Minute),
)
return jobs, nil
}
func (j *Jobs) Start() error { return j.Client.Start() }
func (j *Jobs) Stop() { j.Client.Stop() }
func (j *Jobs) Enqueue(ctx context.Context, id, queue string, args any, opts ...archer.FnOptions) (any, error) {
return j.Client.Schedule(ctx, id, queue, args, opts...)
}
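
A minimal lifecycle sketch for the service entrypoint, assuming gdb (*gorm.DB) and dbURL are already in scope and using the same zerolog logger imported above; the actual call site is not shown in this excerpt:

// hypothetical wiring in main()
jobs, err := bg.NewJobs(gdb, dbURL)
if err != nil {
	log.Fatal().Err(err).Msg("init background jobs")
}
if err := jobs.Start(); err != nil {
	log.Fatal().Err(err).Msg("start background jobs")
}
defer jobs.Stop()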


@@ -0,0 +1,51 @@
package bg
import (
"context"
"time"
"github.com/dyaksa/archer"
"github.com/dyaksa/archer/job"
"github.com/google/uuid"
"gorm.io/gorm"
)
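// RefreshTokenRow is a minimal projection of the refresh_tokens table, mapping only the columns cleanup cares about.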
type RefreshTokenRow struct {
ID string `gorm:"primaryKey"`
RevokedAt *time.Time
ExpiresAt time.Time
UpdatedAt time.Time
}
func (RefreshTokenRow) TableName() string { return "refresh_tokens" }
type TokensCleanupArgs struct {
// kept in case you want to change retention or add dry-run later
}
func TokensCleanupWorker(db *gorm.DB, jobs *Jobs) archer.WorkerFn {
return func(ctx context.Context, j job.Job) (any, error) {
if err := CleanupRefreshTokens(db); err != nil {
return nil, err
}
// schedule the next run for tomorrow at 03:45 UTC (Truncate(24h) aligns to UTC midnight, not local)
next := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).Add(3*time.Hour + 45*time.Minute)
_, _ = jobs.Enqueue(
ctx,
uuid.NewString(),
"tokens_cleanup",
TokensCleanupArgs{},
archer.WithScheduleTime(next),
archer.WithMaxRetries(1),
)
return nil, nil
}
}
func CleanupRefreshTokens(db *gorm.DB) error {
now := time.Now()
return db.
Where("revoked_at IS NOT NULL OR expires_at < ?", now).
Delete(&RefreshTokenRow{}).Error
}
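
The retention predicate is easy to exercise against an in-memory SQLite database. A sketch, assuming gorm.io/driver/sqlite is added as a test dependency (it is not imported by this commit):

// hypothetical test in package bg
func TestCleanupRefreshTokens(t *testing.T) {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		t.Fatal(err)
	}
	if err := db.AutoMigrate(&RefreshTokenRow{}); err != nil {
		t.Fatal(err)
	}
	revoked := time.Now().Add(-time.Hour)
	db.Create(&RefreshTokenRow{ID: "expired", ExpiresAt: time.Now().Add(-time.Minute)})
	db.Create(&RefreshTokenRow{ID: "revoked", ExpiresAt: time.Now().Add(time.Hour), RevokedAt: &revoked})
	db.Create(&RefreshTokenRow{ID: "live", ExpiresAt: time.Now().Add(time.Hour)})
	if err := CleanupRefreshTokens(db); err != nil {
		t.Fatal(err)
	}
	var n int64
	db.Model(&RefreshTokenRow{}).Count(&n)
	if n != 1 {
		t.Fatalf("want 1 surviving row, got %d", n)
	}
}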