Introduction

In my work, I often meet clients who are struggling to stay ahead of their cloud security responsibilities. Imagine a small but growing tech company that recently migrated its infrastructure to Google Cloud Platform (GCP). They have a lean IT team, with only one or two engineers responsible for cloud security, along with countless of other tasks. Security incidents and compliance requirements come up frequently, but the team is overwhelmed with other priorities. They know the importance of managing security, but they lack the time and resources to go through endless security findings from GCP's Security Command Center (SCC).

This scenario isn’t uncommon. Many organizations - especially those with small or no dedicated cloud security engineering teams - find themselves buried under a mountain of findings, alerts, and potential threats. Even with tools like SCC, application owners often lack the context to fully understand the severity of a finding, let alone how to resolve it quickly.

Enter Vertex AI. By integrating advanced machine learning models like Gemini, cloud security teams can enhance their ability to manage and respond to security findings. Imagine a world where, instead of simply being alerted to a vulnerability, you receive clear, actionable steps - customized to your environment and threat landscape - on how to mitigate risks.

AI could become an essential tool in the security toolbox, especially for teams that are stretched thin. In the next section, we'll dive deeper into how GCP’s SCC findings can be enriched with Vertex AI and Gemini, offering not just alerts but intelligent recommendations that help organizations proactively secure their cloud environments.

Setup

The setup involves using GCP's SCC to continuously export security findings via Pub/Sub, triggering a Cloud Function that enriches the findings with Vertex AI. This enriched data provides actionable insights, helping teams prioritize and address security issues more effectively. The final step integrates the findings with a vulnerability management tool like JIRA, allowing for seamless tracking and resolution within existing workflows. This streamlined approach enables even small security teams to manage their cloud security posture more efficiently by leveraging AI.

Step 1: Continuous Export via Pub/Sub

At the heart of this setup is SCC’s continuous export feature. By enabling it at the organization level, all security findings - whether related to misconfigurations, vulnerabilities, or potential threats - are automatically streamed to a designated Pub/Sub topic in the security account. Check out my previous tutorial on how to set this up:

How to Export GCP Security Command Center Findings to Pub/Sub
Introduction Today, we’ll be exploring a powerful feature that allows you to export GCP Security Command Center findings to Pub/Sub, and then effortlessly transform them via Cloud Functions for seamless integration with other systems. Understanding GCP Security Command Center 🧠 Before we jump into the nitty-gritty details, let’s quickly recap

Step 2: Pub/Sub Subscription and Cloud Function Trigger

From Pub/Sub, the next step is to process these findings. For this, we’ve set up a subscription that listens for new messages on the Pub/Sub topic. Every time a new security finding is published, a Cloud Function is triggered. This Cloud Function serves as the bridge to Vertex AI, where the real magic of AI-driven enrichment happens.

Step 3: Enriching Security Findings with Vertex AI

Once the Cloud Function is triggered, it receives the raw security findings from Pub/Sub. At this stage, these findings are likely to be quite technical, requiring a fair amount of manual investigation to fully understand the severity and required remediation steps. This is where Vertex AI comes into play.

To ensure the Cloud Function has the necessary packages to perform this enrichment, you need to create a requirements.txt file. This file should include the following lines to install functions-framework and vertexai:

functions-framework==3.*
vertexai

By specifying these dependencies, we ensure that the function can handle events via the functions-framework and interact with Vertex AI to enrich security findings.

Using Vertex AI, we can feed the findings through machine learning models trained to provide additional context. For example, Vertex AI can:

  • Prioritize findings based on the business context, risk level, and potential impact.
  • Provide tailored remediation steps, helping application owners or DevOps teams quickly understand what actions to take to resolve the issue.
  • Correlate findings with similar vulnerabilities or patterns seen across the organization, helping to prevent recurring issues.

To achieve this enrichment, the following code in our Cloud Function processes the security findings and interacts with Vertex AI:

import base64
import functions_framework
import json
import vertexai
from vertexai.generative_models import GenerativeModel, Part, SafetySetting

generation_config = {
    "max_output_tokens": 8192,
    "temperature": 1,
    "top_p": 0.95,
}

