# AI Prompt: "Create Pulumi Python program for data platform infrastructure"
"""Data Platform Infrastructure with Pulumi Python"""
import json

import pulumi
import pulumi_aws as aws
import pulumi_kubernetes as k8s
from pulumi import Config, Output, export
# Stack configuration. `environment` is required; region and VPC CIDR fall
# back to sensible defaults when not set on the stack.
config = Config()
environment = config.require("environment")
region = config.get("region") or "us-east-1"
vpc_cidr = config.get("vpcCidr") or "10.0.0.0/16"

# Common tags merged into every resource's tag map below.
tags = {
    "Environment": environment,
    "Project": pulumi.get_project(),
    "Stack": pulumi.get_stack(),
}

# VPC for the data platform. DNS hostnames are enabled so instances and
# interface endpoints get resolvable names inside the VPC.
# NOTE(review): reconstructed from a mangled extract — confirm no extra
# VPC arguments (e.g. enable_dns_support) were present originally.
vpc = aws.ec2.Vpc(
    f"{environment}-vpc",
    cidr_block=vpc_cidr,
    enable_dns_hostnames=True,
    tags={**tags, "Name": f"{environment}-vpc"},
)
# Internet Gateway: gives the public subnets a route to the internet.
igw = aws.ec2.InternetGateway(
    f"{environment}-igw",
    vpc_id=vpc.id,
    tags={**tags, "Name": f"{environment}-igw"},
)

# Available AZs in the configured region, used to spread subnets.
azs = aws.get_availability_zones(state="available")
# Public subnets, one per AZ: 10.0.1.0/24, 10.0.2.0/24, ... Auto-assign
# public IPs and carry the kubernetes.io/role/elb tag so external load
# balancers can be placed here.
# NOTE(review): the original loop header was lost in extraction; the CIDR
# offsets (+1 public / +11 private / +21 database) imply one subnet per AZ —
# confirm the intended AZ count.
public_subnets = []
for i in range(min(3, len(azs.names))):
    public_subnet = aws.ec2.Subnet(
        f"{environment}-public-{i + 1}",
        vpc_id=vpc.id,
        cidr_block=f"10.0.{i + 1}.0/24",
        availability_zone=azs.names[i],
        map_public_ip_on_launch=True,
        tags={
            **tags,
            "Name": f"{environment}-public-{i + 1}",
            "kubernetes.io/role/elb": "1",
        },
    )
    public_subnets.append(public_subnet)
# Private subnets, one per AZ: 10.0.11.0/24, 10.0.12.0/24, ... Tagged for
# internal (VPC-only) load balancers; no public IPs.
# NOTE(review): loop header reconstructed — confirm the AZ count matches
# the public-subnet loop.
private_subnets = []
for i in range(min(3, len(azs.names))):
    private_subnet = aws.ec2.Subnet(
        f"{environment}-private-{i + 1}",
        vpc_id=vpc.id,
        cidr_block=f"10.0.{i + 11}.0/24",
        availability_zone=azs.names[i],
        tags={
            **tags,
            "Name": f"{environment}-private-{i + 1}",
            "kubernetes.io/role/internal-elb": "1",
        },
    )
    private_subnets.append(private_subnet)
# Database subnets, one per AZ: 10.0.21.0/24, 10.0.22.0/24, ... Isolated
# tier consumed by the RDS subnet group below; no Kubernetes role tags.
# NOTE(review): loop header reconstructed — confirm the AZ count.
database_subnets = []
for i in range(min(3, len(azs.names))):
    database_subnet = aws.ec2.Subnet(
        f"{environment}-database-{i + 1}",
        vpc_id=vpc.id,
        cidr_block=f"10.0.{i + 21}.0/24",
        availability_zone=azs.names[i],
        tags={**tags, "Name": f"{environment}-database-{i + 1}"},
    )
    database_subnets.append(database_subnet)
