Mirror of https://github.com/GlueOps/autoglue.git (synced 2026-02-13 04:40:05 +01:00)
feat: adding hourly backups to s3
Signed-off-by: allanice001 <allanice001@gmail.com>
internal/bg/backup_s3.go (new file, 258 lines)
@@ -0,0 +1,258 @@
package bg

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"mime"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/dyaksa/archer"
	"github.com/dyaksa/archer/job"
	"github.com/glueops/autoglue/internal/config"
	"github.com/glueops/autoglue/internal/models"
	"github.com/glueops/autoglue/internal/utils"
	"github.com/google/uuid"
	"github.com/rs/zerolog/log"
	"gorm.io/gorm"
)
// DbBackupArgs is the (currently empty) job payload, kept in case retention
// settings or a dry-run flag are added later.
type DbBackupArgs struct{}

// s3Scope mirrors the credential's scope JSON, e.g. {"service":"s3","region":"..."}.
type s3Scope struct {
	Service string `json:"service"`
	Region  string `json:"region"`
}

// encAWS is the shape of the decrypted credential payload.
type encAWS struct {
	AccessKeyID     string `json:"access_key_id"`
	SecretAccessKey string `json:"secret_access_key"`
}
// DbBackupWorker runs one backup, then re-enqueues itself an hour out so the
// job chain keeps its hourly cadence without an external scheduler.
func DbBackupWorker(db *gorm.DB, jobs *Jobs) archer.WorkerFn {
	return func(ctx context.Context, j job.Job) (any, error) {
		if err := DbBackup(ctx, db); err != nil {
			return nil, err
		}

		queue := j.QueueName
		if strings.TrimSpace(queue) == "" {
			queue = "db_backup_s3"
		}

		next := time.Now().UTC().Add(1 * time.Hour)
		payload := DbBackupArgs{}

		opts := []archer.FnOptions{
			archer.WithScheduleTime(next),
			archer.WithMaxRetries(1),
		}

		if _, err := jobs.Enqueue(ctx, uuid.NewString(), queue, payload, opts...); err != nil {
			log.Error().Err(err).Str("queue", queue).Time("next", next).Msg("failed to enqueue next db backup")
		} else {
			log.Info().Str("queue", queue).Time("next", next).Msg("scheduled next db backup")
		}
		return nil, nil
	}
}
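The worker reschedules itself, but nothing in this file enqueues the first job. A minimal bootstrap sketch, assuming a one-time call at startup — the bootstrapDbBackup name is hypothetical; jobs.Enqueue and the option helpers are the same ones used above:

// Hypothetical startup hook (not part of this commit): enqueue the first
// backup job so the self-rescheduling chain in DbBackupWorker has a link
// to start from.
func bootstrapDbBackup(ctx context.Context, jobs *Jobs) {
	opts := []archer.FnOptions{
		archer.WithScheduleTime(time.Now().UTC()), // run on the next poll
		archer.WithMaxRetries(1),
	}
	if _, err := jobs.Enqueue(ctx, uuid.NewString(), "db_backup_s3", DbBackupArgs{}, opts...); err != nil {
		log.Error().Err(err).Msg("failed to bootstrap db backup chain")
	}
}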
// DbBackup dumps the configured database with pg_dump and uploads the result
// to an org-scoped S3 bucket under a year/month/day/hour key.
func DbBackup(ctx context.Context, db *gorm.DB) error {
	cfg, err := config.Load()
	if err != nil {
		return fmt.Errorf("load config: %w", err)
	}

	cred, sc, err := loadS3Credential(ctx, db)
	if err != nil {
		return fmt.Errorf("load credential: %w", err)
	}

	ak, sk, err := decryptAwsAccessKeys(ctx, db, cred)
	if err != nil {
		return fmt.Errorf("decrypt aws keys: %w", err)
	}

	// Region precedence: credential scope, then the credential's own region,
	// then the us-west-1 fallback.
	region := sc.Region
	if strings.TrimSpace(region) == "" {
		region = cred.Region
		if strings.TrimSpace(region) == "" {
			region = "us-west-1"
		}
	}

	bucket := strings.ToLower(fmt.Sprintf("%s-autoglue-backups-%s", cred.OrganizationID, region))

	s3cli, err := makeS3Client(ctx, ak, sk, region)
	if err != nil {
		return err
	}

	if err := ensureBucket(ctx, s3cli, bucket, region); err != nil {
		return fmt.Errorf("ensure bucket: %w", err)
	}

	tmpDir := os.TempDir()
	now := time.Now().UTC()
	// e.g. a run at 2026-02-13 04:40 UTC writes 2026/02/13/backup-04.sql;
	// a re-run within the same hour overwrites that hour's object.
	key := fmt.Sprintf("%04d/%02d/%02d/backup-%02d.sql", now.Year(), now.Month(), now.Day(), now.Hour())
	outPath := filepath.Join(tmpDir, "autoglue-backup-"+now.Format("20060102T150405Z")+".sql")

	if err := runPgDump(ctx, cfg.DbURL, outPath); err != nil {
		return fmt.Errorf("pg_dump: %w", err)
	}
	defer os.Remove(outPath)

	if err := uploadFileToS3(ctx, s3cli, bucket, key, outPath); err != nil {
		return fmt.Errorf("s3 upload: %w", err)
	}

	log.Info().Str("bucket", bucket).Str("key", key).Msg("backup uploaded")

	return nil
}
// --- Helpers

// loadS3Credential fetches the newest AWS access-key credential scoped to
// the S3 service.
func loadS3Credential(ctx context.Context, db *gorm.DB) (models.Credential, s3Scope, error) {
	var c models.Credential
	err := db.
		WithContext(ctx).
		Where("provider = ? AND kind = ? AND scope_kind = ?", "aws", "aws_access_key", "service").
		Where("scope ->> 'service' = ?", "s3").
		Order("created_at DESC").
		First(&c).Error
	if err != nil {
		return models.Credential{}, s3Scope{}, err
	}

	var sc s3Scope
	_ = json.Unmarshal(c.Scope, &sc) // best-effort; an empty scope falls back to cred.Region
	return c, sc, nil
}
// decryptAwsAccessKeys decrypts the credential blob with the org key and
// extracts the access-key pair.
func decryptAwsAccessKeys(ctx context.Context, db *gorm.DB, c models.Credential) (string, string, error) {
	plain, err := utils.DecryptForOrg(c.OrganizationID, c.EncryptedData, c.IV, c.Tag, db)
	if err != nil {
		return "", "", err
	}

	var payload encAWS
	if err := json.Unmarshal([]byte(plain), &payload); err != nil {
		return "", "", fmt.Errorf("parse decrypted payload: %w", err)
	}

	if payload.AccessKeyID == "" || payload.SecretAccessKey == "" {
		return "", "", errors.New("decrypted payload missing keys")
	}
	return payload.AccessKeyID, payload.SecretAccessKey, nil
}
// makeS3Client builds an S3 client from static credentials and a region.
func makeS3Client(ctx context.Context, accessKey, secret, region string) (*s3.Client, error) {
	cfg, err := awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secret, "")),
		awsconfig.WithRegion(region),
	)
	if err != nil {
		return nil, fmt.Errorf("aws config: %w", err)
	}
	return s3.NewFromConfig(cfg), nil
}
// ensureBucket creates the backup bucket if it does not exist, failing if a
// bucket with the same name lives in a different region.
func ensureBucket(ctx context.Context, s3cli *s3.Client, bucket, region string) error {
	_, err := s3cli.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(bucket)})
	if err == nil {
		return nil
	}

	if out, err := s3cli.GetBucketLocation(ctx, &s3.GetBucketLocationInput{Bucket: aws.String(bucket)}); err == nil {
		existing := string(out.LocationConstraint)
		if existing == "" {
			// GetBucketLocation reports us-east-1 as an empty constraint.
			existing = "us-east-1"
		}
		if existing != region {
			return fmt.Errorf("bucket %q already exists in region %q (requested %q)", bucket, existing, region)
		}
		// Bucket already exists in the requested region; nothing to create.
		return nil
	}

	// Create the bucket; a LocationConstraint is required everywhere except us-east-1.
	in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
	if region != "us-east-1" {
		in.CreateBucketConfiguration = &s3types.CreateBucketConfiguration{
			LocationConstraint: s3types.BucketLocationConstraint(region),
		}
	}
	if _, err := s3cli.CreateBucket(ctx, in); err != nil {
		return fmt.Errorf("create bucket: %w", err)
	}

	// Apply default SSE (best-effort; the upload also sets SSE per object).
	_, _ = s3cli.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
		Bucket: aws.String(bucket),
		ServerSideEncryptionConfiguration: &s3types.ServerSideEncryptionConfiguration{
			Rules: []s3types.ServerSideEncryptionRule{
				{ApplyServerSideEncryptionByDefault: &s3types.ServerSideEncryptionByDefault{
					SSEAlgorithm: s3types.ServerSideEncryptionAes256,
				}},
			},
		},
	})
	return nil
}
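Because each hour writes to a fixed key, a re-run inside the same hour replaces that hour's object. If older copies should survive overwrites, bucket versioning preserves them as noncurrent versions; a best-effort sketch in the style of the SSE call above (this helper is hypothetical, not part of the commit):

// Hypothetical follow-up to ensureBucket: keep overwritten backups around
// as noncurrent object versions instead of losing them.
func enableVersioning(ctx context.Context, s3cli *s3.Client, bucket string) error {
	_, err := s3cli.PutBucketVersioning(ctx, &s3.PutBucketVersioningInput{
		Bucket: aws.String(bucket),
		VersioningConfiguration: &s3types.VersioningConfiguration{
			Status: s3types.BucketVersioningStatusEnabled,
		},
	})
	return err
}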
// runPgDump shells out to pg_dump, writing a plain-format SQL dump to outPath.
func runPgDump(ctx context.Context, dbURL, outPath string) error {
	if err := os.MkdirAll(filepath.Dir(outPath), 0o755); err != nil {
		return err
	}

	args := []string{
		"--no-owner",
		"--no-privileges",
		"--format=plain",
		"--file", outPath,
		dbURL,
	}

	cmd := exec.CommandContext(ctx, "pg_dump", args...)
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("pg_dump failed: %v | %s", err, stderr.String())
	}

	return nil
}
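Plain-format dumps restore with psql rather than pg_restore. A hypothetical restore counterpart, mirroring runPgDump's exec pattern (not part of this commit; the function name is illustrative):

// Hypothetical restore helper: replay a plain-format dump with psql,
// stopping at the first error instead of plowing on.
func runPsqlRestore(ctx context.Context, dbURL, dumpPath string) error {
	cmd := exec.CommandContext(ctx, "psql",
		"--set", "ON_ERROR_STOP=1",
		"--file", dumpPath,
		dbURL,
	)
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("psql restore failed: %v | %s", err, stderr.String())
	}
	return nil
}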
// uploadFileToS3 puts a single file to S3 with AES-256 SSE.
func uploadFileToS3(ctx context.Context, s3cli *s3.Client, bucket, key, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		return err
	}

	contentType := mime.TypeByExtension(filepath.Ext(path))
	if contentType == "" {
		contentType = "application/octet-stream" // .sql has no registered MIME type on most systems
	}

	_, err = s3cli.PutObject(ctx, &s3.PutObjectInput{
		Bucket:               aws.String(bucket),
		Key:                  aws.String(key),
		Body:                 f,
		ContentLength:        aws.Int64(info.Size()),
		ContentType:          aws.String(contentType),
		ServerSideEncryption: s3types.ServerSideEncryptionAes256,
	})
	return err
}
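PutObject sends the dump in a single request, which is fine for modest databases; for large ones, the SDK's upload manager (github.com/aws/aws-sdk-go-v2/feature/s3/manager) splits the file into concurrent multipart uploads. A sketch of that swap, assuming the manager package is imported; the helper name is hypothetical:

// import manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"

// Hypothetical multipart variant of uploadFileToS3 for large dumps.
func uploadLargeFileToS3(ctx context.Context, s3cli *s3.Client, bucket, key, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	uploader := manager.NewUploader(s3cli) // multipart, concurrent parts; sizes computed by the manager
	_, err = uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket:               aws.String(bucket),
		Key:                  aws.String(key),
		Body:                 f,
		ServerSideEncryption: s3types.ServerSideEncryptionAes256,
	})
	return err
}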
@@ -67,7 +67,7 @@ func NewJobs(gdb *gorm.DB, dbUrl string) (*Jobs, error) {
 		archer.WithSetTableName("jobs"),         // <- ensure correct table
 		archer.WithSleepInterval(1*time.Second), // fast poll while debugging
 		archer.WithErrHandler(func(err error) {  // bubble up worker SQL errors
-			log.Printf("[archer] ERROR: %v", err)
+			log.Error().Err(err).Msg("[archer] worker error")
 		}),
 	)
@@ -94,6 +94,12 @@ func NewJobs(gdb *gorm.DB, dbUrl string) (*Jobs, error) {
 		archer.WithTimeout(5*time.Minute),
 	)
 
+	c.Register(
+		"db_backup_s3",
+		DbBackupWorker(gdb, jobs),
+		archer.WithInstances(1),
+		archer.WithTimeout(15*time.Minute),
+	)
 	return jobs, nil
 }