safety_settings = [
    SafetySetting(
        category=SafetySetting.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
        threshold=SafetySetting.HarmBlockThreshold.OFF
    ),
    SafetySetting(
        category=SafetySetting.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
        threshold=SafetySetting.HarmBlockThreshold.OFF
    ),
    # Additional safety settings can be specified here
]
@functions_framework.cloud_event
def handler(cloud_event):
    data = base64.b64decode(cloud_event.data["message"]["data"])
    finding = data.decode("utf-8")
    
    try:
        finding_json = json.loads(finding)
        prompt = generate_prompt(finding_json)
        finding = generate_answer(prompt)
        print(finding)

    except json.JSONDecodeError:
        print("Data is not in valid JSON format:", finding)
def generate_prompt(finding_data):
    prompt = "The Security Command Center has detected a finding. Analyze the category, description, severity, and recommendation fields to create a resolution plan.\n\n"
    finding = finding_data.get('finding', {})
    resource = finding_data.get('resource', {})

    if 'category' in finding:
        prompt += f"Category: {finding['category']}\n"
    
    if 'description' in finding:
        prompt += f"Description: {finding['description']}\n"

    if 'Recommendation' in source_properties:
        prompt += f"\nRecommendation: {source_properties['Recommendation']}\n"

    if 'Explanation' in source_properties:
        prompt += f"Explanation: {source_properties['Explanation']}\n"


    if 'mitreAttack' in finding:
        prompt += "\nMITRE ATT&CK Information:\n"
        tactic = finding['mitreAttack'].get('primaryTactic', 'Unknown')
        techniques = ", ".join(finding['mitreAttack'].get('primaryTechniques', []))
        prompt += f"Tactic: {tactic}, Techniques: {techniques}\n"
    
    prompt += "Please also include a brief explanation of the potential impact if the issue is not resolved. If there are any questions or further clarifications needed, feel free to contact [email protected]"

    
    # Additional field handling here...

    return prompt
def generate_answer(prompt):
    vertexai.init(project="security-alexanderhose", location="us-central1")
    model = GenerativeModel("gemini-1.5-flash-002")
    answer = ""
    responses = model.generate_content(
        [prompt],
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=True,
    )

    for response in responses:
        answer += response.text
    
    return answer

This code performs three key tasks:

  1. Imports and Configuration: The code is importing necessary libraries, including base64json, and modules for Vertex AI and Google Cloud Functions. Configuration settings follow, such as generation_config, which determines parameters like max_output_tokens (setting the length of generated content) and temperature (which controls the randomness of the output). Safety settings, which prevent harmful content from being included in responses, are also configured here, but are set to "off" for the specified categories.
  2. Cloud Event Handler: The main function, handler, listens for cloud events, which in this case are triggered when new security findings are published to the Pub/Sub topic. When a finding is received, the function decodes the base64-encoded message data, converts it to a string, and then attempts to parse it as JSON. The decoded JSON data is printed for debugging purposes, ensuring the data format is correct before proceeding.
  3. Prompt Generation: The generate_prompt function is crucial as it creates a custom prompt tailored to each finding. It starts with a base message and pulls details from the finding_data, such as category, description, severity, and timestamp, as well as information about the affected resource and compliance data. The function checks for each relevant field in the JSON, adding them to the prompt to provide Vertex AI with context on the finding. Additional elements like recommendations, explanations, and MITRE ATT&CK information (if available) are also added, helping Vertex AI generate a more accurate and actionable response.
  4. Generating the Answer with Vertex AI: The generate_answer function initializes Vertex AI with the specified project and location settings, and then loads a generative model (in this case, "gemini-1.5-flash-002"). The prompt is passed to the model through the generate_content method, which uses the generation_config and safety_settings defined earlier. As Vertex AI streams responses, the code accumulates the answer in the answer variable, which will contain the enriched information for the finding.

This setup with Vertex AI automates the enrichment and prioritization of findings, allowing security teams to focus on remediation rather than manual analysis.

Step 4: Integration with Vulnerability Management Tools

Once the findings are enriched and prioritized, the final step is to integrate the output with a vulnerability management tool. While the specific implementation of tools like JIRA won’t be covered in this post, it’s worth noting that you can easily forward enriched findings into almost any ticketing or vulnerability management system. This ensures the security team can track and address each issue within the context of their existing workflows.