# Elastic IP + single NAT gateway in the first public subnet. One NAT for
# all private subnets is a cost/availability tradeoff: an AZ outage takes
# private egress down with it.
nat_eip = aws.ec2.Eip(
    f"{environment}-nat-eip",
    domain="vpc",
    tags={**tags, "Name": f"{environment}-nat-eip"},
)
nat_gateway = aws.ec2.NatGateway(
    f"{environment}-nat",
    allocation_id=nat_eip.id,
    subnet_id=public_subnets[0].id,
    tags={**tags, "Name": f"{environment}-nat"},
)
# Public route table: default route to the internet via the IGW.
public_route_table = aws.ec2.RouteTable(
    f"{environment}-public-rt",
    vpc_id=vpc.id,
    tags={**tags, "Name": f"{environment}-public-rt"},
)
public_route = aws.ec2.Route(
    f"{environment}-public-route",
    route_table_id=public_route_table.id,
    destination_cidr_block="0.0.0.0/0",
    gateway_id=igw.id,
)
# Associate public subnets with public route table
for i, subnet in enumerate(public_subnets):
    aws.ec2.RouteTableAssociation(
        f"{environment}-public-rta-{i + 1}",
        subnet_id=subnet.id,
        route_table_id=public_route_table.id,
    )
# Private route table: default route out through the NAT gateway so
# private instances can reach the internet without being reachable from it.
private_route_table = aws.ec2.RouteTable(
    f"{environment}-private-rt",
    vpc_id=vpc.id,
    tags={**tags, "Name": f"{environment}-private-rt"},
)
private_route = aws.ec2.Route(
    f"{environment}-private-route",
    route_table_id=private_route_table.id,
    destination_cidr_block="0.0.0.0/0",
    nat_gateway_id=nat_gateway.id,
)
# Associate private subnets with private route table
for i, subnet in enumerate(private_subnets):
    aws.ec2.RouteTableAssociation(
        f"{environment}-private-rta-{i + 1}",
        subnet_id=subnet.id,
        route_table_id=private_route_table.id,
    )
# Security groups form a three-tier chain: internet -> ALB -> app -> DB.
# NOTE(review): the rule bodies (ports/protocols) were lost in extraction;
# the surviving cidr_blocks/security_groups lines fix the *sources* only.
# Ports below (80/443 ALB, 8080 app, 5432 DB) are reconstructed — confirm.
alb_sg = aws.ec2.SecurityGroup(
    f"{environment}-alb-sg",
    vpc_id=vpc.id,
    description="Security group for Application Load Balancer",
    ingress=[
        {"protocol": "tcp", "from_port": 80, "to_port": 80, "cidr_blocks": ["0.0.0.0/0"]},
        {"protocol": "tcp", "from_port": 443, "to_port": 443, "cidr_blocks": ["0.0.0.0/0"]},
    ],
    egress=[
        {"protocol": "-1", "from_port": 0, "to_port": 0, "cidr_blocks": ["0.0.0.0/0"]},
    ],
    tags={**tags, "Name": f"{environment}-alb-sg"},
)

# App tier: only accepts traffic originating from the ALB's security group.
app_sg = aws.ec2.SecurityGroup(
    f"{environment}-app-sg",
    vpc_id=vpc.id,
    description="Security group for application servers",
    ingress=[
        {"protocol": "tcp", "from_port": 8080, "to_port": 8080, "security_groups": [alb_sg.id]},
    ],
    egress=[
        {"protocol": "-1", "from_port": 0, "to_port": 0, "cidr_blocks": ["0.0.0.0/0"]},
    ],
    tags={**tags, "Name": f"{environment}-app-sg"},
)

