Extracting Bulk Data with the InsightVM Console API

Whether you want to feed InsightVM scan data into a SIEM or a CMDB, or generate tickets for your remediation teams, the InsightVM RESTful API is likely the first place to start. What may be less apparent is which endpoints to use and which practices work best when retrieving such a large amount of data.

When pulling data directly from the InsightVM console is preferred, we recommend using the SQL reporting functionality to automate ad hoc reports that retrieve exactly the data needed. The InsightVM API documentation covers the available endpoints and parameters in detail; the resources used in this example are listed below:

  • POST /api/3/reports
  • POST /api/3/reports/<report_id>/generate
  • GET /api/3/reports/<report_id>/history/<instance_id>/output
  • DELETE /api/3/reports/<report_id>
  • GET /api/3/reports/<report_id>/history/<instance_id>
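As a quick orientation, the resources above can be mapped to the workflow steps they serve. This is only a sketch: the helper name `report_endpoints` and the step labels are illustrative and are not part of the InsightVM API or the provided script.

```python
BASE_RESOURCE = "/api/3"


def report_endpoints(report_id, instance_id):
    """Return (method, path) pairs for the report resources listed above.

    The dictionary keys are illustrative labels, not API terminology.
    """
    report = f"{BASE_RESOURCE}/reports/{report_id}"
    return {
        "create": ("POST", f"{BASE_RESOURCE}/reports"),
        "generate": ("POST", f"{report}/generate"),
        "status": ("GET", f"{report}/history/{instance_id}"),
        "download": ("GET", f"{report}/history/{instance_id}/output"),
        "delete": ("DELETE", report),
    }


if __name__ == "__main__":
    for step, (method, path) in report_endpoints(12, 3).items():
        print(f"{step:<9} {method:<6} {path}")
```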

When orchestrating the generation, download, and cleanup of reports to retrieve data, a typical workflow to follow is:

  1. Create a report template with a SQL query and filtering
  2. Generate the report
  3. Check the report status
  4. Download the report
  5. Delete the report template
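The steps above can be sketched as a single function that always cleans up the template, even when generation fails or times out. This is a minimal sketch under a few assumptions: `api` is any object exposing the methods used here, `start_report` is a hypothetical method that only triggers generation without polling, and the timeout value is arbitrary. The terminal status values are the ones the provided script checks for.

```python
import time

# Status values the provided script treats as final
TERMINAL_STATUSES = {"complete", "failed", "aborted"}


def extract_report(api, name, query, poll_seconds=5, timeout_seconds=3600):
    """Run the five workflow steps; return report text, or None if not complete."""
    report = api.create_report(name, query)                 # 1. create template
    try:
        # 2. generate (start_report is a hypothetical, non-polling method)
        instance = api.start_report(report["id"])
        deadline = time.monotonic() + timeout_seconds
        while True:                                         # 3. check status
            details = api.get_report_details(report["id"], instance["id"])
            if details["status"] in TERMINAL_STATUSES:
                break
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Report {name} did not finish in time")
            time.sleep(poll_seconds)
        if details["status"] != "complete":
            return None
        return api.download_report(report["id"], instance["id"])  # 4. download
    finally:
        api.delete_report(report["id"])                     # 5. delete template
```

Placing the delete call in a `finally` block keeps ad hoc templates from accumulating on the console when a report fails or the script is interrupted.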

A simple Python script demonstrating how to interact with the InsightVM RESTful API has been included to assist in getting started with programmatic use cases. This script shows the workflow described above as well as the most important endpoints used to generate and retrieve the report results.

To get started with the script, first ensure the INSIGHTVM_HOST, INSIGHTVM_USER, and INSIGHTVM_PASS environment variables are set appropriately, or modify lines 91-93 with the required values for authenticating to the InsightVM API.

HOST = os.environ.get("INSIGHTVM_HOST", "") # Format: <ip/hostname>:<port>
USER = os.environ.get("INSIGHTVM_USER", "") # InsightVM Console user with permissions to generate reports
PASS = os.environ.get("INSIGHTVM_PASS", "")
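If it helps to fail early with a clearer message, the settings check and the Basic authentication header can be factored out as below. This is a sketch only: `load_settings` and `basic_auth_header` are illustrative names that do not appear in the provided script, and UTF-8 encoding of the credentials is an assumption (the script itself encodes them as ASCII).

```python
import os
from base64 import b64encode

REQUIRED = ("INSIGHTVM_HOST", "INSIGHTVM_USER", "INSIGHTVM_PASS")


def load_settings(env=os.environ):
    """Return the required settings, or raise naming the ones that are missing."""
    missing = [key for key in REQUIRED if not env.get(key)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    return {key: env[key] for key in REQUIRED}


def basic_auth_header(username, password):
    """Build the value of an HTTP Basic Authorization header."""
    token = b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"
```

Naming the missing variable in the error saves a round of guessing when only one of the three is unset.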

Then review the provided queries, starting at line 99, and update them in order to retrieve the information needed. The example script includes queries used for generating reports of:

  1. All known assets scanned by InsightVM
  2. All known vulnerability definitions
  3. All vulnerability findings of scanned assets with best solution details

Finally, note that the script does not implement a storage or output mechanism for the generated reports (see line 142); the downloaded data is simply discarded. Update the script to write the reports to stdout, save them to a file, or process them as needed.
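One possible way to handle the downloaded data is sketched below. It assumes the SQL query report output arrives as CSV text with a header row, which is worth confirming against a real report from your console. The function names and the `<name>.csv` file naming are illustrative choices, not part of the provided script.

```python
import csv
import io
from pathlib import Path


def save_report(name, data, directory="."):
    """Write report text to <directory>/<name>.csv and return the path."""
    path = Path(directory) / f"{name}.csv"
    # newline="" keeps the line endings exactly as they were downloaded
    with open(path, "w", encoding="utf-8", newline="") as handle:
        handle.write(data)
    return path


def parse_report(data):
    """Parse CSV report text into a list of dicts keyed by column name."""
    return [dict(row) for row in csv.DictReader(io.StringIO(data))]
```

In the script, the return value of `download_report` could be passed to either function, for example `save_report(name, api.download_report(report["id"], report_instance["id"]))`. For very large reports, loading every row into memory with `parse_report` may not be practical, so writing to disk first is likely the safer default.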

The only dependency necessary to get started is Python 3.6+. Once installed, running the script is as easy as:

> python3 insightvm_bulk_data_extraction.py

Check out the provided script to see how you can begin pulling scan data. Remember, it’s important to filter reports in large environments by site, tags, or asset groups to avoid reports that are extremely large or take a significant amount of time to generate. For more information on report filtering, review the details for filters in the report creation documentation.
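As a rough illustration of scoping a report, the request body could be assembled as follows. Treat the `scope` object and its `sites`, `assetGroups`, and `tags` keys as assumptions to verify against the report creation documentation; only the `name`, `format`, `query`, and `version` fields are taken from the provided script. The helper name `build_report_body` is hypothetical.

```python
def build_report_body(name, query, sites=None, asset_groups=None, tags=None):
    """Build a report template body, optionally limited to a scope.

    The "scope" key names are assumed; check them against the API documentation.
    """
    body = {
        "name": name,
        "format": "sql-query",
        "query": query,
        "version": "2.3.0",
    }
    scope = {}
    if sites:
        scope["sites"] = list(sites)
    if asset_groups:
        scope["assetGroups"] = list(asset_groups)
    if tags:
        scope["tags"] = list(tags)
    if scope:
        # Only send a scope when at least one filter was requested
        body["scope"] = scope
    return body
```

A body built this way would be serialized with `json.dumps` and sent to `POST /api/3/reports`, just as `create_report` does in the script.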

The InsightVM API offers plenty more capabilities beyond this example. We would love to know how you’ve used the InsightVM API to automate reports or any other tasks in your environment.

Script
from base64 import b64encode
from datetime import datetime
import http.client
import json
import os
import ssl
import sys
from time import sleep
import uuid


class InsightVmApi:
   def __init__(self, url, username, password, verify_ssl):
       # Craft basic authentication
       auth = f"{username}:{password}"
       auth = b64encode(auth.encode('ascii')).decode()

       self.base_resource = "/api/3"
       self.headers = {
           'Accept': "application/json",
           'Content-Type': "application/json",
           'Authorization': f"Basic {auth}"
       }
       # Ignore certificate verification for self-signed certificates; NOT to be used in production
       context = None
       if str(verify_ssl).lower() == 'false':
           context = ssl._create_unverified_context()
       self.conn = http.client.HTTPSConnection(url, context=context)

   def create_report(self, report_name, report_query):
       body = {
           "name": report_name,
           "format": "sql-query",
           "query": report_query,
           "version": "2.3.0"
       }
       self.conn.request("POST", f"{self.base_resource}/reports", json.dumps(body), self.headers)

       resp = self.conn.getresponse()
       data = resp.read()

       # Return JSON response for report template
       return json.loads(data.decode())

   def run_report(self, report_id):
       self.conn.request("POST", f"{self.base_resource}/reports/{report_id}/generate", None, self.headers)

       resp = self.conn.getresponse()
       instance = json.loads(resp.read().decode())

       while True:
           instance_details = self.get_report_details(report_id, instance["id"])

           if instance_details["status"] in ('aborted', 'failed', 'complete'):
               # Return report instance id and status on completion status
               return instance_details
           else:
               # Wait between checking status; reports can take a while to complete
               sleep(5)

   def download_report(self, report_id, instance_id):
       self.conn.request("GET", f"{self.base_resource}/reports/{report_id}/history/{instance_id}/output",
                         None, self.headers)

       resp = self.conn.getresponse()
       data = resp.read().decode()

       # Return the raw report output as decoded text
       return data

   def delete_report(self, report_id):
       self.conn.request("DELETE", f"{self.base_resource}/reports/{report_id}", None, self.headers)

       resp = self.conn.getresponse()
       data = resp.read()

       # Return JSON response for the report template deletion
       return json.loads(data.decode())

   def get_report_details(self, report_id, instance_id):
       self.conn.request("GET", f"{self.base_resource}/reports/{report_id}/history/{instance_id}", None, self.headers)

       resp = self.conn.getresponse()
       data = resp.read()

       # Return JSON response for report instance
       return json.loads(data.decode())


if __name__ == '__main__':
   HOST = os.environ.get("INSIGHTVM_HOST", "")  # Format: <ip/hostname>:<port>
   USER = os.environ.get("INSIGHTVM_USER", "")  # InsightVM Console user with permissions to generate reports
   PASS = os.environ.get("INSIGHTVM_PASS", "")
   SSL_VERIFY = os.environ.get("INSIGHTVM_SSL_VERIFY", "true")  # Set to "false" to skip certificate verification

   if not all([HOST, USER, PASS]):
       sys.exit("Host, user, or password not defined; check environment variables and try again!")

   # Reference: https://nexpose.help.rapid7.com/docs/understanding-the-reporting-data-model-overview-and-query-design
   QUERIES = {
       "assets": """
           SELECT fa.asset_id, da.ip_address, da.host_name, da.mac_address, dos.vendor as operating_system_vendor,
                  dos.name as operating_system_name, dos.version as operating_system_version,
                  fa.scan_finished as last_scanned, fa.riskscore, fa.vulnerabilities, fa.critical_vulnerabilities,
                  fa.severe_vulnerabilities, fa.moderate_vulnerabilities, fa.vulnerability_instances
           FROM fact_asset AS fa
           JOIN dim_asset AS da ON da.asset_id = fa.asset_id
           JOIN dim_operating_system AS dos ON dos.operating_system_id = da.operating_system_id
       """,
       "vulnerabilities": """
           SELECT dv.vulnerability_id, dv.nexpose_id, dv.title, htmlToText(dv.description) as description,
                  dv.date_published, dv.severity_score, dv.riskscore, dvc.category_name, dvf.source, dvf.reference,
                  fv.affected_assets, fv.vulnerability_instances, fv.first_discovered, fv.most_recently_discovered
           FROM dim_vulnerability AS dv
           JOIN dim_vulnerability_category AS dvc ON dv.vulnerability_id = dvc.vulnerability_id
           JOIN dim_vulnerability_reference AS dvf ON dv.vulnerability_id = dvf.vulnerability_id
           JOIN fact_vulnerability AS fv ON dv.vulnerability_id = fv.vulnerability_id
       """,
       "finding_with_best_solution": """
           SELECT davbs.asset_id, davbs.vulnerability_id, davbs.solution_id, ds.nexpose_id, ds.solution_type,
                  htmlToText(ds.fix) as fix, ds.summary
           FROM dim_asset_vulnerability_best_solution AS davbs
           JOIN dim_solution AS ds ON davbs.solution_id = ds.solution_id
       """
   }

   # Initialize API helper
   api = InsightVmApi(HOST, USER, PASS, SSL_VERIFY)

   # Process each query
   for name, query in QUERIES.items():
       print(f"Generating report for {name} query")
       start_time = datetime.now()

       # Create report template with name and query
       report = api.create_report(f"adhoc-{name}-{uuid.uuid4()}", query)

       # Generate report and poll until completion
       report_instance = api.run_report(report["id"])

       if report_instance["status"] == "complete":
           # Download report; do something with it!
           api.download_report(report["id"], report_instance["id"])

       # Cleanup adhoc report template
       api.delete_report(report["id"])

       end_time = datetime.now()
       delta = end_time - start_time

       print(f"{name} report generated in {int(delta.total_seconds())} seconds")