With this setup, security teams can dramatically reduce the time spent analyzing findings and focus on more strategic efforts. Even small or resource-constrained teams can now stay on top of their security posture with AI-driven insights and automated prioritization.

Full Code

Below is the complete code for the Cloud Function.

import base64
import functions_framework
import json
import vertexai
from vertexai.generative_models import GenerativeModel, Part, SafetySetting

generation_config = {
    "max_output_tokens": 8192,
    "temperature": 1,
    "top_p": 0.95,
}

safety_settings = [
    SafetySetting(
        category=SafetySetting.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
        threshold=SafetySetting.HarmBlockThreshold.OFF
    ),
    SafetySetting(
        category=SafetySetting.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
        threshold=SafetySetting.HarmBlockThreshold.OFF
    ),
    SafetySetting(
        category=SafetySetting.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
        threshold=SafetySetting.HarmBlockThreshold.OFF
    ),
    SafetySetting(
        category=SafetySetting.HarmCategory.HARM_CATEGORY_HARASSMENT,
        threshold=SafetySetting.HarmBlockThreshold.OFF
    ),
]

@functions_framework.cloud_event
def handler(cloud_event):
    # Decode the base64-encoded data
    data = base64.b64decode(cloud_event.data["message"]["data"])
    
    # Decode the byte array to a string
    finding = data.decode("utf-8")
    
    try:
        # Attempt to parse the string as JSON
        finding_json = json.loads(finding)
        print("Decoded and parsed JSON data:", finding_json)

        prompt = generate_prompt(finding_json)
        print(prompt)
        finding = generate_answer(prompt)
        print(finding)

    except json.JSONDecodeError:
        print("Data is not in valid JSON format:", finding)


def generate_prompt(finding_data):
    # Initialize the prompt with a base message
    prompt = "The Security Command Center has detected a finding. Analyze the category, description, severity, and recommendation fields to create a resolution plan including efforts and links to the documentation:\n\n"

    # Check for presence of key fields in the 'finding' part of the JSON
    finding = finding_data.get('finding', {})
    resource = finding_data.get('resource', {})
    source_properties = finding_data.get('sourceProperties', {})

    # Add finding details to the prompt
    if 'category' in finding:
        prompt += f"Category: {finding['category']}\n"
    
    if 'description' in finding:
        prompt += f"Description: {finding['description']}\n"
    
    if 'severity' in finding:
        prompt += f"Severity: {finding['severity']}\n"
    
    if 'eventTime' in finding:
        prompt += f"Event Time: {finding['eventTime']}\n"

    # Add resource details to the prompt
    if 'displayName' in resource:
        prompt += f"Resource Name: {resource['displayName']}\n"
    
    if 'type' in resource:
        prompt += f"Resource Type: {resource['type']}\n"
    
    if 'location' in resource:
        prompt += f"Resource Location: {resource['location']}\n"

    # Extract compliance details if available
    if 'compliances' in finding:
        prompt += "\nCompliance Information:\n"
        for compliance in finding['compliances']:
            standard = compliance.get('standard', 'Unknown')
            version = compliance.get('version', 'Unknown')
            ids = ", ".join(compliance.get('ids', []))
            prompt += f"- Standard: {standard}, Version: {version}, IDs: {ids}\n"

    # Include recommendations and explanations from source properties
    if 'Recommendation' in source_properties:
        prompt += f"\nRecommendation: {source_properties['Recommendation']}\n"

    if 'Explanation' in source_properties:
        prompt += f"Explanation: {source_properties['Explanation']}\n"

    # If MitreAttack information exists, add that too
    if 'mitreAttack' in finding:
        prompt += "\nMITRE ATT&CK Information:\n"
        tactic = finding['mitreAttack'].get('primaryTactic', 'Unknown')
        techniques = ", ".join(finding['mitreAttack'].get('primaryTechniques', []))
        prompt += f"Tactic: {tactic}, Techniques: {techniques}\n"
    
    prompt += "Please also include a brief explanation of the potential impact if the issue is not resolved. If there are any questions or further clarifications needed, feel free to contact [email protected]"
    return prompt