# DB tier: only accepts traffic from the app tier's security group.
db_sg = aws.ec2.SecurityGroup(
    f"{environment}-db-sg",
    vpc_id=vpc.id,
    description="Security group for database",
    ingress=[
        {"protocol": "tcp", "from_port": 5432, "to_port": 5432, "security_groups": [app_sg.id]},
    ],
    egress=[
        {"protocol": "-1", "from_port": 0, "to_port": 0, "cidr_blocks": ["0.0.0.0/0"]},
    ],
    tags={**tags, "Name": f"{environment}-db-sg"},
)
# RDS subnet group spanning the isolated database subnets.
db_subnet_group = aws.rds.SubnetGroup(
    f"{environment}-db-subnet-group",
    subnet_ids=[subnet.id for subnet in database_subnets],
    tags={**tags, "Name": f"{environment}-db-subnet-group"},
)

# Managed relational database. Nightly backups (7-day retention) in the
# 03:00-04:00 window, maintenance Sunday 04:00-05:00, and a final snapshot
# on deletion (skip_final_snapshot=False) so teardown never loses data.
# NOTE(review): engine, storage, and username were lost in extraction —
# reconstructed as PostgreSQL defaults; confirm against stack history.
database = aws.rds.Instance(
    f"{environment}-database",
    engine="postgres",
    instance_class="db.t3.medium",
    allocated_storage=100,
    username="dbadmin",
    password=config.require_secret("dbPassword"),
    vpc_security_group_ids=[db_sg.id],
    db_subnet_group_name=db_subnet_group.name,
    backup_retention_period=7,
    backup_window="03:00-04:00",
    maintenance_window="sun:04:00-sun:05:00",
    skip_final_snapshot=False,
    final_snapshot_identifier=f"{environment}-db-final-snapshot",
    tags={**tags, "Name": f"{environment}-database"},
)
# S3 Buckets for Data Lake
# Bucket name includes the stack so multiple stacks don't collide on the
# globally-unique S3 namespace.
data_lake_bucket = aws.s3.BucketV2(
    f"{environment}-data-lake",
    bucket=f"{environment}-data-lake-{pulumi.get_stack()}",
    tags={**tags, "Name": f"{environment}-data-lake"},
)

# Versioning protects against accidental overwrite/delete of lake objects.
aws.s3.BucketVersioningV2(
    f"{environment}-data-lake-versioning",
    bucket=data_lake_bucket.id,
    versioning_configuration={
        "status": "Enabled",
    },
)

# Default server-side encryption with S3-managed keys (AES256).
aws.s3.BucketServerSideEncryptionConfigurationV2(
    f"{environment}-data-lake-encryption",
    bucket=data_lake_bucket.id,
    rules=[
        {
            "apply_server_side_encryption_by_default": {
                "sse_algorithm": "AES256",
            },
        },
    ],
)

# Lifecycle rules for data lake: tier old data down to IA then Glacier,
# and expire temp data quickly.
# NOTE(review): transition/expiration day counts and the temp prefix were
# lost in extraction — values below are reconstructed; confirm.
aws.s3.BucketLifecycleConfigurationV2(
    f"{environment}-data-lake-lifecycle",
    bucket=data_lake_bucket.id,
    rules=[
        {
            "id": "archive-old-data",
            "status": "Enabled",
            "transitions": [
                {"days": 30, "storage_class": "STANDARD_IA"},
                {"days": 90, "storage_class": "GLACIER"},
            ],
        },
        {
            "id": "delete-temp-data",
            "status": "Enabled",
            "filter": {"prefix": "temp/"},
            "expiration": {"days": 7},
        },
    ],
)
# EMR Cluster for Big Data Processing
# Service role assumed by the EMR control plane.
emr_role = aws.iam.Role(
    f"{environment}-emr-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "elasticmapreduce.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)
aws.iam.RolePolicyAttachment(
    f"{environment}-emr-policy",
    role=emr_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole",
)

# Role assumed by the cluster's EC2 instances (exposed via instance profile).
emr_ec2_role = aws.iam.Role(
    f"{environment}-emr-ec2-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "ec2.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)
aws.iam.RolePolicyAttachment(
    f"{environment}-emr-ec2-policy",
    role=emr_ec2_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role",
)
emr_ec2_instance_profile = aws.iam.InstanceProfile(
    f"{environment}-emr-ec2-profile",
    role=emr_ec2_role.name,
)

