AWSTemplateFormatVersion: "2010-09-09"
Description: "This is the template for creating a solution to analyze Amazon Connect ctr logs using Amazon Athena as described in the blog"

# Stack inputs.
Parameters:
  # Base name for the stack's resources. It is used verbatim as the S3 bucket
  # name and the Glue database name, and as a prefix for the Kinesis, Firehose,
  # IAM, Lambda and EventBridge resource names.
  ProjectName:
    Type: "String"
    Default: "test123"
    Description: Enter an all-lowercase single-word name which is between 3 and 37 chars long. An S3 bucket will be created with this name to store all collected data.
    MinLength: 3
    # The longest derived name is the IAM role
    # "${ProjectName}-Lambda-IngestingOldCtrData" (27-character suffix).
    # IAM role names are limited to 64 characters, so the project name
    # must not exceed 64 - 27 = 37 characters.
    MaxLength: 37
    AllowedPattern: "^[a-z0-9]*$"

  # Selects whether the stack creates the bucket and Glue database or expects
  # ones named after ProjectName to exist already.
  CreateANewGlueDatabaseAndS3BucketForTheProject:
    Type: "String"
    Default: "Create a new AWS Glue database and Amazon S3 data collection bucket for this project"
    AllowedValues:
      - "Create a new AWS Glue database and Amazon S3 data collection bucket for this project"
      - "Use existing AWS Glue database and Amazon S3 data collection bucket created for this project"
    Description: "Choose whether to create a AWS Glue database and an Amazon S3 bucket for this project. If there is an existing bucket named '{ProjectName}' and a database named '{ProjectName}', then choose to reuse them."
# True when the user chose to have this stack create the Glue database and the
# S3 data collection bucket (as opposed to reusing existing ones).
Conditions:
  CreateCommonGlueDatabaseAndDestinationBucket: !Equals
    - !Ref CreateANewGlueDatabaseAndS3BucketForTheProject
    - Create a new AWS Glue database and Amazon S3 data collection bucket for this project

