Workflow for sending InsightIDR investigations to teams

Anyone have an example workflow for sending InsightIDR investigations to teams?

Struggling to get this working at the moment I have a workflow setup with the “Insight IDR - Detection Rule” trigger set on certain logsets and then the next action is to send the teams message to a channel.

It is failing with a can’t find the variable in any previous steps error, do I have to use an artifact to extract this?

Cheers.

Hey @esoteric

Have you taken a look at Rapid7 Extensions ? We have a modified version of this running. If you have not, it might be a good starting point.

That said, when in doubt, I do use artifacts to help “debug” as I go while working on a Workflow.

Hope this helps

Good luck

Marco

I’ve gotten this workflow semi working, can’t get the json2html module correctly working with the python connection.

Did you run into any issues with this side of setup?

I had the same problem where it failed to load. With an unashamedly large bit of chatgpt I found this to work. In the “Convert Evidence to HTML” circle near the bottom of the workflow replace the function value with the below:

def run(params={}):

import json as json_parser
from collections import OrderedDict
from html import escape as html_escape

_TEXT_TYPES = (str, bytes, bytearray)

def _to_text(x):
    if isinstance(x, (bytes, bytearray)):
        try:
            return x.decode("utf-8", errors="replace")
        except Exception:
            return str(x)
    return str(x)

def _is_text(x):
    return isinstance(x, _TEXT_TYPES)

def _is_dict_like(x):
    return hasattr(x, "items")

def _is_list_like(x):
    return (not _is_text(x)) and hasattr(x, "__iter__") and hasattr(x, "__getitem__")

def _maybe_escape(s, do_escape=True):
    return html_escape(s, quote=True) if do_escape else s

def _column_headers_from_list_of_dicts(list_input):
    if (not list_input) or (not hasattr(list_input[0], "keys")):
        return None
    headers = list(list_input[0].keys())
    for entry in list_input:
        if not hasattr(entry, "keys"):
            return None
        for h in headers:
            if h not in entry:
                return None
    return headers

def convert(json="", table_attributes='border="1"', clubbing=True, encode=False, escape=True):
    if json is None or json == "":
        json_input = {}
    elif _is_text(json):
        s = _to_text(json)
        try:
            json_input = json_parser.loads(s, object_pairs_hook=OrderedDict)
        except Exception:
            json_input = s
    else:
        json_input = json

    table_init = f"<table {table_attributes}>"

    def convert_node(node):
        if _is_text(node):
            s = _to_text(node)
            s = _maybe_escape(s, escape)
            if s.startswith("https://"):
                return f'<a href="{s}">{s}</a>'
            return s

        if _is_dict_like(node):
            return convert_object(node)

        if _is_list_like(node):
            return convert_list(node)

        return _maybe_escape(_to_text(node), escape)

    def convert_object(obj):
        if not obj:
            return ""
        out = [table_init]
        for k, v in obj.items():
            out.append("<tr>")
            out.append(f"<th>{convert_node(k)}</th>")
            out.append(f"<td>{convert_node(v)}</td>")
            out.append("</tr>")
        out.append("</table>")
        return "".join(out)

    def convert_list(list_input):
        if not list_input:
            return ""

        if clubbing:
            headers = _column_headers_from_list_of_dicts(list_input)
            if headers is not None:
                out = [
                    table_init,
                    "<thead><tr><th>",
                    "</th><th>".join([convert_node(h) for h in headers]),
                    "</th></tr></thead><tbody>",
                ]
                for entry in list_input:
                    out.append("<tr><td>")
                    out.append("</td><td>".join([convert_node(entry.get(h, "")) for h in headers]))
                    out.append("</td></tr>")
                out.append("</tbody></table>")
                return "".join(out)

        out = ["<ul><li>"]
        out.append("</li><li>".join([convert_node(child) for child in list_input]))
        out.append("</li></ul>")
        return "".join(out)

    rendered = convert_node(json_input)
    if encode:
        return rendered.encode("ascii", "xmlcharrefreplace")
    return rendered

input_json = params.get("json") or []
flat_list = []
for row in input_json:
    if isinstance(row, list):
        flat_list.extend(row)
    else:
        flat_list.append(row)

output = convert(json=flat_list, clubbing=False)
return {"html": output}

Can you please go into more detail with the issue. Have you tried sending a teams message without anything extra. As an example, open a Snippet instead of a workflow, have a single action to send a teams message. Ensure that you can send the message to the channel.

After you have confirmed this works, you can troubleshoot the workflow. Are you getting an error message?