def generate_answer(prompt):
    vertexai.init(project="security-alexanderhose", location="us-central1")
    model = GenerativeModel(
        "gemini-1.5-flash-002",
    )
    answer = ""

    responses = model.generate_content(
        [prompt],
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=True,
    )

    for response in responses:
        answer += response.text
    
    return answer

Sample AI Response

When this code is run, Vertex AI generates a response based on the given prompt. Below is a sample output from Vertex AI, providing additional details and recommended actions:

## Resolution Plan: CREDENTIAL_EXFILTRATION Finding on alexanderhose.com

**Category:** CREDENTIAL_EXFILTRATION

**Severity:** HIGH

**Event Time:** 2024-10-13T21:22:54.493Z

**Resource Name:** alexanderhose.com

**Resource Type:** google.cloud.resourcemanager.Organization

**Potential Impact:**  Credential exfiltration is a HIGH severity issue because it allows unauthorized access to your Google Cloud resources and potentially your entire organization's data.  This could lead to data breaches, unauthorized modification or deletion of data, service disruptions, financial losses, and reputational damage.  Unresolved, this could result in significant legal and regulatory repercussions depending on the nature of the data compromised.

**Resolution Efforts:**

This plan focuses on immediate containment and then root cause analysis.  Due to the high severity, immediate action is critical.

**Phase 1: Immediate Containment (within 24 hours)**

1. **Rotate all affected credentials:**  Immediately rotate all API keys, service account keys, and any other credentials associated with the `alexanderhose.com` organization. This includes keys used for access to Google Cloud Platform (GCP), any connected services, and any third-party applications with access to GCP.  [Google Cloud Documentation on Key Rotation](https://cloud.google.com/docs/authentication/managing-keys)

2. **Enable Cloud Audit Logging:** Ensure Cloud Audit Logging is enabled at the organizational level and review logs from the time of the exfiltration event (2024-10-13T21:22:54.493Z) to identify the source and extent of the compromise. [Google Cloud Documentation on Cloud Audit Logging](https://cloud.google.com/logging/docs/audit)

3. **Review IAM Permissions:** Conduct a thorough review of Identity and Access Management (IAM) permissions within the organization to identify any overly permissive roles or unnecessary access grants that might have facilitated the exfiltration.  Focus on service accounts and their associated permissions. [Google Cloud Documentation on IAM Roles](https://cloud.google.com/iam/docs/understanding-roles)

4. **Enable Security Health Analytics:** Review the Security Health Analytics dashboard for potential vulnerabilities that might have contributed to the exfiltration. [Google Cloud Documentation on Security Health Analytics](https://cloud.google.com/security-command-center/docs/using-security-health-analytics)


**Phase 2: Root Cause Analysis and Remediation (within 72 hours)**

1. **Analyze Audit Logs:**  Thoroughly analyze the Cloud Audit Logs to identify the specific actions that led to the credential exfiltration.  This may involve correlating log entries across different services.

2. **Investigate Compromised Systems:** Identify any compromised systems or applications within your organization that may have been used as a stepping stone to exfiltrate the credentials. This may involve examining endpoint security logs and performing malware scans.

3. **Remediate identified vulnerabilities:** Based on the root cause analysis, implement appropriate security measures to prevent future credential exfiltration. This might include:
    * Implementing strong password policies.
    * Enforcing multi-factor authentication (MFA) for all users and service accounts.
    * Regularly patching systems and applications.
    * Implementing intrusion detection and prevention systems.
    * Utilizing Cloud Security Posture Management (CSPM) tools.


**Phase 3: Ongoing Monitoring and Prevention (ongoing)**

1. **Continuous Monitoring of Security Health Analytics:** Regularly review Security Health Analytics dashboards and alerts to proactively identify and mitigate potential vulnerabilities.

2. **Regular Security Assessments:** Conduct regular security assessments to identify and address any security gaps.

3. **Security Awareness Training:** Provide regular security awareness training to employees to educate them about security best practices and the importance of protecting credentials.


**Further Clarifications:**

If after implementing these steps, the issue persists or you require further assistance, please contact [email protected] with details of the findings from the audit logs and any other relevant information.  The specific steps taken and their results should be documented.  This documentation will be crucial for future incident response and preventing similar incidents.

This example demonstrates how Vertex AI helps transform raw findings into structured, actionable insights, simplifying the process for security teams and application owners to address issues efficiently.

Share this post