# Long-running Spark cluster in a private subnet. Keeps running with no
# steps queued (keep_job_flow_alive_when_no_steps=True) and drains tasks
# before scaling in. Logs ship to the data-lake bucket under /emr-logs/.
# NOTE(review): core-node count was lost in extraction — confirm.
emr_cluster = aws.emr.Cluster(
    f"{environment}-emr",
    release_label="emr-6.15.0",
    applications=["Spark", "Hadoop", "Hive", "JupyterHub"],
    termination_protection=False,
    keep_job_flow_alive_when_no_steps=True,
    scale_down_behavior="TERMINATE_AT_TASK_COMPLETION",
    service_role=emr_role.arn,
    ec2_attributes={
        "subnet_id": private_subnets[0].id,
        "emr_managed_master_security_group": app_sg.id,
        "emr_managed_slave_security_group": app_sg.id,
        "instance_profile": emr_ec2_instance_profile.arn,
        "key_name": config.get("keyPairName"),
    },
    master_instance_group={
        "instance_type": "m5.xlarge",
    },
    core_instance_group={
        "instance_type": "m5.xlarge",
        "instance_count": 2,
    },
    configurations=json.dumps([{
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.executor.memory": "4g",
            "spark.executor.cores": "2",
        },
    }]),
    log_uri=Output.concat("s3://", data_lake_bucket.bucket, "/emr-logs/"),
    tags={**tags, "Name": f"{environment}-emr"},
)
# Lambda Functions for Data Processing
lambda_role = aws.iam.Role(
    f"{environment}-lambda-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)

# Attach policies to Lambda role
aws.iam.RolePolicyAttachment(
    f"{environment}-lambda-basic",
    role=lambda_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)

# Lambda policy for S3 and DynamoDB access
# NOTE(review): the action list was lost in extraction (the section comment
# mentions DynamoDB, but no table ARN survives) — confirm actions/resources.
lambda_policy = aws.iam.RolePolicy(
    f"{environment}-lambda-policy",
    role=lambda_role.id,
    policy=Output.all(data_lake_bucket.arn).apply(
        lambda args: json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
                "Resource": f"{args[0]}/*",
            }],
        })
    ),
)

# Data-processing function; code is packaged from ./lambda/data-processor.
# NOTE(review): runtime/handler/timeout were lost in extraction — confirm.
data_processor = aws.lambda_.Function(
    f"{environment}-data-processor",
    runtime="python3.11",
    handler="index.handler",
    role=lambda_role.arn,
    environment={
        "variables": {
            "ENVIRONMENT": environment,
            "DATA_BUCKET": data_lake_bucket.bucket,
        },
    },
    code=pulumi.AssetArchive({
        ".": pulumi.FileArchive("./lambda/data-processor"),
    }),
    tags={**tags, "Name": f"{environment}-data-processor"},
)
# EventBridge Rule for scheduled processing: invoke the data processor
# hourly, with an explicit Lambda permission so EventBridge may call it.
processing_schedule = aws.cloudwatch.EventRule(
    f"{environment}-processing-schedule",
    schedule_expression="rate(1 hour)",
)
aws.cloudwatch.EventTarget(
    f"{environment}-processing-target",
    rule=processing_schedule.name,
    arn=data_processor.arn,
)
aws.lambda_.Permission(
    f"{environment}-processing-permission",
    action="lambda:InvokeFunction",
    function=data_processor.name,
    principal="events.amazonaws.com",
    source_arn=processing_schedule.arn,
)
# Glue catalog database — the metastore for Athena and Firehose schemas.
glue_database = aws.glue.CatalogDatabase(
    f"{environment}-glue-db",
    name=f"{environment}_data_catalog",
    description=f"Data catalog for {environment} environment",
)

