Introduction

During my time leading development teams, one of the most frustrating challenges we faced was dealing with the overwhelming number of security findings flooding in from various systems. They were rarely well-described, almost never prioritized, and often left us scratching our heads about their actual impact.

Google’s Security Command Center (SCC) has been a game-changer in helping prioritize findings, giving security teams a clear understanding of what to address first. But for developers - or worse, non-technical stakeholders - things aren’t quite as straightforward. The console provides high-level explanations, but what happens when developers don’t have access to the GCP console? Or when your organization tracks issues in an entirely different system like JIRA or ServiceNow?

Here’s the hard truth: if the findings don’t integrate into a company’s business processes, they’re likely to be ignored. Developers don’t care about what’s written in the console if they never see it. What they need is context. Clear, actionable descriptions that tell them exactly what’s wrong, why it matters, and what they can do about it.

This article dives into how I tackled this challenge by building a middle layer to bridge the gap. It extracts the findings from SCC, enriches them with as much detail as possible, and prepares them for integration into systems like JIRA.

High-Level Architecture

SCC Vertex AI High Level Architecture

The architecture begins with SCC, which continuously monitors your cloud environment for security issues and generates findings when potential risks are detected. These findings are configured to be exported in near real-time to a Pub/Sub topic. This topic acts as a message broker, ensuring findings are reliably captured and passed to the next stage of processing.

From the Pub/Sub topic, the findings are delivered to a Cloud Run service. This service is designed to handle incoming messages, decode the findings, and enrich them with Vertex AI to make them actionable for developers.

Once the findings are enriched and formatted, the service could use the target system's API (e.g., JIRA REST API) to create new issues or update existing ones. These tickets are enriched with all the necessary information developers need to understand and resolve the issue, even if they don’t have direct access to the SCC console.
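To make this last step more concrete, here is a minimal sketch of how an enriched finding could be turned into the JSON body for Jira's create-issue endpoint (POST /rest/api/2/issue, where the description is a plain string). The function name build_jira_issue, the project key SEC, and the issue type Bug are placeholders I picked for illustration and are not part of the setup described in this article. The HTTP call and authentication are intentionally left out.

```python
def build_jira_issue(finding_json, ai_text, project_key="SEC", issue_type="Bug"):
    """Build the request body for Jira's create-issue endpoint.

    project_key and issue_type are placeholder values for illustration.
    """
    finding = finding_json.get("finding", {})
    resource = finding_json.get("resource", {})

    category = finding.get("category", "Unknown category")
    severity = finding.get("severity", "Unknown severity")
    resource_name = resource.get("displayName", "Unknown resource")

    # Keep the summary short and single-line; Jira limits it to 255 characters
    summary = f"[{severity}] {category} on {resource_name}"[:255]

    return {
        "fields": {
            "project": {"key": project_key},
            "issuetype": {"name": issue_type},
            "summary": summary,
            # Fall back to a fixed text if the model returned nothing
            "description": ai_text or "No AI-generated description available.",
        }
    }
```

The returned dictionary can be serialized with json.dumps and sent with whatever HTTP client and credentials your Jira instance requires.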

Setting Up the Pub/Sub Topic for SCC Findings Export

To enable continuous export of findings from SCC into Pub/Sub, the first step is to create a Pub/Sub topic in the security account where we want to aggregate findings. This topic will act as the central channel for streaming SCC findings. Deploying the Cloud Run function and processing findings will also happen in this account.

Create the topic with the standard configuration, or adjust it based on your organization's requirements.

Create Pub/Sub Topic
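If you prefer to script this step instead of using the console, the sketch below shows one possible way with the google-cloud-pubsub client library. It assumes the package is installed and that your credentials may create topics in the project. The helper names and the project ID in the usage note are hypothetical.

```python
def topic_path(project_id, topic_id):
    """Return the fully qualified Pub/Sub topic name."""
    return f"projects/{project_id}/topics/{topic_id}"


def create_topic(project_id, topic_id="scc-export"):
    """Create the topic with default settings and return its full name.

    Requires the google-cloud-pubsub package and suitable credentials.
    """
    # Imported lazily so topic_path stays usable without the client library
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic = publisher.create_topic(request={"name": topic_path(project_id, topic_id)})
    return topic.name
```

Calling create_topic("my-security-project") would create projects/my-security-project/topics/scc-export, where my-security-project stands in for your own project ID.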

Setting Up Continuous Export of SCC Findings to Pub/Sub

To export findings from SCC across your organization, navigate to the SCC settings in your organization's main account. In the Continuous Exports section, configure a new export to the Pub/Sub topic we created earlier in the security account (e.g., scc-export). This ensures all findings from resources within the organization are streamed to the topic.

Configure SCC continuous export

Optionally, you can apply filters to export only specific findings, but by default, all findings will be sent to the topic. Once configured, SCC will continuously export findings.

Configure SCC continuous export
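As an example of such a filter, the sketch below assembles an expression that limits the export to active findings of selected severities. It assumes the SCC finding filter syntax with quoted values, AND/OR, and parentheses. The helper name build_export_filter and the chosen defaults are my own and only meant as a starting point.

```python
def build_export_filter(severities=("HIGH", "CRITICAL"), state="ACTIVE"):
    """Build an SCC finding filter for one state and a set of severities."""
    state_clause = f'state="{state}"'
    if not severities:
        # No severity restriction requested: filter on state only
        return state_clause

    severity_clause = " OR ".join(f'severity="{s}"' for s in severities)
    return f"{state_clause} AND ({severity_clause})"
```

With the defaults this yields state="ACTIVE" AND (severity="HIGH" OR severity="CRITICAL"), which you could paste into the filter field of the export. Check it against your own findings before relying on it.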

Creating a Service Account for Secure Communication

To facilitate secure and controlled communication between Pub/Sub, Cloud Run, and Vertex AI, we need to create a dedicated service account called scc-ai-pubsub. This service account will be configured with the necessary roles to perform its tasks while maintaining security best practices.

Create service account name

The service account requires the following roles:

  1. Cloud Run Invoker: This role allows the service account to securely invoke the Cloud Run function, ensuring Pub/Sub can trigger the function to process findings.
  2. Vertex AI User: This role enables the service account to interact with Vertex AI, allowing it to generate responses based on the processed findings.

Create service account roles
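For reference, the two console role names correspond to the role IDs roles/run.invoker and roles/aiplatform.user. The sketch below, with helper names of my own choosing, builds the matching IAM bindings as plain dictionaries. Where you apply them (on the project or on the individual Cloud Run service) depends on your setup.

```python
# Console display names mapped to their IAM role IDs
ROLE_IDS = {
    "Cloud Run Invoker": "roles/run.invoker",
    "Vertex AI User": "roles/aiplatform.user",
}


def service_account_member(project_id, account_id="scc-ai-pubsub"):
    """Return the IAM member string for the service account."""
    return f"serviceAccount:{account_id}@{project_id}.iam.gserviceaccount.com"


def build_bindings(project_id):
    """Return one IAM binding per required role for the service account."""
    member = service_account_member(project_id)
    return [{"role": role, "members": [member]} for role in ROLE_IDS.values()]
```

The project ID passed in is whatever project hosts the service account. The resulting list uses the same role/members shape as the bindings in an IAM policy.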

Writing the Cloud Run Function to Process Findings

To process security findings and provide developers with actionable insights, we need to build a Cloud Run function. This function will serve as a bridge between the findings exported from SCC and the enriched information required by your teams.

Before we dive into the code, let's configure the Cloud Run function with the necessary settings to ensure it runs securely and integrates seamlessly with the Pub/Sub topic. Here's what you need to do:

  1. Choose the Python Runtime
    When creating the Cloud Run function, select the Python runtime (e.g., Python 3.12 or later). This ensures compatibility with the function's dependencies and code.
  2. Require Authentication
    For enhanced security, enable authentication for the function. This restricts access and ensures that only authorized services, such as Pub/Sub, can invoke the function.
  3. Assign a Dedicated Service Account
    Use the scc-ai-pubsub service account you created earlier.

Configure Cloud Run Function

With the function configured, it's time to write the code that processes and enriches the findings.

Adding Required Dependencies

Since our function relies on functions-framework to handle events and google-genai to interact with Vertex AI, we need to include them in our requirements.txt:

functions-framework==3.*
google-genai

Setting Up the Environment

The first step in our function is to set up the environment variables. This ensures that the function works seamlessly across different environments while allowing for customization. Here's how we do it:

Develop Cloud Run Function

import os

PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT", "security-alexanderhose")
if PROJECT_ID == "security-alexanderhose":
    print("Warning: Using default project ID, please ensure it's correct.")
MODEL_ID = "gemini-2.0-flash-exp"
LOCATION = os.getenv("GOOGLE_CLOUD_REGION", "us-central1")

  • PROJECT_ID : Identifies the Google Cloud project used for the Vertex AI requests. It is read from GOOGLE_CLOUD_PROJECT, with a fallback default.
  • MODEL_ID : Specifies the Vertex AI model to use for content generation.
  • LOCATION : Determines the region used for the Vertex AI requests. It is read from GOOGLE_CLOUD_REGION and defaults to us-central1.

Handler Function

The handler function is the main entry point for the Cloud Run service. It decodes incoming messages, processes the data, and delegates to helper functions.

import base64
import functions_framework
import json

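# Register the handler as a CloudEvent function with the Functions Framework
@functions_framework.cloud_event
def handler(cloud_event):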
    # Decode the base64-encoded data
    try:
        data = base64.b64decode(cloud_event.data["message"]["data"])
        # Decode the byte array to a string
        finding = data.decode("utf-8")
        print("Decoded base64 data successfully.")
    except Exception as e:
        print(f"Failed to decode cloud event data: {str(e)}")
        return

    try:
        # Attempt to parse the string as JSON
        finding_json = json.loads(finding)
        print("Decoded and parsed JSON data successfully.")
        
        # Generate prompt and answer
        prompt = generate_prompt(finding_json)
        print(f"Generated prompt: {prompt}")

        finding_response = generate_answer(prompt)
        print(f"Generated answer: {finding_response}")

    except json.JSONDecodeError:
        print(f"Data is not in valid JSON format: {finding}")
    except Exception as e:
        print(f"Error processing cloud event: {str(e)}")

  • Decoding Base64 Data: Converts the encoded Pub/Sub message into a readable string.
  • Parsing JSON: Extracts structured information from the finding.
  • Delegation: Calls generate_prompt to construct a detailed prompt and generate_answer to fetch AI-generated insights.
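To try the decoding steps without waiting for a real finding, you can build the same envelope by hand. The sketch below wraps a finding as base64-encoded JSON under message.data, which is the shape the handler reads, and then decodes it again. The helper names and the sample values are made up for this example.

```python
import base64
import json


def make_pubsub_event_data(finding_json):
    """Wrap a finding as base64-encoded JSON under message.data."""
    encoded = base64.b64encode(json.dumps(finding_json).encode("utf-8")).decode("ascii")
    return {"message": {"data": encoded}}


def decode_pubsub_event_data(event_data):
    """Mirror the handler's decoding steps and return the parsed finding."""
    raw = base64.b64decode(event_data["message"]["data"])
    return json.loads(raw.decode("utf-8"))


# Invented sample finding, only for the round trip
sample = {"finding": {"category": "PUBLIC_BUCKET_ACL", "severity": "HIGH"}}
assert decode_pubsub_event_data(make_pubsub_event_data(sample)) == sample
```

For a local run of the handler itself, an object with a data attribute holding this dictionary, for example types.SimpleNamespace(data=make_pubsub_event_data(sample)), should be enough as a stand-in for the CloudEvent.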

Generating the Prompt

The generate_prompt function constructs a detailed and structured prompt for the AI model based on the security finding's data.

def generate_prompt(finding_data):
    prompt = (
        "You are an expert security analyst assisting in creating a detailed and actionable resolution plan "
        "for a security finding detected by the Google Cloud Security Command Center (SCC). Analyze the "
        "following data, prioritize based on severity, and include the following in your response:\n"
        "- A clear and concise summary of the finding.\n"
        "- A detailed explanation of the issue, including potential impact if not resolved.\n"
        "- A step-by-step resolution plan with actionable items.\n"
        "- CLI commands (if applicable) to mitigate the issue using common cloud tools or APIs.\n"
        "- Links to relevant documentation to guide developers.\n\n"
    )

    finding = finding_data.get('finding', {})
    resource = finding_data.get('resource', {})
    source_properties = finding_data.get('sourceProperties', {})

    # Constructing the prompt incrementally
    prompt += f"Category: {finding.get('category', 'Unknown category')}\n"
    prompt += f"Description: {finding.get('description', 'No description available')}\n"
    prompt += f"Severity: {finding.get('severity', 'Unknown severity')}\n"
    prompt += f"Event Time: {finding.get('eventTime', 'Unknown event time')}\n"
    prompt += f"Resource Name: {resource.get('displayName', 'Unknown resource')}\n"
    prompt += f"Resource Type: {resource.get('type', 'Unknown resource type')}\n"
    prompt += f"Resource Location: {resource.get('location', 'Unknown location')}\n"

    # Add compliance information if available
    if 'compliances' in finding:
        prompt += "\nCompliance Information:\n"
        for compliance in finding['compliances']:
            standard = compliance.get('standard', 'Unknown')
            version = compliance.get('version', 'Unknown')
            ids = ", ".join(compliance.get('ids', []))
            prompt += f"- Standard: {standard}, Version: {version}, IDs: {ids}\n"

    # Add recommendation and explanation
    prompt += f"\nRecommendation: {source_properties.get('Recommendation', 'No recommendation available')}\n"
    prompt += f"Explanation: {source_properties.get('Explanation', 'No explanation available')}\n"

    # Add MITRE ATT&CK info if available
    if 'mitreAttack' in finding:
        prompt += "\nMITRE ATT&CK Information:\n"
        tactic = finding['mitreAttack'].get('primaryTactic', 'Unknown')
        techniques = ", ".join(finding['mitreAttack'].get('primaryTechniques', []))
        prompt += f"Tactic: {tactic}, Techniques: {techniques}\n"

    prompt += (
        "\nFormat your response using headings and bullet points for clarity. "
        "Ensure the resolution plan is actionable and easy to understand for developers. "
        "Generate CLI commands using Google Cloud CLI, APIs, or other standard tools where appropriate."
    )
    
    return prompt
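
To illustrate the input shape generate_prompt relies on, here is a hypothetical sample payload together with a small helper that lists which fields would fall back to their defaults. The sample values are invented, the optional compliances and mitreAttack sections are left out, and a real finding carries many more fields. The helper name missing_prompt_fields is my own.

```python
# Hypothetical sample shaped after the keys generate_prompt reads
SAMPLE_FINDING = {
    "finding": {
        "category": "PUBLIC_BUCKET_ACL",
        "description": "Bucket is publicly accessible.",
        "severity": "HIGH",
        "eventTime": "2025-01-15T10:30:00Z",
    },
    "resource": {
        "displayName": "example-bucket",
        "type": "google.cloud.storage.Bucket",
        "location": "us-central1",
    },
    "sourceProperties": {
        "Recommendation": "Remove public access from the bucket.",
        "Explanation": "Public buckets can expose data to anyone.",
    },
}

# Keys the prompt reads from each top-level section
EXPECTED_KEYS = {
    "finding": ["category", "description", "severity", "eventTime"],
    "resource": ["displayName", "type", "location"],
    "sourceProperties": ["Recommendation", "Explanation"],
}


def missing_prompt_fields(finding_data):
    """List the section.key paths that would use a default in the prompt."""
    missing = []
    for section, keys in EXPECTED_KEYS.items():
        values = finding_data.get(section, {})
        missing.extend(f"{section}.{key}" for key in keys if key not in values)
    return missing
```

Logging the result of missing_prompt_fields next to the prompt is one way to spot findings that reach the model with mostly "Unknown" values.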

Generating an Answer

The generate_answer function sends the prompt to the Vertex AI model and retrieves the response.

from google import genai

def generate_answer(prompt):
    try:
        # Initialize GenAI client
        client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)
        # Generate the content using the model
        response = client.models.generate_content(
            model=MODEL_ID,
            contents=prompt
        )
        print("Generated response from model successfully.")
        return response.text

    except Exception as e:
        print(f"Error generating content: {str(e)}")
        return None

  • Initializing the Client: Creates a Vertex AI client for the configured project and region. The model is selected per request.
  • Calling the Model: Sends the prompt to the AI model and retrieves the generated response.
  • Returning the Response: The AI’s output is returned for further processing.
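
If you want to exercise this logic without calling Vertex AI, one option is to pass the client in as a parameter and substitute a stub. The sketch below is a variant of generate_answer under that assumption. generate_answer_with and FakeModels are hypothetical names, and the stub only imitates the client.models.generate_content(model=..., contents=...) call used above.

```python
from types import SimpleNamespace


def generate_answer_with(client, prompt, model_id="gemini-2.0-flash-exp"):
    """Same call pattern as generate_answer, but the client is passed in."""
    try:
        response = client.models.generate_content(model=model_id, contents=prompt)
        return response.text
    except Exception as e:
        print(f"Error generating content: {e}")
        return None


class FakeModels:
    """Stand-in for client.models that records calls and returns canned text."""

    def __init__(self):
        self.calls = []

    def generate_content(self, model, contents):
        self.calls.append((model, contents))
        return SimpleNamespace(text=f"Stub answer for: {contents}")


# A stub client exposing only the attribute the function touches
fake_client = SimpleNamespace(models=FakeModels())
```

In production you would pass a real genai.Client instead of fake_client. Creating that client once at module level, rather than on every invocation, is a possible refinement.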

Configuring the Cloud Run Trigger

With the Cloud Run function deployed, we need to set up the trigger that will invoke it whenever a new security finding is published. This can be done directly in the Cloud Run console.

We select the Pub/Sub trigger and choose the scc-export topic we created earlier. The most important configuration is specifying the service account (scc-ai-pubsub) for authentication. This ensures that only the designated service account can invoke the function.

Configure Cloud Run Function Trigger

Example of AI-Generated Response

To see the function in action, here’s an example of how the AI processes a security finding and generates a structured remediation plan:

Sample Cloud Run Function Output

Final Full Code with Error Handling

Here's the complete version of the Cloud Run function. This includes error handling and debug messages.

import base64
import functions_framework
import json
import os
from google import genai

# Fetch environment variables or set defaults
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT", "security-alexanderhose")
if PROJECT_ID == "security-alexanderhose":
    print("Warning: Using default project ID, please ensure it's correct.")
MODEL_ID = "gemini-2.0-flash-exp"
LOCATION = os.getenv("GOOGLE_CLOUD_REGION", "us-central1")

@functions_framework.cloud_event
def handler(cloud_event):
    # Decode the base64-encoded data
    try:
        data = base64.b64decode(cloud_event.data["message"]["data"])
        # Decode the byte array to a string
        finding = data.decode("utf-8")
        print("Decoded base64 data successfully.")
    except Exception as e:
        print(f"Failed to decode cloud event data: {str(e)}")
        return

    try:
        # Attempt to parse the string as JSON
        finding_json = json.loads(finding)
        print("Decoded and parsed JSON data successfully.")
        
        # Generate prompt and answer
        prompt = generate_prompt(finding_json)
        print(f"Generated prompt: {prompt}")

        finding_response = generate_answer(prompt)
        print(f"Generated answer: {finding_response}")

    except json.JSONDecodeError:
        print(f"Data is not in valid JSON format: {finding}")
    except Exception as e:
        print(f"Error processing cloud event: {str(e)}")


def generate_prompt(finding_data):
    prompt = (
        "You are an expert security analyst assisting in creating a detailed and actionable resolution plan "
        "for a security finding detected by the Google Cloud Security Command Center (SCC). Analyze the "
        "following data, prioritize based on severity, and include the following in your response:\n"
        "- A clear and concise summary of the finding.\n"
        "- A detailed explanation of the issue, including potential impact if not resolved.\n"
        "- A step-by-step resolution plan with actionable items.\n"
        "- CLI commands (if applicable) to mitigate the issue using common cloud tools or APIs.\n"
        "- Links to relevant documentation to guide developers.\n\n"
    )

    finding = finding_data.get('finding', {})
    resource = finding_data.get('resource', {})
    source_properties = finding_data.get('sourceProperties', {})

    # Constructing the prompt incrementally
    prompt += f"Category: {finding.get('category', 'Unknown category')}\n"
    prompt += f"Description: {finding.get('description', 'No description available')}\n"
    prompt += f"Severity: {finding.get('severity', 'Unknown severity')}\n"
    prompt += f"Event Time: {finding.get('eventTime', 'Unknown event time')}\n"
    prompt += f"Resource Name: {resource.get('displayName', 'Unknown resource')}\n"
    prompt += f"Resource Type: {resource.get('type', 'Unknown resource type')}\n"
    prompt += f"Resource Location: {resource.get('location', 'Unknown location')}\n"

    # Add compliance information if available
    if 'compliances' in finding:
        prompt += "\nCompliance Information:\n"
        for compliance in finding['compliances']:
            standard = compliance.get('standard', 'Unknown')
            version = compliance.get('version', 'Unknown')
            ids = ", ".join(compliance.get('ids', []))
            prompt += f"- Standard: {standard}, Version: {version}, IDs: {ids}\n"

    # Add recommendation and explanation
    prompt += f"\nRecommendation: {source_properties.get('Recommendation', 'No recommendation available')}\n"
    prompt += f"Explanation: {source_properties.get('Explanation', 'No explanation available')}\n"

    # Add MITRE ATT&CK info if available
    if 'mitreAttack' in finding:
        prompt += "\nMITRE ATT&CK Information:\n"
        tactic = finding['mitreAttack'].get('primaryTactic', 'Unknown')
        techniques = ", ".join(finding['mitreAttack'].get('primaryTechniques', []))
        prompt += f"Tactic: {tactic}, Techniques: {techniques}\n"

    prompt += (
        "\nFormat your response using headings and bullet points for clarity. "
        "Ensure the resolution plan is actionable and easy to understand for developers. "
        "Generate CLI commands using Google Cloud CLI, APIs, or other standard tools where appropriate."
    )
    
    return prompt


def generate_answer(prompt):
    try:
        # Initialize GenAI client
        client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)
        # Generate the content using the model
        response = client.models.generate_content(
            model=MODEL_ID,
            contents=prompt
        )
        print("Generated response from model successfully.")
        return response.text

    except Exception as e:
        print(f"Error generating content: {str(e)}")
        return None

Google Cloud credits are provided for this project. #VertexAISprint