import boto3
from botocore.exceptions import ClientError
import time
import json
import uuid

# A script to create a simple EMR cluster using Boto3.

# --- Configuration ---
REGION = "us-east-1"
CLUSTER_NAME = "MyBoto3EMRCluster"
EMR_SERVICE_ROLE_NAME = "EMR_DefaultRole_Boto3"
EMR_EC2_INSTANCE_PROFILE_NAME = "EMR_EC2_DefaultRole_Boto3"
LOG_BUCKET_NAME = f"my-boto3-emr-logs-{uuid.uuid4().hex[:8]}"

iam_client = boto3.client('iam', region_name=REGION)
s3_client = boto3.client('s3', region_name=REGION)
emr_client = boto3.client('emr', region_name=REGION)

def create_iam_roles():
    """Creates IAM roles for EMR service and EC2 instances."""
    print("--- Creating IAM Roles for EMR ---")
    
    # EMR Service Role
    print(f"Creating EMR Service Role: {EMR_SERVICE_ROLE_NAME}...")
    trust_policy_emr = {
      "Version": "2012-10-17",
      "Statement": [{"Effect": "Allow", "Principal": {"Service": "elasticmapreduce.amazonaws.com"}, "Action": "sts:AssumeRole"}]
    }
    try:
        emr_service_role_response = iam_client.create_role(
            RoleName=EMR_SERVICE_ROLE_NAME,
            AssumeRolePolicyDocument=json.dumps(trust_policy_emr)
        )
        emr_service_role_arn = emr_service_role_response['Role']['Arn']
        iam_client.attach_role_policy(
            RoleName=EMR_SERVICE_ROLE_NAME,
            PolicyArn='arn:aws:iam::aws:policy/service-role/AmazonEMRServicePolicy_v2'
        )
        print(f"EMR Service Role ARN: {emr_service_role_arn}")
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            print(f"EMR Service Role '{EMR_SERVICE_ROLE_NAME}' already exists. Fetching ARN.")
            emr_service_role_arn = iam_client.get_role(RoleName=EMR_SERVICE_ROLE_NAME)['Role']['Arn']
        else:
            raise e

    # EMR EC2 Instance Profile Role
    print(f"Creating EMR EC2 Instance Profile Role: {EMR_EC2_INSTANCE_PROFILE_NAME}...")
    trust_policy_ec2 = {
      "Version": "2012-10-17",
      "Statement": [{"Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole"}]
    }
    try:
        emr_ec2_role_response = iam_client.create_role(
            RoleName=EMR_EC2_INSTANCE_PROFILE_NAME,
            AssumeRolePolicyDocument=json.dumps(trust_policy_ec2)
        )
        emr_ec2_role_arn = emr_ec2_role_response['Role']['Arn']
        iam_client.attach_role_policy(
            RoleName=EMR_EC2_INSTANCE_PROFILE_NAME,
            PolicyArn='arn:aws:iam::aws:policy/AmazonEC2forEMRRole'
        )
        iam_client.attach_role_policy(
            RoleName=EMR_EC2_INSTANCE_PROFILE_NAME,
            PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess' # For simplicity, grant S3 access for logs and data
        )
        
        # Create instance profile
        iam_client.create_instance_profile(InstanceProfileName=EMR_EC2_INSTANCE_PROFILE_NAME)
        iam_client.add_role_to_instance_profile(
            InstanceProfileName=EMR_EC2_INSTANCE_PROFILE_NAME,
            RoleName=EMR_EC2_INSTANCE_PROFILE_NAME
        )
        print(f"EMR EC2 Instance Profile ARN: {emr_ec2_role_arn}")
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            print(f"EMR EC2 Instance Profile Role '{EMR_EC2_INSTANCE_PROFILE_NAME}' already exists. Fetching ARN.")
            emr_ec2_role_arn = iam_client.get_role(RoleName=EMR_EC2_INSTANCE_PROFILE_NAME)['Role']['Arn']
        else:
            raise e

    print("Waiting for IAM roles to propagate...")
    time.sleep(15)
    return emr_service_role_arn, emr_ec2_role_arn

def create_s3_bucket_for_logs():
    """Creates an S3 bucket for EMR logs."""
    print(f"\n--- Creating S3 Bucket for EMR Logs: {LOG_BUCKET_NAME} ---")
    try:
        s3_client.create_bucket(
            Bucket=LOG_BUCKET_NAME,
            CreateBucketConfiguration={'LocationConstraint': REGION} if REGION != 'us-east-1' else {}
        )
        print("S3 Bucket created.")
    except ClientError as e:
        print(f"Error creating S3 bucket: {e}")
        raise

