The bigger our cloud environments get, the harder it becomes to manage all the security alerts and vulnerabilities of our workloads and services. For this purpose, we can use a security information and event management (SIEM) tool, which provides real-time analysis of security alerts. AWS offers the OpenSearch Service for that, a highly scalable system that provides fast access and response to large volumes of data such as logs, with integrated dashboards and analytics. OpenSearch is a community-driven, open-source fork of Elasticsearch and Kibana. It was introduced in 2021 after Elastic moved the previously Apache 2.0-licensed Elasticsearch source code to a dual license under the Elastic License and SSPL 1.0.
In this article, we build a process that ingests security alerts and log data into OpenSearch in near-real time so that we can react to compromises in our AWS environment.
Table of Contents
💡 Create Lambda function
🚚 Create Kinesis data & delivery stream
💾 CloudWatch subscription filter
💻 OpenSearch Dashboard
🧰 CloudFormation template
Prerequisites
OpenSearch service domain
You need an OpenSearch Service domain that is already up and running. For more information on the setup, follow the official AWS documentation: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gsg.html
For this article, I have created a minimal setup using the smallest instance type: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/supported-instance-types.html
CloudWatch log group
The process is based on logs and alerts aggregated in a CloudWatch log group in the security account. This is the same account that has the OpenSearch Service domain deployed. In an earlier article, I explained how to aggregate GuardDuty findings in a centralized log group. You are good to go if you deploy the CloudFormation template from that article.
💡 Create Lambda function
In the first step, we create a new Lambda function that extracts individual log events from the records sent by CloudWatch Logs subscription filters. The easiest way is to use a blueprint created by AWS. AWS offers a blueprint called Process CloudWatch logs sent to Kinesis Firehose, which is available for Python and Node.js. Choose the programming language you are most comfortable with, so you can adjust the function if needed.
Make sure that the function role has a trust relationship with the Lambda service and that the attached policy allows the creation of CloudWatch log groups, log streams, and log events:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "logs:CreateLogGroup",
      "Resource": "arn:aws:logs:<region>:<accountID>:*",
      "Effect": "Allow"
    },
    {
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:<region>:<accountID>:log-group:/aws/lambda/<functionName>:*:*",
      "Effect": "Allow"
    }
  ]
}
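The trust relationship itself is not shown in the policy above. As a minimal sketch, mirroring the AssumeRolePolicyDocument of the Lambda role in the CloudFormation template at the end of this article, both documents could be generated as follows. The helper names and the example region, account ID, and function name are assumptions for illustration only.

```python
import json


def lambda_trust_policy():
    """Trust policy that lets the Lambda service assume the function role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def lambda_logging_policy(region, account_id, function_name):
    """Permission policy from the article with the placeholders filled in."""
    base = f"arn:aws:logs:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": "logs:CreateLogGroup",
                "Resource": f"{base}:*",
                "Effect": "Allow",
            },
            {
                "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
                "Resource": f"{base}:log-group:/aws/lambda/{function_name}:*:*",
                "Effect": "Allow",
            },
        ],
    }


if __name__ == "__main__":
    # Example values are placeholders, not real identifiers.
    print(json.dumps(lambda_trust_policy(), indent=2))
    print(json.dumps(
        lambda_logging_policy("eu-central-1", "123456789012", "CloudWatchTransformFunction"),
        indent=2))
```

The printed JSON can be pasted into the IAM console or passed to whatever tooling you use to create the role.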
The timeout of the function should be greater than 60 seconds to avoid early termination. It can be changed after creation in the Configuration tab of the function.
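If you prefer to script this change, the following sketch shows one way to do it. It assumes boto3 is installed and that the caller passes in a Lambda client; the helper name is hypothetical, and the default of 64 seconds is simply the value used in the CloudFormation template at the end of this article.

```python
def set_function_timeout(lambda_client, function_name, timeout_seconds=64):
    """Raise the timeout of an existing Lambda function.

    lambda_client is expected to be a boto3 Lambda client (assumption).
    """
    if timeout_seconds <= 60:
        # The article recommends a timeout greater than 60 seconds.
        raise ValueError("timeout should be greater than 60 seconds")
    return lambda_client.update_function_configuration(
        FunctionName=function_name,
        Timeout=timeout_seconds,
    )
```

A call could look like set_function_timeout(boto3.client("lambda"), "CloudWatchTransformFunction"), where the function name is whatever you chose during creation.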
As the standard blueprint is not easy to deploy via CloudFormation, I have created an alternative lightweight version that you could use:
import base64
import gzip
import json
import os


def transformLogEvent(log_event):
    # Keep only the raw log message, one event per line
    return log_event['message'] + os.linesep


def processRecords(event):
    for r in event['records']:
        # Each record is base64-encoded, gzip-compressed JSON from CloudWatch Logs
        data = json.loads(gzip.decompress(base64.b64decode(r['data'])))
        if data['messageType'] == 'CONTROL_MESSAGE':
            # Control messages carry no log data, so they are dropped
            yield {'result': 'Dropped', 'recordId': r['recordId']}
        elif data['messageType'] == 'DATA_MESSAGE':
            joinedData = ''.join([transformLogEvent(e) for e in data['logEvents']])
            dataBytes = joinedData.encode('utf-8')
            encodedData = base64.b64encode(dataBytes).decode('utf-8')
            yield {'data': encodedData, 'result': 'Ok', 'recordId': r['recordId']}
        else:
            # Unknown message types are dropped as well
            yield {'result': 'Dropped', 'recordId': r['recordId']}


def lambda_handler(event, context):
    records = list(processRecords(event))
    print('%d records processed, %d returned as Ok' % (
        len(event['records']),
        len([r for r in records if r['result'] != 'Dropped'])))
    return {'records': records}
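Before wiring the function into Firehose, it can help to try it locally. The sketch below builds a sample event in the shape the function expects: a JSON document that is gzip-compressed and then base64-encoded. The helper names and all sample values (account ID, log group, messages) are made up for illustration.

```python
import base64
import gzip
import json


def make_firehose_record(record_id, messages, message_type="DATA_MESSAGE"):
    """Build one input record as CloudWatch Logs would deliver it."""
    payload = {
        "messageType": message_type,
        "owner": "123456789012",              # placeholder account ID
        "logGroup": "/aws/events/GuardDuty",  # default log group of the template
        "logStream": "sample-stream",         # placeholder
        "subscriptionFilters": ["sample-filter"],
        "logEvents": [
            {"id": str(i), "timestamp": 0, "message": m}
            for i, m in enumerate(messages)
        ],
    }
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {
        "recordId": record_id,
        "data": base64.b64encode(compressed).decode("utf-8"),
    }


def unpack_record(record):
    """Reverse the encoding, exactly as the transform function does on input."""
    return json.loads(gzip.decompress(base64.b64decode(record["data"])))


# A sample event with one data record and one control record
sample_event = {
    "records": [
        make_firehose_record("1", ['{"severity": 8}', '{"severity": 2}']),
        make_firehose_record("2", [], message_type="CONTROL_MESSAGE"),
    ]
}
```

Passing sample_event to lambda_handler(sample_event, None) should return one record with the result Ok and one with the result Dropped.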
🚚 Create Kinesis data & delivery stream
In the next step, we create a new provisioned Kinesis Data Stream so that we can consume logs from CloudWatch.
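For readers who script their setup, the stream could also be created from Python. This is a sketch under assumptions: it expects a boto3 Kinesis client from the caller, the helper name is hypothetical, the stream name is taken from the CloudFormation template in this article, and a single shard is my own guess for a minimal setup.

```python
def create_provisioned_stream(kinesis_client,
                              stream_name="OpenSearchDataStream",
                              shard_count=1):
    """Create a Kinesis Data Stream in provisioned capacity mode.

    kinesis_client is expected to be a boto3 Kinesis client (assumption).
    shard_count=1 is an assumed minimum; size it for your log volume.
    """
    return kinesis_client.create_stream(
        StreamName=stream_name,
        ShardCount=shard_count,
        StreamModeDetails={"StreamMode": "PROVISIONED"},
    )
```

With boto3 installed, the client would typically come from boto3.client("kinesis").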
Afterward, we need to connect the Kinesis Data Stream to our OpenSearch service via a Kinesis Data Firehose. We choose the Delivery Stream option in Kinesis and create a new one. Make sure to choose the newly created Data Stream as the source option.
As we want to transform the events before delivery, we choose the CloudWatch transformation Lambda function created earlier.
Lastly, we configure the OpenSearch service domain and create a new S3 bucket for all failed data.
We don't need to create a new IAM role, as the setup automatically creates one for us. We need to note down the ARN of this role, as we need it in the next step when setting up access to OpenSearch. We can find the role in the configuration of the delivery stream after its creation.
Open the IAM role and copy the ARN. It should be in the following format: arn:aws:iam::<accountID>:role/OpenSearchKinesisFirehoseServiceRole. Afterward, we log in to OpenSearch, navigate to Security -> Roles -> all_access -> Mapped users, and add the ARN there.
💾 CloudWatch subscription filter
Next, we connect the CloudWatch log group with our Kinesis data stream to deliver all log information from this group to OpenSearch. We choose the desired group and add a new Kinesis subscription filter.
To grant CloudWatch permission to put data into our Kinesis data stream, we create a new role with the below policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "kinesis:PutRecord",
      "Resource": "arn:aws:kinesis:<region>:<accountID>:stream/OpenSearchDataStream",
      "Effect": "Allow"
    }
  ]
}
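Always make sure there is a trust relationship between the role and the AWS service that should assume it, in this case CloudWatch Logs (the CloudFormation template at the end of this article uses the principal logs.<region>.amazonaws.com). Otherwise, the delivery will fail.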
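The subscription filter can also be created programmatically. The sketch below assumes a boto3 CloudWatch Logs client passed in by the caller; the helper names and the filter name are hypothetical. The trust policy follows the principal used in the CloudFormation template of this article, and the empty filter pattern forwards every log event, as in that template.

```python
def subscription_trust_policy(region):
    """Trust policy that lets CloudWatch Logs assume the subscription role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": f"logs.{region}.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def put_kinesis_subscription(logs_client, log_group, stream_arn, role_arn,
                             filter_name="OpenSearchSubscription"):
    """Attach a subscription filter that sends all events to the data stream.

    logs_client is expected to be a boto3 CloudWatch Logs client (assumption);
    filter_name is a made-up default.
    """
    return logs_client.put_subscription_filter(
        logGroupName=log_group,
        filterName=filter_name,
        filterPattern="",  # empty pattern matches every log event
        destinationArn=stream_arn,
        roleArn=role_arn,
    )
```

The role ARN passed here is the role that carries the kinesis:PutRecord policy shown above.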
💻 OpenSearch Dashboard
Let's create a few sample findings to test the connection:
root@alexanderhose:~$ aws guardduty create-sample-findings --detector-id <detectorID> --finding-types Backdoor:EC2/DenialOfService.Tcp
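The same test can be triggered from Python. This sketch assumes a boto3 GuardDuty client passed in by the caller and simply uses the first detector in the current region; the helper name is hypothetical.

```python
def create_sample_findings(guardduty_client,
                           finding_types=("Backdoor:EC2/DenialOfService.Tcp",)):
    """Generate GuardDuty sample findings for the first detector found.

    guardduty_client is expected to be a boto3 GuardDuty client (assumption).
    Returns the detector ID that was used.
    """
    detector_ids = guardduty_client.list_detectors()["DetectorIds"]
    if not detector_ids:
        raise RuntimeError("no GuardDuty detector found in this region")
    guardduty_client.create_sample_findings(
        DetectorId=detector_ids[0],
        FindingTypes=list(finding_types),
    )
    return detector_ids[0]
```

It takes a short while until the findings pass through the log group, the data stream, and the delivery stream.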
In the end, we should see the GuardDuty findings in the OpenSearch dashboard 🎉
Now we can configure dashboards and alerts in OpenSearch to enable centralized control of security incidents in our AWS environment.
🧰 CloudFormation template
If you want to automate the deployment of this setup, you can download the CloudFormation template and deploy it. After creation, please check the outputs of the CloudFormation stack: they contain the role ARN that needs to be added to OpenSearch.
{
"Parameters": {
"OpenSeachIndex": {
"Description": "Name of the OpenSearch index",
"Default": "guardduty",
"Type": "String",
"MinLength": "1",
"MaxLength": "255"
},
"OpenSeachDomainARN": {
"Description": "Name of the OpenSearch Domain arn",
"Default": "arn:aws:es:<region>:<accountID>:domain/<domainName>",
"Type": "String",
"MinLength": "1",
"MaxLength": "255"
},
"LogGroup": {
"Description": "Name of the CloudWatch group to stream to OpenSearch",
"Default": "/aws/events/GuardDuty",
"Type": "String",
"MinLength": "1",
"MaxLength": "255"
}
},
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"OpenSearchDataStream": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
"Name": "OpenSearchDataStream"
}
},
"OpenSearchDeliveryStreamLogGroup": {
"Type": "AWS::Logs::LogGroup",
"Properties": {
"LogGroupName": "/aws/kinesisfirehose/OpenSearchDeliveryStream"
}
},
"OpenSearchDeliveryStreamDestinationDeliveryLogGroup": {
"Type": "AWS::Logs::LogStream",
"Properties": {
"LogGroupName": "/aws/kinesisfirehose/OpenSearchDeliveryStream",
"LogStreamName": "DestinationDelivery"
},
"DependsOn": "OpenSearchDeliveryStreamLogGroup"
},
"OpenSearchDeliveryStreamBackupDeliveryLogGroup": {
"Type": "AWS::Logs::LogStream",
"Properties": {
"LogGroupName": "/aws/kinesisfirehose/OpenSearchDeliveryStream",
"LogStreamName": "BackupDelivery"
},
"DependsOn": "OpenSearchDeliveryStreamLogGroup"
},
"OpenSearchKinesisFirehoseServiceRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"RoleName": "OpenSearchKinesisFirehoseServiceRole",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
"Policies": [
{
"PolicyName": "OpenSearchKinesisFirehoseServicePolicy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
{
"Fn::GetAtt": [
"LogBackupS3Bucket",
"Arn"
]
},
{
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"LogBackupS3Bucket",
"Arn"
]
},
"/*"
]
]
}
]
},
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
"lambda:GetFunctionConfiguration"
],
"Resource": {
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"CloudWatchTransformFunction",
"Arn"
]
},
":$LATEST"
]
]
}
},
{
"Effect": "Allow",
"Action": [
"ec2:DescribeVpcs",
"ec2:DescribeVpcAttribute",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
"ec2:DescribeNetworkInterfaces",
"ec2:CreateNetworkInterface",
"ec2:CreateNetworkInterfacePermission",
"ec2:DeleteNetworkInterface"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"es:DescribeElasticsearchDomain",
"es:DescribeElasticsearchDomains",
"es:DescribeElasticsearchDomainConfig",
"es:ESHttpPost",
"es:ESHttpPut"
],
"Resource": [
{
"Ref": "OpenSeachDomainARN"
},
{
"Fn::Join": [
"",
[
{
"Ref": "OpenSeachDomainARN"
},
"/*"
]
]
}
]
},
{
"Effect": "Allow",
"Action": [
"es:ESHttpGet"
],
"Resource": [
{
"Fn::Join": [
"",
[
{
"Ref": "OpenSeachDomainARN"
},
"/_all/_settings"
]
]
},
{
"Fn::Join": [
"",
[
{
"Ref": "OpenSeachDomainARN"
},
"/_cluster/stats"
]
]
},
{
"Fn::Join": [
"",
[
{
"Ref": "OpenSeachDomainARN"
},
"/_nodes"
]
]
},
{
"Fn::Join": [
"",
[
{
"Ref": "OpenSeachDomainARN"
},
"/_nodes/*/stats"
]
]
},
{
"Fn::Join": [
"",
[
{
"Ref": "OpenSeachDomainARN"
},
"/_stats"
]
]
},
{
"Fn::Join": [
"",
[
{
"Ref": "OpenSeachDomainARN"
},
"/guardduty/_stats"
]
]
}
]
},
{
"Effect": "Allow",
"Action": [
"logs:PutLogEvents"
],
"Resource": {
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"OpenSearchDeliveryStreamLogGroup",
"Arn"
]
},
":*"
]
]
}
},
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:ListShards"
],
"Resource": {
"Fn::GetAtt": [
"OpenSearchDataStream",
"Arn"
]
}
}
]
}
}
]
}
},
"OpenSearchFirehoseDeliveryStream": {
"Type": "AWS::KinesisFirehose::DeliveryStream",
"Properties": {
"DeliveryStreamName": "OpenSearchDeliveryStream",
"DeliveryStreamType": "KinesisStreamAsSource",
"KinesisStreamSourceConfiguration": {
"KinesisStreamARN": {
"Fn::GetAtt": [
"OpenSearchDataStream",
"Arn"
]
},
"RoleARN": {
"Fn::GetAtt": [
"OpenSearchKinesisFirehoseServiceRole",
"Arn"
]
}
},
"ElasticsearchDestinationConfiguration": {
"IndexName": {
"Ref": "OpenSeachIndex"
},
"IndexRotationPeriod": "NoRotation",
"BufferingHints": {
"IntervalInSeconds": 60,
"SizeInMBs": 5
},
"RetryOptions": {
"DurationInSeconds": 300
},
"S3BackupMode": "AllDocuments",
"S3Configuration": {
"RoleARN": {
"Fn::GetAtt": [
"OpenSearchKinesisFirehoseServiceRole",
"Arn"
]
},
"BucketARN": {
"Fn::GetAtt": [
"LogBackupS3Bucket",
"Arn"
]
},
"Prefix": "firehose/",
"ErrorOutputPrefix": "",
"BufferingHints": {
"SizeInMBs": 5,
"IntervalInSeconds": 60
},
"CompressionFormat": "UNCOMPRESSED",
"EncryptionConfiguration": {
"NoEncryptionConfig": "NoEncryption"
},
"CloudWatchLoggingOptions": {
"Enabled": true,
"LogGroupName": {
"Fn::Select": [
"6",
{
"Fn::Split": [
":",
{
"Fn::GetAtt": [
"OpenSearchDeliveryStreamLogGroup",
"Arn"
]
}
]
}
]
},
"LogStreamName": "BackupDelivery"
}
},
"ProcessingConfiguration": {
"Enabled": true,
"Processors": [
{
"Type": "Lambda",
"Parameters": [
{
"ParameterName": "LambdaArn",
"ParameterValue": {
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"CloudWatchTransformFunction",
"Arn"
]
},
":$LATEST"
]
]
}
},
{
"ParameterName": "NumberOfRetries",
"ParameterValue": "3"
},
{
"ParameterName": "RoleArn",
"ParameterValue": {
"Fn::GetAtt": [
"OpenSearchKinesisFirehoseServiceRole",
"Arn"
]
}
},
{
"ParameterName": "BufferSizeInMBs",
"ParameterValue": "3"
},
{
"ParameterName": "BufferIntervalInSeconds",
"ParameterValue": "60"
}
]
}
]
},
"CloudWatchLoggingOptions": {
"Enabled": true,
"LogGroupName": {
"Fn::Select": [
"6",
{
"Fn::Split": [
":",
{
"Fn::GetAtt": [
"OpenSearchDeliveryStreamLogGroup",
"Arn"
]
}
]
}
]
},
"LogStreamName": "DestinationDelivery"
},
"DomainARN": {
"Ref": "OpenSeachDomainARN"
},
"RoleARN": {
"Fn::GetAtt": [
"OpenSearchKinesisFirehoseServiceRole",
"Arn"
]
}
}
},
"DependsOn": [
"OpenSearchDataStream",
"OpenSearchDeliveryStreamLogGroup",
"CloudWatchTransformFunction",
"LogBackupS3Bucket",
"OpenSearchKinesisFirehoseServiceRole"
]
},
"CloudWatchTransformFunctionLogGroup": {
"Type": "AWS::Logs::LogGroup",
"Properties": {
"LogGroupName": "/aws/lambda/CloudWatchTransformFunction"
}
},
"CloudWatchTransformFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"RoleName": "CloudWatchTransformFunctionRole",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
},
"Action": [
"sts:AssumeRole"
]
}
]
},
"Policies": [
{
"PolicyName": "AWSLambdaBasicExecutionRole",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": {
"Fn::Sub": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
}
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": {
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"CloudWatchTransformFunctionLogGroup",
"Arn"
]
},
":*"
]
]
}
}
]
}
}
]
},
"DependsOn": "CloudWatchTransformFunctionLogGroup"
},
"CloudWatchTransformFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"FunctionName": "CloudWatchTransformFunction",
"Handler": "index.lambda_handler",
"MemorySize": 128,
"Description": "Lambda function to transform CloudWatch logs",
"Architectures": [
"x86_64"
],
"Role": {
"Fn::GetAtt": [
"CloudWatchTransformFunctionRole",
"Arn"
]
},
"Code": {
"ZipFile": {
"Fn::Join": [
"",
[
"import base64\n",
"import gzip\n",
"import json\n",
"import os\n",
"def transformLogEvent(log_event):\n",
" return log_event['message'] + os.linesep\n",
"def processRecords(event):\n",
" for r in event['records']:\n",
" data = json.loads(gzip.decompress(base64.b64decode(r['data'])))\n",
" if data['messageType'] == 'CONTROL_MESSAGE':\n",
" yield {'result': 'Dropped', 'recordId': r['recordId']}\n",
" elif data['messageType'] == 'DATA_MESSAGE':\n",
" joinedData = ''.join([transformLogEvent(e) for e in data['logEvents']])\n",
" dataBytes = joinedData.encode('utf-8')\n",
" encodedData = base64.b64encode(dataBytes).decode('utf-8')\n",
" yield {'data': encodedData, 'result': 'Ok', 'recordId': r['recordId']}\n",
" else:\n",
" yield {'result': 'Dropped', 'recordId': r['recordId']}\n",
"def lambda_handler(event, context):\n",
" records = list(processRecords(event))\n",
" print('%d input records, %d returned as Ok or ProcessingFailed' % (\n",
" len(event['records']),\n",
" len([r for r in records if r['result'] != 'Dropped'])))\n",
" return {'records': records}\n",
]
]
}
},
"Runtime": "python3.8",
"Timeout": 64,
"TracingConfig": {
"Mode": "PassThrough"
}
},
"DependsOn": "CloudWatchTransformFunctionRole"
},
"LogBackupS3Bucket": {
"Type": "AWS::S3::Bucket",
"Description": "Kinesis Firehose log backup",
"Properties": {
"BucketName": {
"Fn::Sub": "log-backup-s3-${AWS::AccountId}"
}
}
},
"CloudWatchSubscriptionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"RoleName": "CloudWatchSubscriptionRole",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
{
"Fn::Sub": "logs.${AWS::Region}.amazonaws.com"
}
]
},
"Action": [
"sts:AssumeRole"
]
}
]
},
"Policies": [
{
"PolicyName": "CloudWatchSubscriptionPolicy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kinesis:PutRecord",
"Resource": {
"Fn::GetAtt": [
"OpenSearchDataStream",
"Arn"
]
}
}
]
}
}
]
}
},
"SubscriptionFilter": {
"Type": "AWS::Logs::SubscriptionFilter",
"Properties": {
"RoleArn": {
"Fn::GetAtt": [
"CloudWatchSubscriptionRole",
"Arn"
]
},
"LogGroupName": {
"Ref": "LogGroup"
},
"FilterPattern" : "",
"DestinationArn": {
"Fn::GetAtt": [
"OpenSearchDataStream",
"Arn"
]
}
}
}
},
"Outputs": {
"OpenSearchKinesisFirehoseServiceRole": {
"Description": "Role Created using this template. Use the following ARN in the OpenSearch all_access role",
"Value": {
"Fn::GetAtt": [
"OpenSearchKinesisFirehoseServiceRole",
"Arn"
]
}
}
}
}
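As a sketch of how the deployment could be scripted, the helper below creates the stack and returns its outputs. It assumes a boto3 CloudFormation client passed in by the caller; the helper name and the stack name are hypothetical. The parameter keys are spelled exactly as in the template (OpenSeachIndex, OpenSeachDomainARN), and CAPABILITY_NAMED_IAM is needed because the template creates IAM roles with fixed names.

```python
def deploy_stack(cfn_client, template_body, domain_arn,
                 stack_name="opensearch-siem",
                 index="guardduty",
                 log_group="/aws/events/GuardDuty"):
    """Create the stack, wait for completion, and return its outputs as a dict.

    cfn_client is expected to be a boto3 CloudFormation client (assumption);
    stack_name is a made-up default.
    """
    cfn_client.create_stack(
        StackName=stack_name,
        TemplateBody=template_body,
        Parameters=[
            # Keys must match the template, including its spelling
            {"ParameterKey": "OpenSeachIndex", "ParameterValue": index},
            {"ParameterKey": "OpenSeachDomainARN", "ParameterValue": domain_arn},
            {"ParameterKey": "LogGroup", "ParameterValue": log_group},
        ],
        Capabilities=["CAPABILITY_NAMED_IAM"],
    )
    # Block until CloudFormation reports that the stack was created
    cfn_client.get_waiter("stack_create_complete").wait(StackName=stack_name)
    stack = cfn_client.describe_stacks(StackName=stack_name)["Stacks"][0]
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}
```

The returned dictionary contains the key OpenSearchKinesisFirehoseServiceRole, whose value is the ARN to map to the all_access role in OpenSearch.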