// autoglue/internal/bg/backup_s3.go

package bg

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"mime"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/dyaksa/archer"
	"github.com/dyaksa/archer/job"
	"github.com/glueops/autoglue/internal/config"
	"github.com/glueops/autoglue/internal/models"
	"github.com/glueops/autoglue/internal/utils"
	"github.com/google/uuid"
	"github.com/rs/zerolog/log"
	"gorm.io/gorm"
)
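
// DbBackupArgs holds the arguments for a scheduled database backup job.
// IntervalS is the delay, in seconds, before the next backup is enqueued.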
type DbBackupArgs struct {
	IntervalS int `json:"interval_seconds,omitempty"`
}
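
// s3Scope mirrors the JSON "scope" column of a service-scoped credential,
// identifying the AWS service and region it applies to.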
type s3Scope struct {
	Service string `json:"service"`
	Region  string `json:"region"`
}
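
// encAWS is the shape of the decrypted credential payload.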
type encAWS struct {
	AccessKeyID     string `json:"access_key_id"`
	SecretAccessKey string `json:"secret_access_key"`
}
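
// DbBackupWorker returns an archer worker that runs one database backup and
// then re-enqueues itself so the job repeats on a fixed interval.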
func DbBackupWorker(db *gorm.DB, jobs *Jobs) archer.WorkerFn {
	return func(ctx context.Context, j job.Job) (any, error) {
		args := DbBackupArgs{IntervalS: 3600}
		_ = j.ParseArguments(&args)
		if args.IntervalS <= 0 {
			args.IntervalS = 3600
		}

		if err := DbBackup(ctx, db); err != nil {
			return nil, err
		}

		queue := j.QueueName
		if strings.TrimSpace(queue) == "" {
			queue = "db_backup_s3"
		}

		next := time.Now().Add(time.Duration(args.IntervalS) * time.Second)
		// Carry the interval forward so a custom schedule survives the
		// re-enqueue (an empty payload would silently reset it to the
		// 3600s default on the next run).
		payload := DbBackupArgs{IntervalS: args.IntervalS}
		opts := []archer.FnOptions{
			archer.WithScheduleTime(next),
			archer.WithMaxRetries(1),
		}
		if _, err := jobs.Enqueue(ctx, uuid.NewString(), queue, payload, opts...); err != nil {
			log.Error().Err(err).Str("queue", queue).Time("next", next).Msg("failed to enqueue next db backup")
		} else {
			log.Info().Str("queue", queue).Time("next", next).Msg("scheduled next db backup")
		}
		return nil, nil
	}
}
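
// DbBackup dumps the configured Postgres database with pg_dump and uploads
// the result to an org-scoped S3 bucket, creating the bucket if needed.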
func DbBackup(ctx context.Context, db *gorm.DB) error {
	cfg, err := config.Load()
	if err != nil {
		return fmt.Errorf("load config: %w", err)
	}

	cred, sc, err := loadS3Credential(ctx, db)
	if err != nil {
		return fmt.Errorf("load credential: %w", err)
	}

	ak, sk, err := decryptAwsAccessKeys(ctx, db, cred)
	if err != nil {
		return fmt.Errorf("decrypt aws keys: %w", err)
	}

	// Prefer the region from the credential scope, then the credential row,
	// then fall back to a default.
	region := sc.Region
	if strings.TrimSpace(region) == "" {
		region = cred.Region
		if strings.TrimSpace(region) == "" {
			region = "us-west-1"
		}
	}

	bucket := strings.ToLower(fmt.Sprintf("%s-autoglue-backups-%s", cred.OrganizationID, region))
	s3cli, err := makeS3Client(ctx, ak, sk, region)
	if err != nil {
		return err
	}
	if err := ensureBucket(ctx, s3cli, bucket, region); err != nil {
		return fmt.Errorf("ensure bucket: %w", err)
	}

	tmpDir := os.TempDir()
	now := time.Now().UTC()
	key := fmt.Sprintf("%04d/%02d/%02d/backup-%02d.sql", now.Year(), now.Month(), now.Day(), now.Hour())
	outPath := filepath.Join(tmpDir, "autoglue-backup-"+now.Format("20060102T150405Z")+".sql")

	if err := runPgDump(ctx, cfg.DbURL, outPath); err != nil {
		return fmt.Errorf("pg_dump: %w", err)
	}
	defer os.Remove(outPath)

	if err := uploadFileToS3(ctx, s3cli, bucket, key, outPath); err != nil {
		return fmt.Errorf("s3 upload: %w", err)
	}
	log.Info().Str("bucket", bucket).Str("key", key).Msg("backup uploaded")
	return nil
}

// --- Helpers
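
// loadS3Credential fetches the most recent service-scoped AWS access-key
// credential whose scope targets S3, along with its parsed scope.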
func loadS3Credential(ctx context.Context, db *gorm.DB) (models.Credential, s3Scope, error) {
	var c models.Credential
	err := db.
		WithContext(ctx).
		Where("provider = ? AND kind = ? AND scope_kind = ?", "aws", "aws_access_key", "service").
		Where("scope ->> 'service' = ?", "s3").
		Order("created_at DESC").
		First(&c).Error
	if err != nil {
		// Return the error unwrapped; the caller already adds the
		// "load credential" context, so wrapping here would duplicate it.
		return models.Credential{}, s3Scope{}, err
	}

	var sc s3Scope
	// Best-effort parse: a missing or malformed scope leaves sc zero-valued.
	_ = json.Unmarshal(c.Scope, &sc)
	return c, sc, nil
}
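
// decryptAwsAccessKeys decrypts the credential's payload with the org key
// and extracts the AWS access key ID and secret.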
func decryptAwsAccessKeys(ctx context.Context, db *gorm.DB, c models.Credential) (string, string, error) {
	plain, err := utils.DecryptForOrg(c.OrganizationID, c.EncryptedData, c.IV, c.Tag, db)
	if err != nil {
		return "", "", err
	}

	var payload encAWS
	if err := json.Unmarshal([]byte(plain), &payload); err != nil {
		return "", "", fmt.Errorf("parse decrypted payload: %w", err)
	}
	if payload.AccessKeyID == "" || payload.SecretAccessKey == "" {
		return "", "", errors.New("decrypted payload missing keys")
	}
	return payload.AccessKeyID, payload.SecretAccessKey, nil
}
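
// makeS3Client builds an S3 client from static credentials pinned to region.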
func makeS3Client(ctx context.Context, accessKey, secret, region string) (*s3.Client, error) {
	staticCredentialsProvider := credentials.NewStaticCredentialsProvider(accessKey, secret, "")
	cfg, err := awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithCredentialsProvider(staticCredentialsProvider),
		awsconfig.WithRegion(region),
	)
	if err != nil {
		return nil, fmt.Errorf("aws config: %w", err)
	}
	return s3.NewFromConfig(cfg), nil
}
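
// ensureBucket makes sure the backup bucket exists in the requested region,
// creating it (with default SSE) if it does not.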
func ensureBucket(ctx context.Context, s3cli *s3.Client, bucket, region string) error {
	_, err := s3cli.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(bucket)})
	if err == nil {
		return nil
	}

	// HeadBucket failed; check whether the bucket already exists somewhere
	// before attempting to create it.
	if out, err := s3cli.GetBucketLocation(ctx, &s3.GetBucketLocationInput{Bucket: aws.String(bucket)}); err == nil {
		existing := string(out.LocationConstraint)
		if existing == "" {
			// An empty LocationConstraint means us-east-1.
			existing = "us-east-1"
		}
		if existing != region {
			return fmt.Errorf("bucket %q already exists in region %q (requested %q)", bucket, existing, region)
		}
		// Bucket already exists in the requested region; creating it again
		// would fail with BucketAlreadyOwnedByYou.
		return nil
	}

	// Create the bucket. us-east-1 must not set a LocationConstraint.
	in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
	if region != "us-east-1" {
		in.CreateBucketConfiguration = &s3types.CreateBucketConfiguration{
			LocationConstraint: s3types.BucketLocationConstraint(region),
		}
	}
	if _, err := s3cli.CreateBucket(ctx, in); err != nil {
		return fmt.Errorf("create bucket: %w", err)
	}

	// Apply default SSE (best-effort; failures are ignored).
	_, _ = s3cli.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
		Bucket: aws.String(bucket),
		ServerSideEncryptionConfiguration: &s3types.ServerSideEncryptionConfiguration{
			Rules: []s3types.ServerSideEncryptionRule{
				{ApplyServerSideEncryptionByDefault: &s3types.ServerSideEncryptionByDefault{
					SSEAlgorithm: s3types.ServerSideEncryptionAes256,
				}},
			},
		},
	})
	return nil
}
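
// runPgDump shells out to pg_dump (which must be on PATH) and writes a plain
// SQL dump of dbURL to outPath.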
func runPgDump(ctx context.Context, dbURL, outPath string) error {
	if err := os.MkdirAll(filepath.Dir(outPath), 0o755); err != nil {
		return err
	}

	args := []string{
		"--no-owner",
		"--no-privileges",
		"--format=plain",
		"--file", outPath,
		dbURL,
	}
	cmd := exec.CommandContext(ctx, "pg_dump", args...)
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("pg_dump failed: %v | %s", err, stderr.String())
	}
	return nil
}
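
// uploadFileToS3 streams a local file to s3://bucket/key with AES-256 SSE.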
func uploadFileToS3(ctx context.Context, s3cli *s3.Client, bucket, key, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	// A failed Stat would leave info nil and panic at info.Size(), so
	// surface the error instead of discarding it.
	info, err := f.Stat()
	if err != nil {
		return err
	}

	_, err = s3cli.PutObject(ctx, &s3.PutObjectInput{
		Bucket:               aws.String(bucket),
		Key:                  aws.String(key),
		Body:                 f,
		ContentLength:        aws.Int64(info.Size()),
		ContentType:          aws.String(mime.TypeByExtension(filepath.Ext(path))),
		ServerSideEncryption: s3types.ServerSideEncryptionAes256,
	})
	return err
}