Path: blob/main/cyberbattle/samples/active_directory/generate_ad.py
597 views
"""Generating random active directory networks"""

import random
from typing import Any

import networkx as nx

from cyberbattle.simulation import model as m
from cyberbattle.simulation.model import FirewallConfiguration, FirewallRule, Identifiers, RulePermission

# Property, port and vulnerability names used by the environment built below.
# NOTE(review): "PasswordSpray" is declared here but no node in this file
# defines a vulnerability with that name.
ENV_IDENTIFIERS = Identifiers(
    properties=[
        "breach_node",
        "domain_controller",
        "admin",  # whether or not the users of this machine are admins
    ],
    ports=["SMB", "AD", "SHELL"],
    local_vulnerabilities=["FindDomainControllers", "EnumerateFileShares", "AuthorizationSpoofAndCrack", "ScanForCreds", "DumpNTDS", "ProbeAdmin"],
    remote_vulnerabilities=["PasswordSpray"],
)


def create_network_from_smb_traffic(n_clients: int, n_servers: int, n_users: int) -> nx.DiGraph:
    """Build a random Active Directory style network as a directed graph.

    The graph holds ``n_clients`` workstation nodes, ``n_servers`` SMB share
    nodes and one domain controller. No edges are added here. Every node ends
    up with a single ``"data"`` attribute holding its ``m.NodeInfo``.
    ``workstation_0`` is the entry node: it has the agent installed and carries
    the ``AuthorizationSpoofAndCrack`` vulnerability.

    Randomness comes from the module-level ``random`` generator, so seed it
    before calling when reproducible output is needed.

    Args:
        n_clients: Number of workstations. Must be at least 1 because
            ``workstation_0`` is always configured as the entry node.
        n_servers: Number of SMB share nodes.
        n_users: Number of ``user_<i>`` accounts. Must be at least 1 because
            leaked users are drawn from ``range(n_users)``.

    Returns:
        The populated ``nx.DiGraph``.
    """
    graph = nx.DiGraph()
    graph.add_nodes_from([f"workstation_{i}" for i in range(n_clients)])
    graph.add_nodes_from([f"share_{i}" for i in range(n_servers)])
    graph.add_node("domain_controller_1")
    # NOTE: The following is not needed. The AgentActions code will annotate our network based on dynamic discovery
    # graph.add_edges_from([(f"share_{i}", f"workstation_{j}") for i in range(0, n_servers) for j in range(0, n_clients)], protocol="SMB")
    # graph.add_edges_from([("domain_controller_1", f"workstation_{i}") for i in range(0, n_clients)], protocol="AD")
    # graph.add_edges_from([("domain_controller_1", f"share_{i}") for i in range(0, n_servers)], protocol="AD")

    # Every node shares one firewall configuration that allows all three
    # protocols. The two rule lists are built separately so they do not share
    # FirewallRule objects.
    protocols = ("SMB", "AD", "SHELL")
    firewall_conf = FirewallConfiguration(
        [FirewallRule(port, RulePermission.ALLOW) for port in protocols],
        [FirewallRule(port, RulePermission.ALLOW) for port in protocols],
    )

    def default_vulnerabilities() -> m.VulnerabilityLibrary:
        """Return a fresh library with the vulnerabilities present on every node."""
        lib: m.VulnerabilityLibrary = {}
        lib["FindDomainControllers"] = m.VulnerabilityInfo(
            description="Search for valid domain controllers in the current machines environment.",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedNodesId(nodes=["domain_controller_1"]),
            reward_string="Found domain controllers",
        )
        lib["EnumerateFileShares"] = m.VulnerabilityInfo(
            description="Find all SMB shares this machine knows about.",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedNodesId(nodes=[f"share_{i}" for i in range(n_servers)]),
            reward_string="Found shares",
        )
        # Default outcome is a failed probe; admin_vulnerabilities() replaces
        # this entry on admin workstations.
        lib["ProbeAdmin"] = m.VulnerabilityInfo(
            description="Probe a workstation to see if you have admin creds on it",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.ProbeFailed(),
            reward_string="No admin creds.",
        )
        lib["ScanForCreds"] = m.VulnerabilityInfo(
            description="Scan the local security managers for credentials. Need to be admin on the box.",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedCredentials(credentials=[m.CachedCredential(node="domain_controller_1", port="AD", credential="dc_1")]),
            precondition=m.Precondition("admin"),
            rates=m.Rates(successRate=0.9),
            reward_string="DA credentials found",
        )
        return lib

    def breach_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
        """Add ``AuthorizationSpoofAndCrack`` to ``lib`` in place and return it.

        The leaked credentials belong to a random subset of users.
        """
        # NOTE: The randomization here does not support random credential leakage...
        # This is kind of a major limitation for tooling like responder
        #
        # The number of draws comes from randrange(3, n_clients), which raises
        # ValueError on an empty range when n_clients <= 3. Clamping the upper
        # bound to 4 makes small networks draw exactly 3 times and leaves
        # n_clients >= 4 unchanged. Duplicate draws collapse in the set.
        n_draws = random.randrange(3, max(4, n_clients))
        leaked_users = {random.randrange(0, n_users) for _ in range(n_draws)}

        share_creds = [
            m.CachedCredential(node=f"share_{shareid}", port="SMB", credential=f"user_{credind}")
            for credind in leaked_users
            for shareid in range(n_servers)
        ]
        # user_<k> is accepted by workstation_<k % n_clients>; see the SHELL
        # service set up on each workstation further down.
        shell_creds = [
            m.CachedCredential(node=f"workstation_{credind % n_clients}", port="SHELL", credential=f"user_{credind}")
            for credind in leaked_users
        ]
        lib["AuthorizationSpoofAndCrack"] = m.VulnerabilityInfo(
            description="Spoof an authoritative source on the network to get a crackable hash, then try to crack it",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedCredentials(credentials=share_creds + shell_creds),
        )
        return lib

    def admin_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
        """Replace ``ProbeAdmin`` in ``lib`` with a probe that discovers ``admin``."""
        lib["ProbeAdmin"] = m.VulnerabilityInfo(
            description="Probe a workstation to see if you have admin creds on it",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.ProbeSucceeded(discovered_properties=["admin"]),
            reward_string="Admin creds verified.",
        )
        return lib

    def dc_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
        """Add ``DumpNTDS`` to ``lib``; it leaks every user for every share and workstation."""
        share_creds = [
            m.CachedCredential(node=f"share_{shareind}", port="SMB", credential=f"user_{credind}")
            for credind in range(n_users)
            for shareind in range(n_servers)
        ]
        shell_creds = [
            m.CachedCredential(node=f"workstation_{wkid}", port="SHELL", credential=f"user_{uid}")
            for wkid in range(n_clients)
            for uid in range(n_users)
        ]
        lib["DumpNTDS"] = m.VulnerabilityInfo(
            description="Dump the NTDS file from AD",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedCredentials(share_creds + shell_creds),
            precondition=m.Precondition("domain_controller"),
            reward_string="Dumped all user hashes. Get crackin'",
        )
        return lib

    # Workstation 0 is our entry node
    entry_node_id = "workstation_0"
    graph.nodes[entry_node_id].clear()
    graph.nodes[entry_node_id].update(
        {
            "data": m.NodeInfo(
                services=[],
                value=0,
                properties=["breach_node"],
                vulnerabilities=breach_vulnerabilities(default_vulnerabilities()),
                agent_installed=True,
                firewall=firewall_conf,
                reimagable=False,
            )
        }
    )

    # Remaining workstations: each one is an admin box when random() > 0.2.
    for i in range(1, n_clients):
        nodeid = f"workstation_{i}"
        graph.nodes[nodeid].clear()
        props = []
        vulns = default_vulnerabilities()
        if random.random() > 0.2:  # TODO: make this value rarer as network size increases? Kinda wonky considering we only have one exploit path really
            props = ["admin"]
            vulns = admin_vulnerabilities(vulns)
        graph.nodes[nodeid].update(
            {
                "data": m.NodeInfo(
                    services=[m.ListeningService(name="SHELL", allowedCredentials=[f"user_{uid}" for uid in range(n_users) if uid % n_clients == i])],
                    properties=props,
                    value=1,
                    firewall=firewall_conf,
                    vulnerabilities=vulns,
                )
            }
        )

    # File shares: users are spread over the shares by ``user index % n_servers``.
    for i in range(n_servers):
        nodeid = f"share_{i}"
        graph.nodes[nodeid].clear()
        graph.nodes[nodeid].update(
            {
                "data": m.NodeInfo(
                    services=[m.ListeningService(name="SMB", allowedCredentials=[f"user_{sid}" for sid in range(n_users) if sid % n_servers == i])],
                    properties=[],
                    value=5,
                    firewall=firewall_conf,
                    vulnerabilities=default_vulnerabilities(),
                )
            }
        )

    # The domain controller has the highest node value and only accepts "dc_1".
    nodeid = "domain_controller_1"
    graph.nodes[nodeid].clear()
    graph.nodes[nodeid].update(
        {
            "data": m.NodeInfo(
                services=[m.ListeningService(name="AD", allowedCredentials=["dc_1"])],
                properties=["domain_controller"],
                value=1000,
                firewall=firewall_conf,
                vulnerabilities=dc_vulnerabilities(default_vulnerabilities()),
            )
        }
    )

    return graph


def new_random_environment(seed: Any) -> m.Environment:
    """Create a new simulation environment based on
    a randomly generated network topology for SMB shares.

    Reseeds the module-level ``random`` generator with ``seed`` before the
    network size is drawn.
    """
    random.seed(seed)
    # randrange excludes its upper bound: 5-9 clients, 20-99 users.
    clients = random.randrange(5, 10)
    # NOTE(review): randrange(1, 2) can only return 1, so every environment
    # has exactly one share. Kept as-is so existing seeds give the same network.
    servers = random.randrange(1, 2)
    users = random.randrange(20, 100)
    network = create_network_from_smb_traffic(clients, servers, users)

    return m.Environment(network=network, vulnerability_library={}, identifiers=ENV_IDENTIFIERS)