AWS S3 is a great storage platform for hosting data. However, it's also a target for malicious intruders. 🎯
Most applications allow the user to upload their own data on S3 buckets. For example, to store images 📷, videos 📹, or any other custom data.
How can you safeguard your AWS S3 bucket from attackers uploading malicious files?
One way to safeguard your AWS S3 bucket files is to use VirusTotal to scan them for malware. 🔍 This free online service allows you to quickly scan files for viruses and other malicious content.
In this article, we will create Lambda functions that perform all the logic to scan the files and then combine them in a Step Function.
Table of contents
Create S3 bucket 🚧
Generate VirusTotal API key ✨
Add VirusTotal API key to AWS Secrets Manager 🔑
Create IAM role 🛡️
Create lambda functions 💡
Create Step Functions 🔄
Create EventBridge rule ⏰
Conclusion 🎉
Create S3 bucket 🚧
In the first step, we create a new bucket or choose an existing one. The bucket configuration doesn't matter, but we need to ensure that the Amazon EventBridge notifications are enabled. We later use this to trigger our Step Function. You can enable this in the properties of your bucket underneath Event notifications.
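If you prefer to script this step, the following is a minimal sketch using the `put_bucket_notification_configuration` call of the boto3 S3 client. The helper name `enable_eventbridge_notifications` is made up for this example, and the bucket name `virustotal` is the one used in the event pattern later in this article.

```python
def enable_eventbridge_notifications(s3_client, bucket_name):
    """Turn on EventBridge delivery for all events of the given bucket.

    Caution: this call replaces the bucket's whole notification
    configuration, so merge in existing SNS/SQS/Lambda settings first.
    """
    config = {"EventBridgeConfiguration": {}}
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration=config,
    )
    return config


# Usage (needs AWS credentials and boto3):
#   import boto3
#   enable_eventbridge_notifications(boto3.client("s3"), "virustotal")
```

The client is passed in as a parameter so the helper can be tried out with a stub before it touches a real bucket.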
Generate VirusTotal API key ✨
Next, we go to https://www.virustotal.com/ and create an account. Please note that the free account is a limited version and that you are not allowed to use it for business workflows, commercial products, or services.
After the successful registration, we can navigate to our profile (https://www.virustotal.com/gui/user/<username>/apikey) and obtain the API key.
Add VirusTotal API key to AWS Secrets Manager 🔑
We take the API key from the previous step and create a new AWS Secrets Manager secret. For later reference, we need to note down the secret name. The key itself should be named api_key and the secret virustotal_api_key.
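The same secret can also be created from code. This is a rough sketch based on the `create_secret` call of the boto3 Secrets Manager client; the helper name `store_virustotal_key` is hypothetical, while the secret name and key name are the ones chosen above.

```python
import json


def store_virustotal_key(secrets_client, api_key,
                         secret_name="virustotal_api_key",
                         key_name="api_key"):
    """Store the API key as a JSON key/value pair inside one secret."""
    # The Lambda functions expect SecretString to be a JSON object
    # that contains the key under the name "api_key".
    secret_string = json.dumps({key_name: api_key})
    secrets_client.create_secret(Name=secret_name, SecretString=secret_string)
    return secret_string


# Usage (needs AWS credentials and boto3):
#   import boto3
#   store_virustotal_key(boto3.client("secretsmanager"), "<your API key>")
```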
Create IAM role 🛡️
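We need to create an IAM role that can read the secret from AWS Secrets Manager (including access to the encryption key that protects it) and access the S3 bucket. For the S3 access, I have attached the AmazonS3FullAccess managed policy. The secret access is covered by the following policy: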
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "secretsmanager:GetSecretValue",
                "ssm:GetParameter"
            ],
            "Resource": "*"
        }
    ]
}
As we attach the role to Lambda, we need to add a trust relationship:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
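The policy above and AmazonS3FullAccess are broader than the two functions strictly need. As a sketch of a tighter alternative (not the setup used in this article), the helper below builds a policy limited to one secret and one bucket. The function name `build_scan_policy` and the example ARNs are assumptions; `kms:Decrypt` is only added when you pass a customer-managed key.

```python
import json


def build_scan_policy(bucket_name, secret_arn, kms_key_arn=None):
    """Build a least-privilege policy for the two scan functions."""
    statements = [
        {
            # Read the VirusTotal API key
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": secret_arn,
        },
        {
            # Download uploaded objects and tag them with the scan result
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObjectTagging"],
            "Resource": f"arn:aws:s3:::{bucket_name}/*",
        },
    ]
    if kms_key_arn:
        # Only needed when the secret uses a customer-managed KMS key
        statements.append(
            {"Effect": "Allow", "Action": "kms:Decrypt", "Resource": kms_key_arn}
        )
    return {"Version": "2012-10-17", "Statement": statements}


# Example with a made-up account ID and secret suffix
example = build_scan_policy(
    "virustotal",
    "arn:aws:secretsmanager:eu-central-1:123456789012:secret:virustotal_api_key-AbCdEf",
)
print(json.dumps(example, indent=4))
```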
Create lambda functions 💡
We need to create two Lambda functions. The first one will upload the file to VirusTotal and the second one will get the results of the scan. Please make sure that both functions have the AWS-Parameters-and-Secrets-Lambda-Extension layer added. You can do that at the bottom of the Code section in your Lambda function.
VirustotalScan
import json
import os
import urllib3
import time
import boto3

s3Client = boto3.client('s3')
url = "https://www.virustotal.com/api/v3/files"

def lambda_handler(event, context):
    # Read the API key through the Parameters and Secrets Lambda Extension
    secret_id = "virustotal_api_key"
    secret_key = "api_key"
    auth_headers = {"X-Aws-Parameters-Secrets-Token": os.environ.get('AWS_SESSION_TOKEN')}
    http = urllib3.PoolManager()
    r = http.request("GET", "http://localhost:2773/secretsmanager/get?secretId=" + secret_id, headers=auth_headers)
    parameter = json.loads(r.data)
    VIRUSTOTAL_API_KEY = json.loads(parameter["SecretString"])[secret_key]

    headers = {
        "accept": "application/json",
        "x-apikey": VIRUSTOTAL_API_KEY,
    }

    # Locate and download the uploaded object
    bucketName = event['detail']['bucket']['name']
    objectName = event['detail']['object']['key']
    fileName = event['detail']['object']['key'].split('/')[-1]
    s3file = s3Client.get_object(Bucket=bucketName, Key=objectName)
    s3ContentType = s3file['ContentType']
    s3file = s3file['Body'].read()

    # Upload the file to VirusTotal as multipart form data
    files = {"file": (fileName, s3file, s3ContentType)}
    response = http.request('POST', url, headers=headers, fields = files)
    analysisURL = json.loads(response.data)["data"]["links"]["self"]

    # Hand the analysis URL and the object location to the next function
    return {
        'analysisURL': analysisURL,
        'bucket': bucketName,
        'key': objectName
    }
This function first gets the API key from AWS Secrets Manager. For a full explanation, check out my previous post.
To call the VirusTotal API, we always need to add the API key to the header of the HTTP call:
headers = {
    "accept": "application/json",
    "x-apikey": VIRUSTOTAL_API_KEY,
}
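Both Lambda functions repeat the same secret lookup before building these headers. One way to avoid the duplication is a pair of small helpers like the sketch below. The names `build_extension_request` and `parse_secret` are made up; the port 2773, the `X-Aws-Parameters-Secrets-Token` header, and the response layout are taken from the handler code in this article.

```python
import json
import os
from urllib.parse import quote

EXTENSION_PORT = 2773  # port used by the handlers in this article


def build_extension_request(secret_id):
    """Return the URL and headers for the local secrets extension."""
    url = (
        f"http://localhost:{EXTENSION_PORT}/secretsmanager/get"
        f"?secretId={quote(secret_id, safe='')}"
    )
    headers = {
        "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN", "")
    }
    return url, headers


def parse_secret(raw_body, key):
    """Extract one value from the JSON stored in SecretString."""
    outer = json.loads(raw_body)
    return json.loads(outer["SecretString"])[key]


# Inside a handler this could be combined with urllib3 as follows:
#   url, headers = build_extension_request("virustotal_api_key")
#   r = http.request("GET", url, headers=headers)
#   api_key = parse_secret(r.data, "api_key")
```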
Next, we want to get the file that was uploaded to S3 and save it in the s3file variable. We can retrieve this information from the event variable, which is populated by the service triggering the Lambda function. Additionally, we want to save the file type in the s3ContentType variable.
bucketName = event['detail']['bucket']['name']
objectName = event['detail']['object']['key']
fileName = event['detail']['object']['key'].split('/')[-1]
s3file = s3Client.get_object(Bucket=bucketName, Key=objectName)
s3ContentType = s3file['ContentType']
s3file = s3file['Body'].read()
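To make the event structure more concrete, here is a trimmed sample of an "Object Created" event together with the same lookups wrapped in a helper. The object key `uploads/2023/cat.png` and the helper name `parse_s3_event` are invented for illustration; a real event carries more fields than shown here.

```python
# Trimmed example event; only the fields used by the function are shown
SAMPLE_EVENT = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "virustotal"},
        "object": {"key": "uploads/2023/cat.png"},
    },
}


def parse_s3_event(event):
    """Return bucket name, object key, and bare file name."""
    detail = event["detail"]
    bucket_name = detail["bucket"]["name"]
    object_name = detail["object"]["key"]
    # Drop any folder prefix so only the file name is sent to VirusTotal
    file_name = object_name.split("/")[-1]
    return bucket_name, object_name, file_name


print(parse_s3_event(SAMPLE_EVENT))
# → ('virustotal', 'uploads/2023/cat.png', 'cat.png')
```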
Now we can send the file to VirusTotal for the scan.
files = {"file": (fileName, s3file, s3ContentType)}
response = http.request('POST', url, headers=headers, fields = files)
analysisURL = json.loads(response.data)["data"]["links"]["self"]
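The lines above assume that the upload always succeeds. If VirusTotal rejects the request (for example because the quota is used up), the response has no "data" field and the lookup fails with a KeyError. The sketch below shows one possible guard. The helper name `extract_analysis_url` is hypothetical, and the `error.message` layout of a failed response is an assumption about the VirusTotal API rather than something shown in this article.

```python
import json


def extract_analysis_url(status_code, body):
    """Return the analysis URL or raise a readable error."""
    payload = json.loads(body)
    if status_code != 200 or "data" not in payload:
        # Assumed error layout: {"error": {"code": ..., "message": ...}}
        message = payload.get("error", {}).get("message", "unknown error")
        raise RuntimeError(f"VirusTotal upload failed ({status_code}): {message}")
    return payload["data"]["links"]["self"]


# With urllib3 the call would look like this:
#   analysisURL = extract_analysis_url(response.status, response.data)
```

Raising an exception lets the Step Function mark the execution as failed instead of passing an incomplete result to the next function.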
As the scan can take some time, we return the unique URL of the scan and the S3 file information. This information will be passed to the next lambda function.
return {
    'analysisURL': analysisURL,
    'bucket': bucketName,
    'key': objectName
}
virustotalResults
import json
import os
import urllib3
import time
import boto3

s3Client = boto3.client('s3')

def lambda_handler(event, context):
    # Read the API key through the Parameters and Secrets Lambda Extension
    secret_id = "virustotal_api_key"
    secret_key = "api_key"
    auth_headers = {"X-Aws-Parameters-Secrets-Token": os.environ.get('AWS_SESSION_TOKEN')}
    http = urllib3.PoolManager()
    r = http.request("GET", "http://localhost:2773/secretsmanager/get?secretId=" + secret_id, headers=auth_headers)
    parameter = json.loads(r.data)
    VIRUSTOTAL_API_KEY = json.loads(parameter["SecretString"])[secret_key]

    headers = {
        "accept": "application/json",
        "x-apikey": VIRUSTOTAL_API_KEY,
    }

    # Fetch the analysis that the first function started
    response = http.request('GET', event['analysisURL'], headers=headers)
    response = json.loads(response.data)
    status = response['data']['attributes']['status']

    if status != "queued":
        suspicious = response['data']['attributes']['stats']['suspicious']
        malicious = response['data']['attributes']['stats']['malicious']

        # Tag the object according to the scan result
        if(malicious > 0):
            s3Client.put_object_tagging(
                Bucket= event['bucket'],
                Key= event['key'],
                Tagging={
                    'TagSet': [
                        {
                            'Key': 'malicious',
                            'Value': 'true'
                        },
                    ]
                }
            )
        else:
            s3Client.put_object_tagging(
                Bucket= event['bucket'],
                Key= event['key'],
                Tagging={
                    'TagSet': [
                        {
                            'Key': 'malicious',
                            'Value': 'false'
                        },
                    ]
                }
            )

        return {
            'status': 'sucess',
            'suspicious': suspicious,
            'malicious': malicious
        }
    else:
        # Scan still queued: the Step Function waits and tries again
        return {
            'status': 'failed'
        }
This lambda function will retrieve the results of the VirusTotal scan. First, we take the analysis URL from the previous function. It is stored in the event variable. If we call the analysis URL we get information about the status of the analysis and the results of the different scanners.
response = http.request('GET', event['analysisURL'], headers=headers)
response = json.loads(response.data)
status = response['data']['attributes']['status']
The response from VirusTotal will look like this:
{
    "meta":{
        "file_info":{
            ...
        }
    },
    "data":{
        "attributes":{
            "date":1684306866,
            "status":"completed",
            "stats":{
                "harmless":0,
                "type-unsupported":16,
                "suspicious":0,
                "confirmed-timeout":0,
                "timeout":0,
                "failure":0,
                "malicious":0,
                "undetected":59
            },
            "results":{
                "Bkav":{
                    "category":"undetected",
                    "engine_name":"Bkav",
                    "engine_version":"2.0.0.1",
                    "result":"None",
                    "method":"blacklist",
                    "engine_update":"20230516"
                },
                "Lionic":{
                    ...
                }
            }
        },
        "type":"analysis",
        "id":"YjdkNjE5MTM4MDViNGNlMmFhZTZkYmE2MzJmMDg1ZjM6MTY4NDMwNjg2Ng==",
        "links":{
            ...
        }
    }
}
If the scan is not finished yet, the status ($.data.attributes.status) will be queued; otherwise it will state completed. If the status is queued, we exit the function and return the status as failed.
if status != "queued":
    ...
else:
    return {
        'status': 'failed'
    }
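The check above treats every status other than queued as finished. To my knowledge, VirusTotal can also report an intermediate in-progress status, in which case the stats may still be incomplete; treat that as an assumption and verify it against the API documentation. A slightly stricter variant, sketched here with a hypothetical helper name, only accepts completed:

```python
def is_scan_finished(response):
    """Return True only when VirusTotal reports a completed analysis."""
    status = response["data"]["attributes"]["status"]
    # Any other value (queued, or an intermediate state) means: try again later
    return status == "completed"


completed = {"data": {"attributes": {"status": "completed"}}}
queued = {"data": {"attributes": {"status": "queued"}}}
print(is_scan_finished(completed), is_scan_finished(queued))
# → True False
```

In the handler, `if status != "queued":` could then become `if is_scan_finished(response):` without changing the rest of the function.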
If the scan is completed, we tag the S3 object. To decide how to tag it, we take the number of scanners that flagged the file as suspicious or malicious. In my case, I just check whether the malicious count is higher than 0 and, if so, flag the file as malicious. You can adjust the logic to fit your business purpose.
suspicious = response['data']['attributes']['stats']['suspicious']
malicious = response['data']['attributes']['stats']['malicious']

if(malicious > 0):
    s3Client.put_object_tagging(
        Bucket= event['bucket'],
        Key= event['key'],
        Tagging={
            'TagSet': [
                {
                    'Key': 'malicious',
                    'Value': 'true'
                },
            ]
        }
    )
else:
    s3Client.put_object_tagging(
        Bucket= event['bucket'],
        Key= event['key'],
        Tagging={
            'TagSet': [
                {
                    'Key': 'malicious',
                    'Value': 'false'
                },
            ]
        }
    )

return {
    'status': 'sucess',
    'suspicious': suspicious,
    'malicious': malicious
}
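If you want to adjust the tagging logic, it can help to move the decision into a small function that returns the Tagging argument. The sketch below is one possible shape; the name `build_tagging` and both threshold parameters are assumptions. With the defaults it behaves like the check above (malicious count higher than 0).

```python
def build_tagging(malicious, suspicious,
                  malicious_threshold=1, suspicious_threshold=None):
    """Build the Tagging argument for put_object_tagging."""
    flagged = malicious >= malicious_threshold
    # Optionally also flag files that many scanners consider suspicious
    if suspicious_threshold is not None and suspicious >= suspicious_threshold:
        flagged = True
    return {
        "TagSet": [
            {"Key": "malicious", "Value": "true" if flagged else "false"},
        ]
    }


print(build_tagging(malicious=0, suspicious=0))
# → {'TagSet': [{'Key': 'malicious', 'Value': 'false'}]}

# In the handler:
#   s3Client.put_object_tagging(
#       Bucket=event['bucket'], Key=event['key'],
#       Tagging=build_tagging(malicious, suspicious))
```

Keep in mind that put_object_tagging replaces the complete tag set of the object, so tags added by other tools would need to be read first and merged into the TagSet.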
Create Step Functions 🔄
Now we can put everything together and create the Step Function logic. The Workflow editor makes it very easy. Just drag and drop the correct functions to reflect the below flow. You can adjust the wait time accordingly. Usually, a wait time of 60 seconds is sufficient.
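For readers who prefer a definition over drag and drop, the following is a rough sketch of how such a flow could look in Amazon States Language, built as a Python dictionary. It is my reading of the flow (scan, wait, get results, repeat while the status is failed), not an export of the original state machine. The state names, the helper `build_state_machine`, and the Lambda ARNs are placeholders. The sketch uses the Lambda ARN directly as the task resource, so each task's output is the function's return value.

```python
import json


def build_state_machine(scan_lambda_arn, results_lambda_arn, wait_seconds=60):
    """Return a state machine definition that polls until the scan is done."""
    return {
        "Comment": "Scan new S3 objects with VirusTotal",
        "StartAt": "VirustotalScan",
        "States": {
            "VirustotalScan": {
                "Type": "Task",
                "Resource": scan_lambda_arn,
                "Next": "WaitForScan",
            },
            "WaitForScan": {
                "Type": "Wait",
                "Seconds": wait_seconds,
                "Next": "VirustotalResults",
            },
            "VirustotalResults": {
                "Type": "Task",
                "Resource": results_lambda_arn,
                # Keep analysisURL, bucket and key so a retry can reuse them
                "ResultPath": "$.result",
                "Next": "CheckStatus",
            },
            "CheckStatus": {
                "Type": "Choice",
                "Choices": [
                    {
                        "Variable": "$.result.status",
                        "StringEquals": "failed",
                        "Next": "WaitForScan",
                    }
                ],
                "Default": "Done",
            },
            "Done": {"Type": "Succeed"},
        },
    }


# Placeholder ARNs with a made-up account ID
definition = build_state_machine(
    "arn:aws:lambda:eu-central-1:123456789012:function:VirustotalScan",
    "arn:aws:lambda:eu-central-1:123456789012:function:virustotalResults",
)
print(json.dumps(definition, indent=4))
```

This sketch loops without an upper bound, so you may want to add a retry counter or a state machine timeout before using something like it.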
Create EventBridge rule ⏰
Let's create the EventBridge rule now. First, we need to define the event pattern. You can define the detail type and bucket:
{
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {
            "name": ["virustotal"]
        }
    }
}
Next, we define the previously created Step Function as our target.
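The rule and its target can also be created with boto3. The sketch below relies on the `put_rule` and `put_targets` calls of the EventBridge client. The helper name `create_scan_rule`, the rule name, and the target ID are invented, and `role_arn` is assumed to be a role that EventBridge can assume to start executions of the state machine.

```python
import json

EVENT_PATTERN = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {"bucket": {"name": ["virustotal"]}},
}


def create_scan_rule(events_client, state_machine_arn, role_arn,
                     rule_name="virustotal-scan"):
    """Create the rule and point it at the Step Function."""
    events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(EVENT_PATTERN),
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[
            {
                "Id": "virustotal-step-function",
                "Arn": state_machine_arn,
                # Role that allows EventBridge to start the execution
                "RoleArn": role_arn,
            }
        ],
    )
    return rule_name


# Usage (needs AWS credentials and boto3):
#   import boto3
#   create_scan_rule(boto3.client("events"), "<state machine ARN>", "<role ARN>")
```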
Every time we upload a new object to our S3 bucket, the file will be scanned via VirusTotal and tagged accordingly.
Conclusion 🎉
Integrating VirusTotal into your AWS S3 bucket security strategy can provide an additional layer of protection. 🔒 VirusTotal's comprehensive threat detection capabilities, combined with the step-by-step guide we have provided, can help you detect and mitigate potential risks effectively. 🔍
Remember to configure access control and permissions properly, implement encryption, enable logging and monitoring, conduct regular audits and vulnerability assessments, implement two-factor authentication, utilize AWS security services, educate users, and follow security best practices. These measures will enhance the overall security of your AWS S3 bucket files and reduce the risk of malicious intrusions. 🚧
By prioritizing data security and leveraging tools like VirusTotal, you can have peace of mind knowing that your AWS S3 bucket files are well-protected from potential threats.