Resources:
  # ---------------------------------------------------------------------------
  # Storage: S3 bucket and Glue database (only created when requested)
  # ---------------------------------------------------------------------------

  # Data collection bucket, named exactly after the project. Retained on stack
  # deletion so collected data is not lost.
  DataDestinationBucket:
    Type: "AWS::S3::Bucket"
    Condition: CreateCommonGlueDatabaseAndDestinationBucket
    DeletionPolicy: Retain
    Properties:
      BucketName: !Sub "${ProjectName}"
      # EventsRuleForIngestingOldCtrData matches "Object Created" events with
      # source aws.s3; the bucket only emits those to EventBridge when this is
      # enabled. When reusing an existing bucket, this must be enabled on it
      # manually.
      NotificationConfiguration:
        EventBridgeConfiguration:
          EventBridgeEnabled: true

  # Glue database named after the project; created after the bucket.
  GlueDatabase:
    Type: "AWS::Glue::Database"
    Condition: CreateCommonGlueDatabaseAndDestinationBucket
    DependsOn: DataDestinationBucket
    DeletionPolicy: Retain
    Properties:
      DatabaseInput:
        Name: !Sub "${ProjectName}"
      CatalogId: !Ref "AWS::AccountId"

  # ---------------------------------------------------------------------------
  # Ordering helper for the conditional resources above.
  #
  # DbAndBucketWaitCondition picks one of two handles through !If. When the
  # bucket/database are created by this stack, the chosen handle DependsOn
  # GlueDatabase (which DependsOn the bucket), so every resource that
  # DependsOn DbAndBucketWaitCondition is ordered after both. When they are
  # not created, the unconditional WaitHandle is used instead. Count is 0, so
  # no signals are awaited.
  # ---------------------------------------------------------------------------
  DbAndBucketCreationWaitHandle:
    Condition: CreateCommonGlueDatabaseAndDestinationBucket
    DependsOn: GlueDatabase
    Type: "AWS::CloudFormation::WaitConditionHandle"

  WaitHandle:
    Type: "AWS::CloudFormation::WaitConditionHandle"

  DbAndBucketWaitCondition:
    Type: "AWS::CloudFormation::WaitCondition"
    Properties:
      Handle: !If
        - CreateCommonGlueDatabaseAndDestinationBucket
        - !Ref DbAndBucketCreationWaitHandle
        - !Ref WaitHandle
      Timeout: "1"
      Count: 0

  # ---------------------------------------------------------------------------
  # Streaming ingestion: Kinesis data stream -> Firehose -> S3 (Parquet)
  # ---------------------------------------------------------------------------

  # On-demand Kinesis data stream named "${ProjectName}-Ctr".
  KinesisStream:
    Type: "AWS::Kinesis::Stream"
    DependsOn: "DbAndBucketWaitCondition"
    Properties:
      Name: !Sub "${ProjectName}-Ctr"
      StreamModeDetails:
        StreamMode: "ON_DEMAND"

  # Role assumed by Firehose; used by both delivery streams below for reading
  # the source stream, reading the Glue schema and writing to S3.
  KinesisFirehoseServiceRoleForCtr:
    Type: "AWS::IAM::Role"
    Properties:
      Path: "/service-role/"
      RoleName: !Sub "${ProjectName}-KFH-Ctr"
      AssumeRolePolicyDocument: !Sub |
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {"Service": "firehose.amazonaws.com"},
              "Action": "sts:AssumeRole"
            }
          ]
        }
      MaxSessionDuration: 3600
      ManagedPolicyArns:
        - !Ref KinesisFirehoseServicePolicyForCtr

  # Permissions for the Firehose role: Glue table metadata, the project
  # bucket, the Firehose CloudWatch log group and the source Kinesis stream.
  # The log-group ARN matches the LogGroupName configured on both delivery
  # streams ("/aws/kinesisfirehose/${ProjectName}").
  KinesisFirehoseServicePolicyForCtr:
    Type: "AWS::IAM::ManagedPolicy"
    DependsOn: GlueTableForCtr
    Properties:
      ManagedPolicyName: !Sub "${ProjectName}-KFH-Ctr"
      Path: "/service-role/"
      PolicyDocument: !Sub |
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Sid": "",
              "Effect": "Allow",
              "Action": [
                "glue:GetTable",
                "glue:GetTableVersion",
                "glue:GetTableVersions"
              ],
              "Resource": [
                "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ProjectName}",
                "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ProjectName}/connect_ctr"
              ]
            },
            {
              "Sid": "",
              "Effect": "Allow",
              "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
              ],
              "Resource": [
                "arn:aws:s3:::${ProjectName}",
                "arn:aws:s3:::${ProjectName}/*"
              ]
            },
            {
              "Sid": "",
              "Effect": "Allow",
              "Action": [
                "logs:PutLogEvents"
              ],
              "Resource": [
                "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/${ProjectName}:log-stream:*"
              ]
            },
            {
              "Sid": "",
              "Effect": "Allow",
              "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards"
              ],
              "Resource": "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${ProjectName}-Ctr"
            }
          ]
        }

  # Delivery stream fed by KinesisStream. Incoming JSON is converted to Parquet
  # using the connect_ctr Glue table schema and written under ctr-base/ with
  # Hive-style year/month/day partitions.
  KinesisFirehoseDeliveryStreamForCtrFromKinesisStream:
    Type: "AWS::KinesisFirehose::DeliveryStream"
    DependsOn:
      - GlueTableForCtr
    Properties:
      DeliveryStreamName: !Sub "${ProjectName}-FromDataStream-Ctr"
      DeliveryStreamType: "KinesisStreamAsSource"
      KinesisStreamSourceConfiguration:
        # Referencing the stream resource (instead of a hand-built ARN string)
        # makes CloudFormation create the Kinesis stream before this resource.
        KinesisStreamARN: !GetAtt KinesisStream.Arn
        RoleARN: !GetAtt KinesisFirehoseServiceRoleForCtr.Arn
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub "arn:aws:s3:::${ProjectName}"
        BufferingHints:
          SizeInMBs: 128
          IntervalInSeconds: 60
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub "/aws/kinesisfirehose/${ProjectName}"
          LogStreamName: "S3Delivery"
        CompressionFormat: "UNCOMPRESSED"
        DataFormatConversionConfiguration:
          SchemaConfiguration:
            RoleARN: !GetAtt KinesisFirehoseServiceRoleForCtr.Arn
            DatabaseName: !Sub "${ProjectName}"
            TableName: "connect_ctr"
            Region: !Ref AWS::Region
            VersionId: "LATEST"
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          Enabled: true
        EncryptionConfiguration:
          NoEncryptionConfig: "NoEncryption"
        # "yyyy" is the calendar year; "YYYY" would be the week-based year,
        # which differs from the calendar year on days around New Year.
        Prefix: "ctr-base/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
        ErrorOutputPrefix: "ctr-erroroutputbase/!{timestamp:yyyy/MM/dd}/!{firehose:random-string}/!{firehose:error-output-type}/"
        RoleARN: !GetAtt KinesisFirehoseServiceRoleForCtr.Arn
        ProcessingConfiguration:
          Enabled: false
        S3BackupMode: "Disabled"

  # Direct-put delivery stream with the same S3/Parquet destination settings.
  # The Lambda function below writes previously exported CTR files into it.
  KinesisFirehoseDeliveryStreamForCtrDirectPut:
    Type: "AWS::KinesisFirehose::DeliveryStream"
    DependsOn:
      - GlueTableForCtr
    Properties:
      DeliveryStreamName: !Sub "${ProjectName}-Ctr"
      DeliveryStreamType: "DirectPut"
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub "arn:aws:s3:::${ProjectName}"
        BufferingHints:
          SizeInMBs: 128
          IntervalInSeconds: 60
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub "/aws/kinesisfirehose/${ProjectName}"
          LogStreamName: "S3Delivery"
        CompressionFormat: "UNCOMPRESSED"
        DataFormatConversionConfiguration:
          SchemaConfiguration:
            RoleARN: !GetAtt KinesisFirehoseServiceRoleForCtr.Arn
            DatabaseName: !Sub "${ProjectName}"
            TableName: "connect_ctr"
            Region: !Ref AWS::Region
            VersionId: "LATEST"
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          Enabled: true
        EncryptionConfiguration:
          NoEncryptionConfig: "NoEncryption"
        # Calendar year ("yyyy"), see the note on the stream above.
        Prefix: "ctr-base/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
        ErrorOutputPrefix: "ctr-erroroutputbase/!{timestamp:yyyy/MM/dd}/!{firehose:random-string}/!{firehose:error-output-type}/"
        RoleARN: !GetAtt KinesisFirehoseServiceRoleForCtr.Arn
        ProcessingConfiguration:
          Enabled: false
        S3BackupMode: "Disabled"

  # ---------------------------------------------------------------------------
  # Glue table describing the Parquet CTR data under s3://${ProjectName}/ctr-base
  # ---------------------------------------------------------------------------
  GlueTableForCtr:
    Type: "AWS::Glue::Table"
    DependsOn: "DbAndBucketWaitCondition"
    Properties:
      DatabaseName: !Sub "${ProjectName}"
      CatalogId: !Ref "AWS::AccountId"
      TableInput:
        Owner: "hadoop"
        TableType: "EXTERNAL_TABLE"
        # Partition projection over the year/month/day partition keys.
        # NOTE(review): the projected year range is hard-coded to 2022-2024;
        # presumably data written for later years will not be visible to
        # queries until this range is extended - confirm and widen as needed.
        Parameters:
          EXTERNAL: "TRUE"
          "parquet.compression": "SNAPPY"
          "projection.year.type": "integer"
          "projection.year.range": "2022,2024"
          "projection.month.type": "integer"
          "projection.month.range": "01,12"
          "projection.month.digits": "2"
          "projection.day.type": "integer"
          "projection.day.range": "01,31"
          "projection.day.digits": "2"
          "projection.enabled": "true"
          transient_lastDdlTime: "1628108573"
        StorageDescriptor:
          # NOTE(review): several Type values below ("struct,username:string>",
          # "struct", "array>") are not well-formed complex type strings; it
          # looks like the nested "<...>" field lists were lost at some point.
          # They are kept exactly as found - restore them from the
          # authoritative schema before deploying.
          Columns:
            - Name: "awsaccountid"
              Type: "string"
            - Name: "awscontacttracerecordformatversion"
              Type: "string"
            - Name: "agent"
              Type: "struct,username:string>"
            - Name: "agentconnectionattempts"
              Type: "int"
            - Name: "attributes"
              Type: "string"
            - Name: "campaign"
              Type: "struct"
            - Name: "channel"
              Type: "string"
            - Name: "connectedtosystemtimestamp"
              Type: "string"
            - Name: "contactid"
              Type: "string"
            - Name: "customerendpoint"
              Type: "struct"
            - Name: "disconnectreason"
              Type: "string"
            - Name: "disconnecttimestamp"
              Type: "string"
            - Name: "initialcontactid"
              Type: "string"
            - Name: "initiationmethod"
              Type: "string"
            - Name: "initiationtimestamp"
              Type: "string"
            - Name: "instancearn"
              Type: "string"
            - Name: "lastupdatetimestamp"
              Type: "string"
            - Name: "mediastreams"
              Type: "array>"
            - Name: "nextcontactid"
              Type: "string"
            - Name: "previouscontactid"
              Type: "string"
            - Name: "queue"
              Type: "struct"
            - Name: "recording"
              Type: "struct"
            - Name: "recordings"
              Type: "array>"
            - Name: "scheduledtimestamp"
              Type: "string"
            - Name: "systemendpoint"
              Type: "struct"
            - Name: "transfercompletedtimestamp"
              Type: "string"
            - Name: "transferredtoendpoint"
              Type: "struct"
          Location: !Sub "s3://${ProjectName}/ctr-base"
          InputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
          OutputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
          Compressed: false
          NumberOfBuckets: -1
          SerdeInfo:
            SerializationLibrary: "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            Parameters:
              "serialization.format": "1"
          Parameters: {}
          SkewedInfo:
            SkewedColumnValueLocationMaps: {}
          StoredAsSubDirectories: false
        PartitionKeys:
          - Name: "year"
            Type: "int"
          - Name: "month"
            Type: "int"
          - Name: "day"
            Type: "int"
        Retention: 0
        Name: "connect_ctr"

  # ---------------------------------------------------------------------------
  # Back-fill path: files dropped under Old-Ctr-Data/ in the project bucket are
  # pushed into the direct-put delivery stream by a Lambda function.
  # ---------------------------------------------------------------------------

  # Permissions for the back-fill Lambda: CloudWatch Logs, read access to
  # Old-Ctr-Data/ objects, and put access to the direct-put delivery stream.
  LambdaForIngestingOldCtrDataBasicPolicy:
    Type: "AWS::IAM::ManagedPolicy"
    DependsOn: DbAndBucketWaitCondition
    Properties:
      ManagedPolicyName: !Sub "${ProjectName}-Lambda-IngestingOldCtrData"
      Path: "/"
      PolicyDocument: !Sub |
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents"
              ],
              "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*",
              "Effect": "Allow",
              "Sid": "WriteCloudWatchLogs"
            },
            {
              "Sid": "AccessS3",
              "Effect": "Allow",
              "Action": "s3:GetObject",
              "Resource": "arn:aws:s3:::${ProjectName}/Old-Ctr-Data/*"
            },
            {
              "Sid": "AccessFirehose",
              "Effect": "Allow",
              "Action": [
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
              ],
              "Resource": [
                "${KinesisFirehoseDeliveryStreamForCtrDirectPut.Arn}"
              ]
            }
          ]
        }

  # Execution role for the back-fill Lambda.
  LambdaRoleForIngestingOldCtrData:
    Type: "AWS::IAM::Role"
    Properties:
      Path: "/service-role/"
      RoleName: !Sub "${ProjectName}-Lambda-IngestingOldCtrData"
      AssumeRolePolicyDocument: !Sub |
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "lambda.amazonaws.com"
              },
              "Action": "sts:AssumeRole"
            }
          ]
        }
      MaxSessionDuration: 3600
      ManagedPolicyArns:
        - !Ref LambdaForIngestingOldCtrDataBasicPolicy

  # Reads each object named in the incoming event from S3 and forwards its
  # whole content as one record to the direct-put Firehose delivery stream.
  # NOTE(review): confirm the configured runtime is still supported by Lambda.
  LambdaFunctionForIngestingOldCtrData:
    Type: "AWS::Lambda::Function"
    Properties:
      Description: "Ingests the old ctr data with the event coming from event bridge"
      Environment:
        Variables:
          DestinationKinesisFirehoseName: !Sub "${ProjectName}-Ctr"
      FunctionName: !Sub "${ProjectName}-IngestOldCtrData"
      Handler: "index.lambda_handler"
      Code:
        ZipFile: |
          import base64
          import json
          import boto3
          import urllib.parse
          import os

          print('Loading function')

          s3 = boto3.resource('s3')
          DESTINATION_KFH_NAME = os.environ['DestinationKinesisFirehoseName']
          firehose = boto3.client('firehose')

          class FileToBeIgnored(Exception):
              """Raised when the input is to be ignored - e.g. we ignore files other than json
              """
              def __init__(self, key, message=" will not be processed as is not json or it is in an ignored directory"):
                  self.key = key
                  self.message = message
                  super().__init__(self.message)

              def __str__(self):
                  return f'{self.key} -> {self.message}'

          class FileProcessingError(Exception):
              """Raised when we try to process the file
              """
              def __init__(self, key, message="Error during file processing"):
                  self.key = key
                  self.message = message
                  super().__init__(self.message)

              def __str__(self):
                  return f'{self.key} -> {self.message}'

          def processTheDataRetrievedFromFile(dataToProcess):
              # Forward the file content as a single Firehose record.
              firehose.put_record(DeliveryStreamName=DESTINATION_KFH_NAME, Record={'Data': dataToProcess})
              return

          def readRecordFromS3AndProcess(s3Event):
              ## Get the location of the log file from S3 event
              # NOTE(review): unquote_plus assumes the key arrives URL-encoded;
              # confirm that holds for events delivered through EventBridge.
              key = urllib.parse.unquote_plus(s3Event['s3']['object']['key'], encoding='utf-8')
              if (key[-4:] != 'json'):
                  raise FileToBeIgnored (key)
              bucket = s3.Bucket(s3Event['s3']['bucket']['name'])
              ## Get the log content from the file in S3.
              encodedFileContent = bucket.Object(key).get()['Body'].read()
              try:
                  fileContent = encodedFileContent.decode()
              except Exception as e:
                  print (e)
                  raise FileProcessingError(key)
              return processTheDataRetrievedFromFile(fileContent)

          def processKFHRecord(eachS3Record):
              # Returns 'Processed' on success; any failure is logged and
              # reported as 'Dropped' so one bad file does not stop the batch.
              try:
                  readRecordFromS3AndProcess(eachS3Record)
                  return {'result': 'Processed'}
              except FileToBeIgnored as eFileToBeIgnored:
                  print (eFileToBeIgnored)
              except FileProcessingError as eFileProcessingError:
                  print (eFileProcessingError)
              except Exception as e:
                  print ('Unknown Exception')
                  print (e)
              return {'result': 'Dropped'}

          def lambda_handler(event, context):
              print (json.dumps(event, indent=2))
              for eachKFHRecord in event['Records'] :
                  print (processKFHRecord(eachKFHRecord))
      MemorySize: 128
      Role: !GetAtt LambdaRoleForIngestingOldCtrData.Arn
      Runtime: "python3.9"
      Timeout: 180
      TracingConfig:
        Mode: "PassThrough"

  # Allows the EventBridge rule below to invoke the back-fill Lambda.
  LambdaPermissionForIngestingOldCtrData:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: "lambda:InvokeFunction"
      FunctionName: !GetAtt LambdaFunctionForIngestingOldCtrData.Arn
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt EventsRuleForIngestingOldCtrData.Arn

  # Matches S3 "Object Created" events for keys under Old-Ctr-Data/ in the
  # project bucket and reshapes them into the {"Records":[{"s3":{...}}]}
  # structure the Lambda handler reads.
  EventsRuleForIngestingOldCtrData:
    Type: "AWS::Events::Rule"
    Properties:
      Name: !Sub "${ProjectName}-OldCtrData"
      EventPattern: !Sub |
        {
          "source": ["aws.s3"],
          "detail-type": ["Object Created"],
          "detail": {
            "bucket": {
              "name": ["${ProjectName}"]
            },
            "object": {
              "key": [{ "prefix": "Old-Ctr-Data/" }]
            }
          }
        }
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt LambdaFunctionForIngestingOldCtrData.Arn
          Id: "Id1a015866-3b34-40f2-97b7-685170e29dda"
          InputTransformer:
            InputPathsMap:
              bucketname: "$.detail.bucket.name"
              key: "$.detail.object.key"
            # <key> and <bucketname> are the InputPathsMap variables above.
            # Without these placeholders the Lambda receives empty strings
            # and drops every file.
            InputTemplate: |
              {
                "Records":[
                  {
                    "s3": {
                      "object":{
                        "key":"<key>"
                      },
                      "bucket":{
                        "name":"<bucketname>"
                      }
                    }
                  }]
              }
      EventBusName: "default"