Files
autoglue/internal/bg/bastion.go

274 lines
7.8 KiB
Go

package bg
import (
"context"
"fmt"
"log"
"net"
"strings"
"time"
"github.com/dyaksa/archer"
"github.com/dyaksa/archer/job"
"github.com/glueops/autoglue/internal/models"
"github.com/glueops/autoglue/internal/utils"
"github.com/google/uuid"
"golang.org/x/crypto/ssh"
"gorm.io/gorm"
)
// ----- Public types -----
type BastionBootstrapArgs struct{}
type BastionBootstrapFailure struct {
ID uuid.UUID `json:"id"`
Step string `json:"step"`
Reason string `json:"reason"`
}
type BastionBootstrapResult struct {
Status string `json:"status"`
Processed int `json:"processed"`
Ready int `json:"ready"`
Failed int `json:"failed"`
ElapsedMs int `json:"elapsed_ms"`
FailedServer []uuid.UUID `json:"failed_server_ids"`
Failures []BastionBootstrapFailure `json:"failures"`
}
// ----- Worker -----
func BastionBootstrapWorker(db *gorm.DB) archer.WorkerFn {
return func(ctx context.Context, j job.Job) (any, error) {
jobID := j.ID
start := time.Now()
var servers []models.Server
if err := db.
Preload("SshKey").
Where("role = ? AND status = ?", "bastion", "pending").
Find(&servers).Error; err != nil {
log.Printf("[bastion] level=ERROR job=%s step=query msg=%q", jobID, err)
return nil, err
}
// log.Printf("[bastion] level=INFO job=%s step=start count=%d", jobID, len(servers))
proc, ok, fail := 0, 0, 0
var failedIDs []uuid.UUID
var failures []BastionBootstrapFailure
perHostTimeout := 8 * time.Minute
for i := range servers {
s := &servers[i]
// hostStart := time.Now()
proc++
// 1) Defensive IP check
if s.PublicIPAddress == nil || *s.PublicIPAddress == "" {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "ip_check", Reason: "missing public ip"})
logHostErr(jobID, s, "ip_check", fmt.Errorf("missing public ip"))
_ = setServerStatus(db, s.ID, "failed")
continue
}
// 2) Move to provisioning
if err := setServerStatus(db, s.ID, "provisioning"); err != nil {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "set_provisioning", Reason: err.Error()})
logHostErr(jobID, s, "set_provisioning", err)
continue
}
// 3) Decrypt private key for org
privKey, err := utils.DecryptForOrg(
s.OrganizationID,
s.SshKey.EncryptedPrivateKey,
s.SshKey.PrivateIV,
s.SshKey.PrivateTag,
db,
)
if err != nil {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "decrypt_key", Reason: err.Error()})
logHostErr(jobID, s, "decrypt_key", err)
_ = setServerStatus(db, s.ID, "failed")
continue
}
// 4) SSH + install docker
host := net.JoinHostPort(*s.PublicIPAddress, "22")
runCtx, cancel := context.WithTimeout(ctx, perHostTimeout)
out, err := sshInstallDockerWithOutput(runCtx, host, s.SSHUser, []byte(privKey))
cancel()
if err != nil {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "ssh_install", Reason: err.Error()})
// include a short tail of output to speed debugging without flooding logs
tail := out
if len(tail) > 800 {
tail = tail[len(tail)-800:]
}
logHostErr(jobID, s, "ssh_install", fmt.Errorf("%v | tail=%q", err, tail))
_ = setServerStatus(db, s.ID, "failed")
continue
}
// 5) Mark ready
if err := setServerStatus(db, s.ID, "ready"); err != nil {
fail++
failedIDs = append(failedIDs, s.ID)
failures = append(failures, BastionBootstrapFailure{ID: s.ID, Step: "set_ready", Reason: err.Error()})
logHostErr(jobID, s, "set_ready", err)
_ = setServerStatus(db, s.ID, "failed")
continue
}
ok++
// logHostInfo(jobID, s, "done", "host completed",
// "elapsed_ms", time.Since(hostStart).Milliseconds())
}
res := BastionBootstrapResult{
Status: "ok",
Processed: proc,
Ready: ok,
Failed: fail,
ElapsedMs: int(time.Since(start).Milliseconds()),
FailedServer: failedIDs,
Failures: failures,
}
// log.Printf("[bastion] level=INFO job=%s step=finish processed=%d ready=%d failed=%d elapsed_ms=%d",
// jobID, proc, ok, fail, res.ElapsedMs)
return res, nil
}
}
// ----- Helpers -----
func setServerStatus(db *gorm.DB, id uuid.UUID, status string) error {
return db.Model(&models.Server{}).
Where("id = ?", id).
Updates(map[string]any{
"status": status,
"updated_at": time.Now(),
}).Error
}
// uniform log helpers for consistent, greppable output
func logHostErr(jobID string, s *models.Server, step string, err error) {
ip := ""
if s.PublicIPAddress != nil {
ip = *s.PublicIPAddress
}
log.Printf("[bastion] level=ERROR job=%s server_id=%s host=%s step=%s msg=%q",
jobID, s.ID, ip, step, err)
}
func logHostInfo(jobID string, s *models.Server, step, msg string, kv ...any) {
ip := ""
if s.PublicIPAddress != nil {
ip = *s.PublicIPAddress
}
log.Printf("[bastion] level=INFO job=%s server_id=%s host=%s step=%s %s kv=%v",
jobID, s.ID, ip, step, msg, kv)
}
// ----- SSH & command execution -----
// returns combined stdout/stderr so caller can log it on error
func sshInstallDockerWithOutput(ctx context.Context, host, user string, privateKeyPEM []byte) (string, error) {
signer, err := ssh.ParsePrivateKey(privateKeyPEM)
if err != nil {
return "", fmt.Errorf("parse private key: %w", err)
}
config := &ssh.ClientConfig{
User: user,
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(), // TODO: known_hosts verification
Timeout: 30 * time.Second,
}
// context-aware dial
dialer := &net.Dialer{}
conn, err := dialer.DialContext(ctx, "tcp", host)
if err != nil {
return "", fmt.Errorf("dial: %w", err)
}
defer conn.Close()
c, chans, reqs, err := ssh.NewClientConn(conn, host, config)
if err != nil {
return "", fmt.Errorf("ssh handshake: %w", err)
}
client := ssh.NewClient(c, chans, reqs)
defer client.Close()
sess, err := client.NewSession()
if err != nil {
return "", fmt.Errorf("session: %w", err)
}
defer sess.Close()
// --- script to run remotely (no extra quoting) ---
script := `
set -euxo pipefail
if ! command -v docker >/dev/null 2>&1; then
curl -fsSL https://get.docker.com | sh
fi
# try to enable/start (handles distros with systemd)
if command -v systemctl >/dev/null 2>&1; then
sudo systemctl enable --now docker || true
fi
# add current ssh user to docker group if exists
if getent group docker >/dev/null 2>&1; then
sudo usermod -aG docker "$(id -un)" || true
fi
`
// Send script via stdin to avoid quoting/escaping issues
sess.Stdin = strings.NewReader(script)
// Capture combined stdout+stderr
out, runErr := sess.CombinedOutput("bash -s")
return string(out), wrapSSHError(runErr, string(out))
}
// annotate common SSH/remote failure modes to speed triage
func wrapSSHError(err error, output string) error {
if err == nil {
return nil
}
switch {
case strings.Contains(output, "Could not resolve host"):
return fmt.Errorf("remote run: name resolution failed: %w", err)
case strings.Contains(output, "Permission denied"):
return fmt.Errorf("remote run: permission denied (check user/key/authorized_keys): %w", err)
case strings.Contains(output, "apt-get"):
return fmt.Errorf("remote run: apt failed: %w", err)
case strings.Contains(output, "yum"):
return fmt.Errorf("remote run: yum failed: %w", err)
default:
return fmt.Errorf("remote run: %w", err)
}
}
// super simple escaping for a here-string; avoids quoting hell
func sshEscape(s string) string {
return fmt.Sprintf("%q", s)
}