So you’ve already gotten yourself acquainted with the InsightVM API (v3), but you want to dive deeper into how to leverage Asset Search within your scripts and automation. The Asset Search endpoint is one of the most powerful ways to find devices that meet any number of criteria - assets of a site, assets scanned in the last 24 hours, etc.
Similar to some of our other posts, the best way to get started is to identify the endpoint in question:
- POST /api/3/assets/search
Seems simple enough, but then we need to identify the request payload to be passed to the search endpoint. The body includes two components to make up the Search Criteria:
- Filters: list of fields, operators, and values used to match assets
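- Match: used to determine how to match multiple filters; options include:
  - any: an asset is returned if it matches at least one filter
  - all: an asset is returned only if it matches every filter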
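As a minimal sketch of how these two components fit together, the helper below assembles a request body and rejects an unknown match value before anything is sent. The function name `build_search_criteria` is hypothetical and not part of the InsightVM API; only the `filters` and `match` keys come from the endpoint described above.

```python
import json

# The two match modes described above
VALID_MATCH = ("any", "all")


def build_search_criteria(filters, match="all"):
    """Assemble the JSON body for POST /api/3/assets/search (hypothetical helper)."""
    if match not in VALID_MATCH:
        raise ValueError(f"match must be one of {VALID_MATCH}, got {match!r}")
    return {"filters": list(filters), "match": match}


if __name__ == "__main__":
    criteria = build_search_criteria(
        [{"field": "site-id", "operator": "in", "values": [1]}],
        match="all",
    )
    # Serialize exactly as it would be sent in the request body
    print(json.dumps(criteria, indent=2))
```

Validating locally like this is optional; it simply surfaces a typo in the match value before a request is made.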
When building search criteria, it is important to remember that more than one filter can be applied. This means you can search for all assets that “have been scanned in the past 30 days” AND “are Linux devices”:
{
  "filters": [
    {
      "field": "last-scan-date",
      "operator": "is-within-the-last",
      "value": 30
    },
    {
      "field": "operating-system",
      "operator": "contains",
      "value": "Linux"
    }
  ],
  "match": "all"
}
Other search criteria could be for “all assets in the agent site (site ID: 1)”:
{
  "filters": [
    {
      "field": "site-id",
      "operator": "in",
      "values": [1]
    }
  ],
  "match": "all"
}
Or, all Linux OR Mac devices:
{
  "filters": [
    {
      "field": "operating-system",
      "operator": "contains",
      "value": "Mac"
    },
    {
      "field": "operating-system",
      "operator": "contains",
      "value": "Linux"
    }
  ],
  "match": "any"
}
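Note that the examples above use the key `value` for single-valued operators (`contains`, `is-within-the-last`) and `values` for the list-based `in` operator. A small sketch of a helper that picks the key from the argument type follows; `make_filter` is a hypothetical name, and the rule it applies is inferred from these examples rather than from a complete list of operators.

```python
def make_filter(field, operator, value):
    """Build one filter dict, using "values" for lists and "value" otherwise.

    Assumption: list arguments map to "values" and scalars to "value",
    as in the examples in this post.
    """
    if isinstance(value, (list, tuple)):
        return {"field": field, "operator": operator, "values": list(value)}
    return {"field": field, "operator": operator, "value": value}


if __name__ == "__main__":
    # Linux OR Mac devices, expressed with the helper
    filters = [
        make_filter("operating-system", "contains", "Mac"),
        make_filter("operating-system", "contains", "Linux"),
    ]
    print({"filters": filters, "match": "any"})
```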
The Asset Search endpoint is paged, so you may need to request several pages to retrieve every asset that matches your search. The example script keeps requesting until all pages have been returned:
def asset_search(self, filters, match):
    body = {
        "filters": filters,
        "match": match,
    }
    page = 0
    size = 500
    matched_assets = []

    while True:
        self.conn.request("POST", f"{self.base_resource}/assets/search?page={page}&size={size}",
                          json.dumps(body), self.headers)
        resp = self.conn.getresponse()
        data = resp.read()
        resp_dict = json.loads(data.decode())
        matched_assets.extend(resp_dict["resources"])

        # Keep going while there are more pages (pages are zero-indexed)
        if resp_dict["page"]["totalPages"] > page + 1:
            page += 1
        else:
            break

    # List of assets
    return matched_assets
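To make the paging loop easier to reason about, here is a sketch that separates it from the HTTP call so it can be exercised offline. `collect_all_pages` and `fake_fetch` are hypothetical names; the only parts taken from the endpoint are the `resources` list and the `page.totalPages` field used in the script above.

```python
def collect_all_pages(fetch_page, size=500):
    """Call fetch_page(page, size) until every page has been read.

    fetch_page is expected to return a dict shaped like the search response:
    {"resources": [...], "page": {"totalPages": N}}.
    """
    assets = []
    page = 0
    while True:
        resp = fetch_page(page, size)
        assets.extend(resp.get("resources", []))
        total_pages = resp.get("page", {}).get("totalPages", 0)
        page += 1
        if page >= total_pages:
            break
    return assets


def fake_fetch(page, size):
    """Stand-in for the real request: serves five fake assets in pages."""
    data = list(range(5))
    chunk = data[page * size:(page + 1) * size]
    total_pages = -(-len(data) // size)  # ceiling division
    return {"resources": chunk, "page": {"totalPages": total_pages}}


if __name__ == "__main__":
    print(collect_all_pages(fake_fetch, size=2))  # [0, 1, 2, 3, 4]
```

In a real script, `fetch_page` would wrap the `conn.request` and `getresponse` calls shown above.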
One thing to keep in mind: while the Asset Search endpoint can be used to return every device in InsightVM, there are better ways to extract data in bulk. For that, look to the Reports endpoint instead, as explained in the Extracting Bulk Data with the InsightVM Console API post.
A full script is included below that searches for assets using these particular filters. Feel free to add comments with filters you’ve found useful so that others can use them as well. Give it a run in your environment and use it as a starting point. The only dependency is Python 3.6+. Once Python is installed, running the script is as easy as setting the appropriate environment variables and running it:
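Variable | Description |
---|---|
INSIGHTVM_HOST | Security Console host, format: <ip/hostname>:<port> |
INSIGHTVM_USER | API username |
INSIGHTVM_PASS | API user password |
INSIGHTVM_SSL_VERIFY | Optional; set to false to skip certificate verification (defaults to true) |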
> python3 insightvm_bulk_data_extraction.py
0 assets matched filter [{'field': 'last-scan-date', 'operator': 'is-within-the-last', 'value': 30}, {'field': 'operating-system', 'operator': 'contains', 'value': 'Linux'}]
67 assets matched filter [{'field': 'site-id', 'operator': 'in', 'values': [1]}]
211 assets matched filter [{'field': 'operating-system', 'operator': 'contains', 'value': 'Mac'}, {'field': 'operating-system', 'operator': 'contains', 'value': 'Linux'}]
Script
from base64 import b64encode
import http.client
import json
import os
import ssl
import sys


class InsightVmApi:
    def __init__(self, url, username, password, verify_ssl):
        # Craft basic authentication
        auth = f"{username}:{password}"
        auth = b64encode(auth.encode('ascii')).decode()

        self.base_resource = "/api/3"
        self.headers = {
            'Accept': "application/json",
            'Content-Type': "application/json",
            'Authorization': f"Basic {auth}"
        }

        context = None  # None means the default, verifying SSL context
        if verify_ssl.lower() == 'false':
            # Ignore certificate verification for self-signed certificate; NOT to be used in production
            context = ssl._create_unverified_context()
        self.conn = http.client.HTTPSConnection(url, context=context)

    def asset_search(self, filters, match):
        body = {
            "filters": filters,
            "match": match,
        }
        page = 0
        size = 500
        matched_assets = []

        while True:
            self.conn.request("POST", f"{self.base_resource}/assets/search?page={page}&size={size}",
                              json.dumps(body), self.headers)
            resp = self.conn.getresponse()
            data = resp.read()
            resp_dict = json.loads(data.decode())
            matched_assets.extend(resp_dict["resources"])

            # Keep going while there are more pages (pages are zero-indexed)
            if resp_dict["page"]["totalPages"] > page + 1:
                page += 1
            else:
                break

        # Return the list of matched assets
        return matched_assets


if __name__ == '__main__':
    HOST = os.environ.get("INSIGHTVM_HOST", "")  # Format: <ip/hostname>:<port>
    USER = os.environ.get("INSIGHTVM_USER", "")  # API username
    PASS = os.environ.get("INSIGHTVM_PASS", "")  # API user password
    SSL_VERIFY = os.environ.get("INSIGHTVM_SSL_VERIFY", "true")  # Set to false to skip certificate verification

    # Empty strings are falsy, so this catches any unset variable
    if any(not v for v in [HOST, USER, PASS]):
        sys.exit("Host, user, or password not defined; check environment variables and try again!")

    # Initialize API helper
    api = InsightVmApi(HOST, USER, PASS, SSL_VERIFY)

    # Search for all Linux devices scanned in the last 30 days
    search_filters = [
        {
            "field": "last-scan-date",
            "operator": "is-within-the-last",
            "value": 30
        },
        {
            "field": "operating-system",
            "operator": "contains",
            "value": "Linux"
        }
    ]
    assets = api.asset_search(search_filters, "all")
    print(f"{len(assets)} assets matched filter {search_filters}")

    # Search for all devices in agent site (always site ID 1)
    search_filters = [
        {
            "field": "site-id",
            "operator": "in",
            "values": [1]
        }
    ]
    assets = api.asset_search(search_filters, "all")
    print(f"{len(assets)} assets matched filter {search_filters}")

    # Search for all Linux or Mac devices
    search_filters = [
        {
            "field": "operating-system",
            "operator": "contains",
            "value": "Mac"
        },
        {
            "field": "operating-system",
            "operator": "contains",
            "value": "Linux"
        }
    ]
    assets = api.asset_search(search_filters, "any")
    print(f"{len(assets)} assets matched filter {search_filters}")
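
The script above decodes every response as a successful search result. If you adapt it, one option is to check the HTTP status first so that a bad credential or a malformed filter produces a readable error rather than a `KeyError` on `resources`. This is a minimal sketch; `parse_search_response` is a hypothetical helper, and it makes no assumption about the shape of the error body beyond printing the first part of it.

```python
import json


def parse_search_response(status, raw_body):
    """Decode a search response, raising if the request did not succeed.

    status is the integer HTTP status (resp.status in http.client) and
    raw_body is the bytes returned by resp.read().
    """
    text = raw_body.decode("utf-8", errors="replace")
    if status != 200:
        # Include a short excerpt of the body to help with troubleshooting
        raise RuntimeError(f"Asset search failed with HTTP {status}: {text[:200]}")
    return json.loads(text)


if __name__ == "__main__":
    ok = parse_search_response(200, b'{"resources": [], "page": {"totalPages": 0}}')
    print(ok["page"]["totalPages"])
```

Inside `asset_search`, this would replace the `json.loads(data.decode())` line with `resp_dict = parse_search_response(resp.status, data)`.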