mirror of
https://gitea.smigz.com/smiggiddy/s3_cleanup.git
synced 2024-12-25 04:30:43 -05:00
93 lines
2.3 KiB
Python
93 lines
2.3 KiB
Python
import argparse
|
|
import boto
|
|
import boto.s3.connection
|
|
import datetime as dt
|
|
from dotenv import load_dotenv
|
|
import os
|
|
import re
|
|
|
|
load_dotenv()
|
|
|
|
def get_s3_bucket(bucketname: str, host: str):
|
|
"""returns bucket object based
|
|
|
|
:param bucketname: name of s3 bucket
|
|
:param host: S3 endpoint URI
|
|
|
|
return b: s3 bucket object
|
|
"""
|
|
access_key = os.environ.get("ACCESS_KEY")
|
|
secret_key = os.environ.get("SECRET_KEY")
|
|
conn = boto.connect_s3(
|
|
aws_access_key_id=access_key,
|
|
aws_secret_access_key=secret_key,
|
|
host=host,
|
|
)
|
|
b = conn.get_bucket(bucketname)
|
|
|
|
return b
|
|
|
|
|
|
def get_docker_backup_objects(bucket):
|
|
"""returns list of objects with docker_backup in the key
|
|
:param bucket: should be s3 bucket object
|
|
|
|
:return: list of s3 keys
|
|
"""
|
|
docker_backup = [key for key in bucket if "docker_backup" in key.name]
|
|
|
|
return docker_backup
|
|
|
|
|
|
def get_key_to_delete(docker_backup: list):
|
|
"""returns keys to delete based on regular expression
|
|
:param: docker_backup list of keys with docker_backup in name
|
|
:return
|
|
|
|
"""
|
|
pattern = r"([\d\-.]{10})"
|
|
|
|
# (example output) docker_backups/2023-07-04
|
|
keys_to_delete = []
|
|
for k in docker_backup:
|
|
m = re.search(pattern, k.name)
|
|
if m:
|
|
if m.group(0) <= str(dt.date.today() - dt.timedelta(days=10)):
|
|
keys_to_delete.append(k)
|
|
|
|
return keys_to_delete
|
|
|
|
|
|
def delete_keys(keys_to_delete: list, bucket):
|
|
"""deletes keys from s3 bucket
|
|
:param keys_to_delete: list of keys to delete
|
|
:param b: s3 bucket
|
|
|
|
"""
|
|
|
|
for k in keys_to_delete:
|
|
print(f"Deleting key {k.name}")
|
|
bucket.delete_key(k)
|
|
|
|
|
|
def parse_arguments():
|
|
"""parse commmand-line arguments
|
|
|
|
:return parser.parse_args():
|
|
"""
|
|
|
|
# load cli flags
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--bucket", help="bucket to parse for docker_backups")
|
|
parser.add_argument("--host", help="S3 bucket endpoint URI")
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
args = parse_arguments()
|
|
|
|
bucket = get_s3_bucket(bucketname=args.bucket, host=args.host)
|
|
objects = get_docker_backup_objects(bucket)
|
|
key_to_delete = get_key_to_delete(objects)
|
|
delete_keys(keys_to_delete=key_to_delete, bucket=bucket)
|