def create_emr_cluster(emr_service_role_arn, emr_ec2_role_arn):
    """Creates an EMR cluster."""
    print(f"\n--- Creating EMR Cluster: {CLUSTER_NAME} ---")
    try:
        response = emr_client.run_job_flow(
            Name=CLUSTER_NAME,
            ReleaseLabel='emr-6.12.0', # Specify a supported EMR release
            Applications=[{'Name': 'Spark'}, {'Name': 'Hadoop'}],
            Instances={
                'MasterInstanceType': 'm5.xlarge',
                'SlaveInstanceType': 'm5.xlarge',
                'InstanceCount': 2, # 1 Master, 1 Core
                'KeepJobFlowAliveWhenNoSteps': True,
                'TerminationProtected': False,
                'Ec2InstanceProfile': EMR_EC2_INSTANCE_PROFILE_NAME # Use the instance profile name
            },
            ServiceRole=emr_service_role_arn,
            LogUri=f"s3://{LOG_BUCKET_NAME}/",
            Tags=[{'Key': 'Name', 'Value': CLUSTER_NAME}]
        )
        cluster_id = response['JobFlowId']
        print(f"EMR cluster '{cluster_id}' created. Waiting for it to be in a waiting state (ready for jobs)...")
        emr_client.get_waiter('cluster_waiting').wait(ClusterId=cluster_id)
        print("EMR cluster is ready.")
        return cluster_id
    except ClientError as e:
        print(f"Error creating EMR cluster: {e}")
        raise

def cleanup_resources(cluster_id, emr_service_role_arn, emr_ec2_role_arn):
    """Cleans up all created resources."""
    print(f"\n--- Cleaning up resources ---")

    # Terminate EMR Cluster
    if cluster_id:
        print(f"Terminating EMR cluster '{cluster_id}'...")
        try:
            emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
            emr_client.get_waiter('cluster_terminated').wait(ClusterId=cluster_id)
            print("EMR cluster terminated.")
        except ClientError as e:
            if e.response['Error']['Code'] == 'InvalidRequestException':
                print(f"EMR cluster '{cluster_id}' not found, skipping termination.")
            else:
                print(f"Error terminating EMR cluster: {e}")

    # Delete S3 Bucket for EMR Logs
    print(f"Deleting S3 Bucket '{LOG_BUCKET_NAME}'...")
    try:
        s3_resource = boto3.resource('s3', region_name=REGION)
        bucket = s3_resource.Bucket(LOG_BUCKET_NAME)
        bucket.objects.all().delete() # Delete all objects in the bucket
        bucket.delete()
        print("S3 Bucket deleted.")
    except ClientError as e:
        print(f"Error deleting S3 bucket: {e}")

    # Detach and Delete IAM Roles
    if emr_service_role_arn:
        print(f"Detaching policies from IAM Role '{EMR_SERVICE_ROLE_NAME}'...")
        try:
            iam_client.detach_role_policy(RoleName=EMR_SERVICE_ROLE_NAME, PolicyArn='arn:aws:iam::aws:policy/service-role/AmazonEMRServicePolicy_v2')
            print(f"Deleting IAM Role '{EMR_SERVICE_ROLE_NAME}'...")
            iam_client.delete_role(RoleName=EMR_SERVICE_ROLE_NAME)
            print("EMR Service Role deleted.")
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchEntity':
                print(f"IAM Role '{EMR_SERVICE_ROLE_NAME}' not found, skipping deletion.")
            else:
                print(f"Error deleting EMR Service Role: {e}")

    if emr_ec2_role_arn:
        print(f"Detaching policies from IAM Role '{EMR_EC2_INSTANCE_PROFILE_NAME}'...")
        try:
            iam_client.detach_role_policy(RoleName=EMR_EC2_INSTANCE_PROFILE_NAME, PolicyArn='arn:aws:iam::aws:policy/AmazonEC2forEMRRole')
            iam_client.detach_role_policy(RoleName=EMR_EC2_INSTANCE_PROFILE_NAME, PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess')
            
            print(f"Removing role from instance profile '{EMR_EC2_INSTANCE_PROFILE_NAME}'...")
            iam_client.remove_role_from_instance_profile(
                InstanceProfileName=EMR_EC2_INSTANCE_PROFILE_NAME,
                RoleName=EMR_EC2_INSTANCE_PROFILE_NAME
            )
            print(f"Deleting instance profile '{EMR_EC2_INSTANCE_PROFILE_NAME}'...")
            iam_client.delete_instance_profile(InstanceProfileName=EMR_EC2_INSTANCE_PROFILE_NAME)
            
            print(f"Deleting IAM Role '{EMR_EC2_INSTANCE_PROFILE_NAME}'...")
            iam_client.delete_role(RoleName=EMR_EC2_INSTANCE_PROFILE_NAME)
            print("EMR EC2 Instance Profile Role deleted.")
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchEntity':
                print(f"IAM Role '{EMR_EC2_INSTANCE_PROFILE_NAME}' not found, skipping deletion.")
            else:
                print(f"Error deleting EMR EC2 Instance Profile Role: {e}")

def main():
    cluster_id = None
    emr_service_role_arn = None
    emr_ec2_role_arn = None
    try:
        emr_service_role_arn, emr_ec2_role_arn = create_iam_roles()
        create_s3_bucket_for_logs()
        cluster_id = create_emr_cluster(emr_service_role_arn, emr_ec2_role_arn)

        print("\n--- EMR Cluster Created Successfully! ---")
        print(f"Cluster ID: {cluster_id}")
        print("You can now submit jobs to this cluster.")

        input("Press Enter to terminate the EMR cluster and clean up resources...")

    except ClientError as e:
        print(f"An AWS client error occurred: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        cleanup_resources(cluster_id, emr_service_role_arn, emr_ec2_role_arn)
        print("\n--- EMR cluster demonstration and cleanup complete ---")

if __name__ == "__main__":
    main()