# Athena workgroup: query results land in the data-lake bucket, workgroup
# settings are enforced over client overrides, and query metrics are
# published to CloudWatch.
athena_workgroup = aws.athena.Workgroup(
    f"{environment}-athena-workgroup",
    name=f"{environment}-workgroup",
    configuration={
        "result_configuration": {
            "output_location": Output.concat("s3://", data_lake_bucket.bucket, "/athena-results/"),
        },
        "enforce_workgroup_configuration": True,
        "publish_cloudwatch_metrics_enabled": True,
    },
    tags={**tags, "Name": f"{environment}-athena-workgroup"},
)
# Kinesis stream: the real-time ingestion entry point.
# NOTE(review): shard count / stream mode lost in extraction — confirm.
kinesis_stream = aws.kinesis.Stream(
    f"{environment}-stream",
    name=f"{environment}-data-stream",
    shard_count=1,
    tags={**tags, "Name": f"{environment}-stream"},
)

# Kinesis Firehose for S3 delivery
firehose_role = aws.iam.Role(
    f"{environment}-firehose-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "firehose.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)

# Firehose needs read access to the source stream and write access to the
# destination bucket.
firehose_policy = aws.iam.RolePolicy(
    f"{environment}-firehose-policy",
    role=firehose_role.id,
    policy=Output.all(data_lake_bucket.arn, kinesis_stream.arn).apply(
        lambda args: json.dumps({
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": ["s3:PutObject", "s3:GetBucketLocation", "s3:ListBucket"],
                    "Resource": [args[0], f"{args[0]}/*"],
                },
                {
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:DescribeStream",
                        "kinesis:GetShardIterator",
                        "kinesis:GetRecords",
                    ],
                    "Resource": args[1],
                },
            ],
        })
    ),
)

# Delivery stream: reads from the Kinesis stream and writes GZIP batches
# to the lake under a Hive-style date-partitioned prefix, converting
# records via the Glue schema (raw_events).
# NOTE(review): the format-conversion serializer/deserializer pair was lost
# in extraction — JSON-in / Parquet-out below is reconstructed; confirm.
firehose_delivery_stream = aws.kinesis.FirehoseDeliveryStream(
    f"{environment}-firehose",
    name=f"{environment}-s3-delivery",
    destination="extended_s3",
    kinesis_source_configuration={
        "kinesis_stream_arn": kinesis_stream.arn,
        "role_arn": firehose_role.arn,
    },
    extended_s3_configuration={
        "role_arn": firehose_role.arn,
        "bucket_arn": data_lake_bucket.arn,
        "prefix": "raw-data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
        "error_output_prefix": "error-data/",
        "buffering_interval": 60,
        "compression_format": "GZIP",
        "data_format_conversion_configuration": {
            "enabled": True,
            "input_format_configuration": {
                "deserializer": {"open_x_json_ser_de": {}},
            },
            "output_format_configuration": {
                "serializer": {"parquet_ser_de": {}},
            },
            "schema_configuration": {
                "database_name": glue_database.name,
                "table_name": "raw_events",
                "role_arn": firehose_role.arn,
            },
        },
    },
    tags={**tags, "Name": f"{environment}-firehose"},
)
# Stack outputs consumed by other stacks / the CLI.
export("database_endpoint", database.endpoint)
export("data_lake_bucket", data_lake_bucket.bucket)
export("kinesis_stream_name", kinesis_stream.name)
export("emr_cluster_id", emr_cluster.id)
export("athena_workgroup", athena_workgroup.name)
# Component Resources for better organization
class DataPlatform(pulumi.ComponentResource):
    """Logical grouping node for the data-platform resources.

    NOTE(review): this component is created *after* all resources above,
    so none of them are actually parented to it (children would need
    ``opts=ResourceOptions(parent=...)`` at creation time). As written it
    only adds an organizational node in the Pulumi console.
    """

    def __init__(self, name, opts=None):
        super().__init__("custom:infrastructure:DataPlatform", name, None, opts)
        # No outputs of its own; registering an empty dict marks the
        # component as fully constructed so the engine stops waiting on it.
        self.register_outputs({})


data_platform = DataPlatform(f"{environment}-data-platform")