Path: blob/main/cyberbattle/simulation/actions_test.py
597 views
# Copyright (c) Microsoft Corporation.1# Licensed under the MIT License.23"""4This is the set of tests for actions.py which implements the actions an agent can take5in this simulation.6"""78import random9from datetime import datetime10from typing import Dict, List1112import pytest13import networkx as nx1415from . import model, actions1617ADMINTAG = model.AdminEscalation().tag18SYSTEMTAG = model.SystemEscalation().tag1920# pylint: disable=redefined-outer-name, protected-access21Fixture = actions.AgentActions2223empty_vuln_dict: Dict[model.VulnerabilityID, model.VulnerabilityInfo] = {}24SINGLE_VULNERABILITIES = {25"UACME61": model.VulnerabilityInfo(26description="UACME UAC bypass #61",27type=model.VulnerabilityType.LOCAL,28URL="https://github.com/hfiref0x/UACME",29precondition=model.Precondition(f"Windows&Win10&(~({ADMINTAG}|{SYSTEMTAG}))"),30outcome=model.AdminEscalation(),31rates=model.Rates(0, 0.2, 1.0),32)33}3435# temporary vuln dictionary for development purposes only.36# Remove once the full list of vulnerabilities is put together37# here we'll have 1 UAC bypass, 1 credential dump, and 1 remote infection vulnerability38SAMPLE_VULNERABILITIES = {39"UACME61": model.VulnerabilityInfo(40description="UACME UAC bypass #61",41type=model.VulnerabilityType.LOCAL,42URL="https://github.com/hfiref0x/UACME",43precondition=model.Precondition(f"Windows&Win10&(~({ADMINTAG}|{SYSTEMTAG}))"),44outcome=model.AdminEscalation(),45rates=model.Rates(0, 0.2, 1.0),46),47"UACME67": model.VulnerabilityInfo(48description="UACME UAC bypass #67 (fake system escalation) ",49type=model.VulnerabilityType.LOCAL,50URL="https://github.com/hfiref0x/UACME",51precondition=model.Precondition(f"Windows&Win10&(~({ADMINTAG}|{SYSTEMTAG}))"),52outcome=model.SystemEscalation(),53rates=model.Rates(0, 0.2, 1.0),54),55"MimikatzLogonpasswords": model.VulnerabilityInfo(56description="Mimikatz sekurlsa::logonpasswords.",57type=model.VulnerabilityType.LOCAL,58URL="https://github.com/gentilkiwi/mimikatz",59precondition=model.Precondition(f"Windows&({ADMINTAG}|{SYSTEMTAG})"),60outcome=model.LeakedCredentials([]),61rates=model.Rates(0, 1.0, 1.0),62),63"RDPBF": model.VulnerabilityInfo(64description="RDP Brute Force",65type=model.VulnerabilityType.REMOTE,66URL="https://attack.mitre.org/techniques/T1110/",67precondition=model.Precondition("Windows&PortRDPOpen"),68outcome=model.LateralMove(),69rates=model.Rates(0, 0.2, 1.0),70cost=1.0,71),72}7374ENV_IDENTIFIERS = model.Identifiers(75local_vulnerabilities=["UACME61", "UACME67", "MimikatzLogonpasswords", "UACME61"],76remote_vulnerabilities=["RDPBF"],77ports=["RDP", "HTTP", "HTTPS", "SSH"],78properties=[79"Linux",80"PortSSHOpen",81"PortSQLOpen",82"Windows",83"Win10",84"PortRDPOpen",85"PortHTTPOpen",86"PortHTTPsOpen",87"SharepointLeakingPassword",88],89)909192def sample_random_firwall_configuration() -> model.FirewallConfiguration:93"""Sample a random firewall set of rules"""94return model.FirewallConfiguration(95outgoing=[96model.FirewallRule(p, permission=model.RulePermission.ALLOW)97for p in random.choices(98ENV_IDENTIFIERS.properties,99k=random.randint(0, len(ENV_IDENTIFIERS.properties)),100)101],102incoming=[103model.FirewallRule(p, permission=model.RulePermission.ALLOW)104for p in random.choices(105ENV_IDENTIFIERS.properties,106k=random.randint(0, len(ENV_IDENTIFIERS.properties)),107)108],109)110111112# temporary info for a single node network113SINGLE_NODE = {114"a": model.NodeInfo(115services=[116model.ListeningService("RDP"),117model.ListeningService("HTTP"),118model.ListeningService("HTTPS"),119],120value=70,121properties=list(["Windows", "Win10", "PortRDPOpen", "PortHTTPOpen", "PortHTTPsOpen"]),122firewall=sample_random_firwall_configuration(),123agent_installed=False,124)125}126127# temporary info for 4 nodes128# a is a windows web server, b is linux SQL server, c is a windows workstation,129# and dc is a domain controller130NODES = {131"a": model.NodeInfo(132services=[133model.ListeningService("RDP"),134model.ListeningService("HTTP"),135model.ListeningService("HTTPS"),136],137value=70,138properties=list(["Windows", "Win10", "PortRDPOpen", "PortHTTPOpen", "PortHTTPsOpen"]),139vulnerabilities=dict(140ListNeighbors=model.VulnerabilityInfo(141description="reveal other nodes",142type=model.VulnerabilityType.LOCAL,143outcome=model.LeakedNodesId(nodes=["b", "c", "dc"]),144),145DumpCreds=model.VulnerabilityInfo(146description="leaking some creds",147type=model.VulnerabilityType.LOCAL,148outcome=model.LeakedCredentials(149[150model.CachedCredential("Sharepoint", "HTTPS", "ADPrincipalCreds"),151model.CachedCredential("Sharepoint", "HTTPS", "cred"),152]153),154),155),156agent_installed=True,157),158"b": model.NodeInfo(159services=[model.ListeningService("SSH"), model.ListeningService("SQL")],160value=80,161properties=list(["Linux", "PortSSHOpen", "PortSQLOpen"]),162agent_installed=False,163),164"c": model.NodeInfo(165services=[166model.ListeningService("RDP"),167model.ListeningService("HTTP"),168model.ListeningService("HTTPS"),169],170value=40,171properties=list(["Windows", "Win10", "PortRDPOpen", "PortHTTPOpen", "PortHTTPsOpen"]),172agent_installed=True,173),174"dc": model.NodeInfo(175services=[model.ListeningService("RDP"), model.ListeningService("WMI")],176value=100,177properties=list(["Windows", "Win10", "PortRDPOpen", "PortWMIOpen"]),178agent_installed=False,179),180"Sharepoint": model.NodeInfo(181services=[model.ListeningService("HTTPS", allowedCredentials=["ADPrincipalCreds"])],182value=100,183properties=["SharepointLeakingPassword"],184firewall=model.FirewallConfiguration(185incoming=[186model.FirewallRule(port="SSH", permission=model.RulePermission.ALLOW),187model.FirewallRule(port="HTTPS", permission=model.RulePermission.ALLOW),188model.FirewallRule(port="HTTP", permission=model.RulePermission.ALLOW),189model.FirewallRule(port="RDP", permission=model.RulePermission.BLOCK),190],191outgoing=[],192),193vulnerabilities=dict(194ScanSharepointParentDirectory=model.VulnerabilityInfo(195description="Navigate to SharePoint site, browse parent " "directory",196type=model.VulnerabilityType.REMOTE,197outcome=model.LeakedCredentials(198credentials=[199model.CachedCredential(200node="AzureResourceManager",201port="HTTPS",202credential="ADPrincipalCreds",203)204]205),206rates=model.Rates(successRate=1.0),207cost=1.0,208)209),210),211}212213214# Define an environment from this graph215ENV = model.Environment(216network=model.create_network(NODES),217vulnerability_library=dict([]),218identifiers=ENV_IDENTIFIERS,219creationTime=datetime.utcnow(),220lastModified=datetime.utcnow(),221)222223224@pytest.fixture225def actions_on_empty_environment() -> actions.AgentActions:226"""227the test fixtures to reduce the amount of overhead228This fixture will provide us with an empty environment.229"""230egraph = nx.empty_graph(0, create_using=nx.DiGraph())231env = model.Environment(232network=egraph,233version=model.VERSION_TAG,234vulnerability_library=SAMPLE_VULNERABILITIES,235identifiers=ENV_IDENTIFIERS,236creationTime=datetime.utcnow(),237lastModified=datetime.utcnow(),238)239return actions.AgentActions(env)240241242@pytest.fixture243def actions_on_single_node_environment() -> actions.AgentActions:244"""245This fixture will provide us with a single node environment246"""247env = model.Environment(248network=model.create_network(SINGLE_NODE),249version=model.VERSION_TAG,250vulnerability_library=SAMPLE_VULNERABILITIES,251identifiers=ENV_IDENTIFIERS,252creationTime=datetime.utcnow(),253lastModified=datetime.utcnow(),254)255return actions.AgentActions(env)256257258@pytest.fixture259def actions_on_simple_environment() -> actions.AgentActions:260"""261This fixture will provide us with a 4 node environment environment.262simulating three workstations connected to a single server263"""264env = model.Environment(265network=model.create_network(NODES),266version=model.VERSION_TAG,267vulnerability_library=SAMPLE_VULNERABILITIES,268identifiers=ENV_IDENTIFIERS,269creationTime=datetime.utcnow(),270lastModified=datetime.utcnow(),271)272return actions.AgentActions(env)273274275def test_list_vulnerabilities_function(actions_on_single_node_environment: Fixture, actions_on_simple_environment: Fixture) -> None:276"""277This function will test the list_vulnerabilities function from the278AgentActions class in actions.py279"""280# test on an environment with a single node281single_node_results: List[model.VulnerabilityID] = []282single_node_results = actions_on_single_node_environment.list_vulnerabilities_in_target("a")283assert len(single_node_results) == 3284285simple_graph_results: List[model.VulnerabilityID] = []286simple_graph_results = actions_on_simple_environment.list_vulnerabilities_in_target("dc")287assert len(simple_graph_results) == 3288289290def test_exploit_remote_vulnerability(actions_on_simple_environment: Fixture) -> None:291"""292This function will test the exploit_remote_vulnerability function from the293AgentActions class in actions.py294"""295296actions_on_simple_environment.exploit_local_vulnerability("a", "ListNeighbors")297298# test with invalid source node299with pytest.raises(ValueError, match=r"invalid node id '.*'"):300actions_on_simple_environment.exploit_remote_vulnerability("z", "b", "RDPBF")301302# test with invalid destination node303with pytest.raises(ValueError, match=r"invalid target node id '.*'"):304actions_on_simple_environment.exploit_remote_vulnerability("a", "z", "RDPBF")305306# test with a local vulnerability307with pytest.raises(ValueError, match=r"vulnerability id '.*' is for an attack of type .*"):308actions_on_simple_environment.exploit_remote_vulnerability("a", "c", "MimikatzLogonpasswords")309310# test with an invalid vulnerability (one not there)311result = actions_on_simple_environment.exploit_remote_vulnerability("a", "c", "HackTheGibson")312assert result.outcome is None and result.reward <= 0313314# add RDP brute force to the target node315# very hacky not to be used normally.316graph: nx.graph.Graph = actions_on_simple_environment._environment.network317node: model.NodeInfo = graph.nodes["c"]["data"]318node.vulnerabilities = SAMPLE_VULNERABILITIES319320# test a valid and functional one.321result = actions_on_simple_environment.exploit_remote_vulnerability("a", "c", "RDPBF")322assert isinstance(result.outcome, model.LateralMove)323assert result.reward < node.value324325326def test_exploit_local_vulnerability(actions_on_simple_environment: Fixture) -> None:327"""328This function will test the exploit_local_vulnerability function from the329AgentActions class in actions.py330"""331332# check one with invalid prerequisites333result: actions.ActionResult = actions_on_simple_environment.exploit_local_vulnerability("a", "MimikatzLogonpasswords")334assert isinstance(result.outcome, model.ExploitFailed)335336# test admin privilege escalation337# exploit_local_vulnerability(node_id, vulnerability_id)338result = actions_on_simple_environment.exploit_local_vulnerability("a", "UACME61")339assert isinstance(result.outcome, model.AdminEscalation)340node: model.NodeInfo = actions_on_simple_environment._environment.network.nodes["a"]["data"]341assert model.AdminEscalation().tag in node.properties342343# test system privilege escalation344result = actions_on_simple_environment.exploit_local_vulnerability("c", "UACME67")345assert isinstance(result.outcome, model.SystemEscalation)346node = actions_on_simple_environment._environment.network.nodes["c"]["data"]347assert model.SystemEscalation().tag in node.properties348349# test dump credentials350result = actions_on_simple_environment.exploit_local_vulnerability("a", "MimikatzLogonpasswords")351assert isinstance(result.outcome, model.LeakedCredentials)352353354def test_connect_to_remote_machine(355actions_on_empty_environment: Fixture,356actions_on_single_node_environment: Fixture,357actions_on_simple_environment: Fixture,358) -> None:359"""360This function will test the connect_to_remote_machine function from the361AgentActions class in actions.py362"""363actions_on_simple_environment.exploit_local_vulnerability("a", "ListNeighbors")364actions_on_simple_environment.exploit_local_vulnerability("a", "DumpCreds")365366# test connect to remote machine on an empty environment367with pytest.raises(ValueError, match=r"invalid node id '.*'"):368actions_on_empty_environment.connect_to_remote_machine("a", "b", "RDP", "cred")369370# test connect to remote machine on an environment with 1 node371with pytest.raises(ValueError, match=r"invalid node id '.*'"):372actions_on_single_node_environment.connect_to_remote_machine("a", "b", "RDP", "cred")373374graph: nx.graph.Graph = actions_on_simple_environment._environment.network375376# test connect to remote machine on an environment with multiple nodes377# test with valid source node and invalid destination node378with pytest.raises(ValueError, match=r"invalid node id '.*'"):379actions_on_simple_environment.connect_to_remote_machine("a", "f", "RDP", "cred")380381# test with an invalid source node and valid destination node382with pytest.raises(ValueError, match=r"invalid node id '.*'"):383actions_on_simple_environment.connect_to_remote_machine("f", "dc", "RDP", "cred")384385# test with both nodes invalid386with pytest.raises(ValueError, match=r"invalid node id '.*'"):387actions_on_simple_environment.connect_to_remote_machine("f", "z", "RDP", "cred")388389# test with invalid protocol390result = actions_on_simple_environment.connect_to_remote_machine("a", "dc", "TCPIP", "cred")391assert result.reward <= 0 and result.outcome is None392393# test with invalid credentials394result2 = actions_on_simple_environment.connect_to_remote_machine("a", "dc", "RDP", "cred")395assert result2.outcome is None and result2.reward <= 0396397# test blocking firewall rule398ret_val = actions_on_simple_environment.connect_to_remote_machine("a", "Sharepoint", "RDP", "ADPrincipalCreds")399assert ret_val.reward < 0400401# test with valid nodes402ret_val = actions_on_simple_environment.connect_to_remote_machine("a", "Sharepoint", "HTTPS", "ADPrincipalCreds")403404assert ret_val.reward == 100405406assert graph.has_edge("a", "dc")407408409def test_check_prerequisites(actions_on_simple_environment: Fixture) -> None:410"""411This function will test the _checkPrerequisites function412It's marked as a private function but still needs to be tested before use413414"""415# testing on a node/vuln combo which should give us a negative result416result = actions_on_simple_environment._check_prerequisites("dc", SAMPLE_VULNERABILITIES["MimikatzLogonpasswords"])417assert not result418419# testing on a node/vuln combo which should give us a positive reuslt.420result = actions_on_simple_environment._check_prerequisites("dc", SAMPLE_VULNERABILITIES["UACME61"])421assert result422423424