mirror of
https://github.com/GlueOps/autoglue.git
synced 2026-02-13 12:50:05 +01:00
265 lines
7.0 KiB
Go
265 lines
7.0 KiB
Go
package bg
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"mime"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
awsconfig "github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
"github.com/dyaksa/archer"
|
|
"github.com/dyaksa/archer/job"
|
|
"github.com/glueops/autoglue/internal/config"
|
|
"github.com/glueops/autoglue/internal/models"
|
|
"github.com/glueops/autoglue/internal/utils"
|
|
"github.com/google/uuid"
|
|
"github.com/rs/zerolog/log"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// DbBackupArgs is the JSON payload carried by a database backup job.
type DbBackupArgs struct {
	// IntervalS is the number of seconds between backup runs. It is left out
	// of the encoded payload when zero.
	IntervalS int `json:"interval_seconds,omitempty"`
}
|
|
|
|
// s3Scope is the decoded form of a credential's JSON scope document.
type s3Scope struct {
	// Service is read from the "service" key of the scope.
	Service string `json:"service"`
	// Region is read from the "region" key of the scope.
	Region string `json:"region"`
}
|
|
|
|
// encAWS is the JSON shape of a decrypted AWS access-key credential.
type encAWS struct {
	// AccessKeyID is read from the "access_key_id" key.
	AccessKeyID string `json:"access_key_id"`
	// SecretAccessKey is read from the "secret_access_key" key.
	SecretAccessKey string `json:"secret_access_key"`
}
|
|
|
|
func DbBackupWorker(db *gorm.DB, jobs *Jobs) archer.WorkerFn {
|
|
return func(ctx context.Context, j job.Job) (any, error) {
|
|
args := DbBackupArgs{IntervalS: 3600}
|
|
_ = j.ParseArguments(&args)
|
|
|
|
if args.IntervalS <= 0 {
|
|
args.IntervalS = 3600
|
|
}
|
|
|
|
if err := DbBackup(ctx, db); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queue := j.QueueName
|
|
if strings.TrimSpace(queue) == "" {
|
|
queue = "db_backup_s3"
|
|
}
|
|
|
|
next := time.Now().Add(time.Duration(args.IntervalS) * time.Second)
|
|
|
|
payload := DbBackupArgs{}
|
|
|
|
opts := []archer.FnOptions{
|
|
archer.WithScheduleTime(next),
|
|
archer.WithMaxRetries(1),
|
|
}
|
|
|
|
if _, err := jobs.Enqueue(ctx, uuid.NewString(), queue, payload, opts...); err != nil {
|
|
log.Error().Err(err).Str("queue", queue).Time("next", next).Msg("failed to enqueue next db backup")
|
|
} else {
|
|
log.Info().Str("queue", queue).Time("next", next).Msg("scheduled next db backup")
|
|
}
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
func DbBackup(ctx context.Context, db *gorm.DB) error {
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
return fmt.Errorf("load config: %w", err)
|
|
}
|
|
|
|
cred, sc, err := loadS3Credential(ctx, db)
|
|
if err != nil {
|
|
return fmt.Errorf("load credential: %w", err)
|
|
}
|
|
|
|
ak, sk, err := decryptAwsAccessKeys(ctx, db, cred)
|
|
if err != nil {
|
|
return fmt.Errorf("decrypt aws keys: %w", err)
|
|
}
|
|
|
|
region := sc.Region
|
|
|
|
if strings.TrimSpace(region) == "" {
|
|
region = cred.Region
|
|
if strings.TrimSpace(region) == "" {
|
|
region = "us-west-1"
|
|
}
|
|
}
|
|
|
|
bucket := strings.ToLower(fmt.Sprintf("%s-autoglue-backups-%s", cred.OrganizationID, region))
|
|
|
|
s3cli, err := makeS3Client(ctx, ak, sk, region)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := ensureBucket(ctx, s3cli, bucket, region); err != nil {
|
|
return fmt.Errorf("ensure bucket: %w", err)
|
|
}
|
|
|
|
tmpDir := os.TempDir()
|
|
now := time.Now().UTC()
|
|
key := fmt.Sprintf("%04d/%02d/%02d/backup-%02d.sql", now.Year(), now.Month(), now.Day(), now.Hour())
|
|
outPath := filepath.Join(tmpDir, "autoglue-backup-"+now.Format("20060102T150405Z")+".sql")
|
|
|
|
if err := runPgDump(ctx, cfg.DbURL, outPath); err != nil {
|
|
return fmt.Errorf("pg_dump: %w", err)
|
|
}
|
|
defer os.Remove(outPath)
|
|
|
|
if err := uploadFileToS3(ctx, s3cli, bucket, key, outPath); err != nil {
|
|
return fmt.Errorf("s3 upload: %w", err)
|
|
}
|
|
|
|
log.Info().Str("bucket", bucket).Str("key", key).Msg("backup uploaded")
|
|
|
|
return nil
|
|
}
|
|
|
|
// --- Helpers
|
|
|
|
func loadS3Credential(ctx context.Context, db *gorm.DB) (models.Credential, s3Scope, error) {
|
|
var c models.Credential
|
|
err := db.
|
|
WithContext(ctx).
|
|
Where("provider = ? AND kind = ? AND scope_kind = ?", "aws", "aws_access_key", "service").
|
|
Where("scope ->> 'service' = ?", "s3").
|
|
Order("created_at DESC").
|
|
First(&c).Error
|
|
if err != nil {
|
|
return models.Credential{}, s3Scope{}, fmt.Errorf("load credential: %w", err)
|
|
}
|
|
|
|
var sc s3Scope
|
|
_ = json.Unmarshal(c.Scope, &sc)
|
|
return c, sc, nil
|
|
}
|
|
|
|
func decryptAwsAccessKeys(ctx context.Context, db *gorm.DB, c models.Credential) (string, string, error) {
|
|
plain, err := utils.DecryptForOrg(c.OrganizationID, c.EncryptedData, c.IV, c.Tag, db)
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
|
|
var payload encAWS
|
|
if err := json.Unmarshal([]byte(plain), &payload); err != nil {
|
|
return "", "", fmt.Errorf("parse decrypted payload: %w", err)
|
|
}
|
|
|
|
if payload.AccessKeyID == "" || payload.SecretAccessKey == "" {
|
|
return "", "", errors.New("decrypted payload missing keys")
|
|
}
|
|
return payload.AccessKeyID, payload.SecretAccessKey, nil
|
|
}
|
|
|
|
func makeS3Client(ctx context.Context, accessKey, secret, region string) (*s3.Client, error) {
|
|
staticCredentialsProvider := credentials.NewStaticCredentialsProvider(accessKey, secret, "")
|
|
cfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithCredentialsProvider(staticCredentialsProvider), awsconfig.WithRegion(region))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("aws config: %w", err)
|
|
}
|
|
return s3.NewFromConfig(cfg), nil
|
|
}
|
|
|
|
func ensureBucket(ctx context.Context, s3cli *s3.Client, bucket, region string) error {
|
|
_, err := s3cli.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(bucket)})
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if out, err := s3cli.GetBucketLocation(ctx, &s3.GetBucketLocationInput{Bucket: aws.String(bucket)}); err == nil {
|
|
existing := string(out.LocationConstraint)
|
|
if existing == "" {
|
|
existing = "us-east-1"
|
|
}
|
|
if existing != region {
|
|
return fmt.Errorf("bucket %q already exists in region %q (requested %q)", bucket, existing, region)
|
|
}
|
|
}
|
|
|
|
// Create; LocationConstraint except us-east-1
|
|
in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
|
|
if region != "us-east-1" {
|
|
in.CreateBucketConfiguration = &s3types.CreateBucketConfiguration{
|
|
LocationConstraint: s3types.BucketLocationConstraint(region),
|
|
}
|
|
}
|
|
if _, err := s3cli.CreateBucket(ctx, in); err != nil {
|
|
return fmt.Errorf("create bucket: %w", err)
|
|
}
|
|
|
|
// default SSE (best-effort)
|
|
_, _ = s3cli.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
|
|
Bucket: aws.String(bucket),
|
|
ServerSideEncryptionConfiguration: &s3types.ServerSideEncryptionConfiguration{
|
|
Rules: []s3types.ServerSideEncryptionRule{
|
|
{ApplyServerSideEncryptionByDefault: &s3types.ServerSideEncryptionByDefault{
|
|
SSEAlgorithm: s3types.ServerSideEncryptionAes256,
|
|
}},
|
|
},
|
|
},
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// runPgDump runs the pg_dump executable against dbURL and writes a plain-format
// SQL dump, without ownership or privilege statements, to outPath.
//
// The parent directory of outPath is created if missing. The command is bound
// to ctx. On failure the returned error wraps the underlying exec error (so
// errors.Is / errors.As work on it) and includes whatever pg_dump wrote to
// stderr.
//
// NOTE(review): dbURL is passed as a command-line argument; if it embeds a
// password it may be visible to other local users via the process list.
func runPgDump(ctx context.Context, dbURL, outPath string) error {
	if err := os.MkdirAll(filepath.Dir(outPath), 0o755); err != nil {
		return err
	}

	args := []string{
		"--no-owner",
		"--no-privileges",
		"--format=plain",
		"--file", outPath,
		dbURL,
	}

	cmd := exec.CommandContext(ctx, "pg_dump", args...)
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		// %w keeps the cause inspectable (exec.ErrNotFound, *exec.ExitError,
		// context errors) while the message text stays the same.
		return fmt.Errorf("pg_dump failed: %w | %s", err, stderr.String())
	}

	return nil
}
|
|
|
|
func uploadFileToS3(ctx context.Context, s3cli *s3.Client, bucket, key, path string) error {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
info, _ := f.Stat()
|
|
_, err = s3cli.PutObject(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(bucket),
|
|
Key: aws.String(key),
|
|
Body: f,
|
|
ContentLength: aws.Int64(info.Size()),
|
|
ContentType: aws.String(mime.TypeByExtension(filepath.Ext(path))),
|
|
ServerSideEncryption: s3types.ServerSideEncryptionAes256,
|
|
})
|
|
|
|
return err
|
|
}
|