# Path: blob/main/cyberbattle/simulation/environment_generation.py
# 597 views
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
environment_generation.py

Generates a semi random environment for the loonshot simulation v0.
"""

from typing import List, Dict, Set
import random
import re
import networkx as nx
from . import model

# These two dictionaries hold the potential vulnerabilities, keyed by vulnerability ID.
# They are split into Linux vulnerabilities and Windows vulnerabilities so that
# select_random_vulnerabilities can draw only from the table matching a node's OS.

# Property tags taken from the escalation outcomes; they are interpolated into the
# precondition expressions below.
ADMINTAG = model.AdminEscalation().tag
SYSTEMTAG = model.SystemEscalation().tag

potential_windows_vulns = {
    "UACME43": model.VulnerabilityInfo(
        description="UACME UAC bypass #43",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&(Win10|Win7)&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "UACME45": model.VulnerabilityInfo(
        description="UACME UAC bypass #45",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&Win10&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "UACME52": model.VulnerabilityInfo(
        description="UACME UAC bypass #52",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&(Win10|Win7)&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "UACME55": model.VulnerabilityInfo(
        description="UACME UAC bypass #55",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&(Win10|Win7)&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "UACME61": model.VulnerabilityInfo(
        description="UACME UAC bypass #61",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&Win10&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "MimikatzLogonpasswords": model.VulnerabilityInfo(
        description="Mimikatz sekurlsa::logonpasswords.",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/gentilkiwi/mimikatz",
        precondition=model.Precondition(f"Windows&({ADMINTAG}|{SYSTEMTAG})"),
        outcome=model.LeakedCredentials([]),
        rates=model.Rates(0, 1.0, 1.0),
    ),
    "MimikatzKerberosExport": model.VulnerabilityInfo(
        # The trailing space keeps the two implicitly concatenated sentences apart.
        description="Mimikatz Kerberos::list /export. " "Exports .kirbi files to be used with pass the ticket",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/gentilkiwi/mimikatz",
        precondition=model.Precondition(f"Windows&DomainJoined&({ADMINTAG}|{SYSTEMTAG})"),
        outcome=model.LeakedCredentials([]),
        rates=model.Rates(0, 1.0, 1.0),
    ),
    "PassTheTicket": model.VulnerabilityInfo(
        # The trailing space keeps the two implicitly concatenated sentences apart.
        description="Mimikatz Kerberos::ptt /export. " "Exports .kirbi files to be used with pass the ticket",
        type=model.VulnerabilityType.REMOTE,
        URL="https://github.com/gentilkiwi/mimikatz",
        precondition=model.Precondition(f"Windows&DomainJoined&KerberosTicketsDumped&({ADMINTAG}|{SYSTEMTAG})"),
        outcome=model.LeakedCredentials([]),
        rates=model.Rates(0, 1.0, 1.0),
    ),
    "RDPBF": model.VulnerabilityInfo(
        description="RDP Brute Force",
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1110/",
        precondition=model.Precondition("Windows&PortRDPOpen"),
        outcome=model.LateralMove(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "SMBBF": model.VulnerabilityInfo(
        # Previously described as "SSH Brute Force"; the precondition targets the SMB port.
        description="SMB Brute Force",
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1110/",
        precondition=model.Precondition("(Windows|Linux)&PortSMBOpen"),
        outcome=model.LateralMove(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
}

potential_linux_vulns = {
    "SudoCaching": model.VulnerabilityInfo(
        description="Escalating privileges from poorly configured sudo on linux/unix machines",
        # NOTE(review): a sudo misconfiguration is usually exploited locally - confirm
        # that REMOTE is the intended vulnerability type here.
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1206/",
        precondition=model.Precondition(f"Linux&(~{ADMINTAG})"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 1.0, 1.0),
    ),
    "SSHBF": model.VulnerabilityInfo(
        description="SSH Brute Force",
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1110/",
        precondition=model.Precondition("Linux&PortSSHOpen"),
        outcome=model.LateralMove(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "SMBBF": model.VulnerabilityInfo(
        # Previously described as "SSH Brute Force"; the precondition targets the SMB port.
        description="SMB Brute Force",
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1110/",
        precondition=model.Precondition("(Windows|Linux)&PortSMBOpen"),
        outcome=model.LateralMove(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
}

# These are potential endpoints that can be open in a game.
# Note to add any more endpoints simply
# add the protocol name to this list.
# NOTE(review): an earlier version of this comment said that ports are stored in a
# tuple because some protocols (like SMB) have multiple official ports; the list
# below holds protocol names only.
potential_ports: List[model.PortName] = ["RDP", "SSH", "HTTP", "HTTPs", "SMB", "SQL", "FTP", "WMI"]

# These two lists are potential node states. They are split into linux states and windows
# states so that we can generate real graphs that aren't just totally random.
potential_linux_node_states: List[model.PropertyName] = [
    "Linux",
    ADMINTAG,
    "PortRDPOpen",
    "PortHTTPOpen",
    "PortHTTPsOpen",
    "PortSSHOpen",
    "PortSMBOpen",
    "PortFTPOpen",
    "DomainJoined",
]
potential_windows_node_states: List[model.PropertyName] = [
    "Windows",
    "Win10",
    "PortRDPOpen",
    "PortHTTPOpen",
    "PortHTTPsOpen",
    "PortSSHOpen",
    "PortSMBOpen",
    "PortFTPOpen",
    "BITSEnabled",
    "Win7",
    "DomainJoined",
]

# Every vulnerability a generated node can carry (Windows and Linux tables merged;
# for an ID present in both, the Linux entry wins).
_all_potential_vulns: Dict[model.VulnerabilityID, model.VulnerabilityInfo] = {
    **potential_windows_vulns,
    **potential_linux_vulns,
}

# The identifier lists are derived from the declared type of each vulnerability.
# Previously both lists contained every Windows vulnerability ID regardless of type,
# and the Linux-only IDs were missing altogether.
# NOTE(review): the two property lists share several entries, so `properties`
# contains duplicates - confirm whether model.Identifiers expects unique names.
ENV_IDENTIFIERS = model.Identifiers(
    ports=potential_ports,
    properties=potential_linux_node_states + potential_windows_node_states,
    local_vulnerabilities=[
        vuln_id for vuln_id, vuln in _all_potential_vulns.items() if vuln.type == model.VulnerabilityType.LOCAL
    ],
    remote_vulnerabilities=[
        vuln_id for vuln_id, vuln in _all_potential_vulns.items() if vuln.type == model.VulnerabilityType.REMOTE
    ],
)

# Inclusive upper bound for the random value assigned to a generated node.
# NOTE(review): 100 is an assumption - confirm against the range model.NodeInfo expects.
_MAX_RANDOM_NODE_VALUE = 100


def create_random_environment(name: str, size: int) -> model.Environment:
    """
    Create a randomly generated environment.

    Each of the `size` nodes is given a random OS (Linux or Windows) and is built by
    create_random_node with every port in potential_ports as an endpoint. Nodes are
    keyed "0" .. str(size - 1); node "0" has the agent installed. The graph contains
    the nodes only: no edges are added here.

    Note this does not currently support generating credentials.

    Args:
        name: name of the environment; it must be non empty but is otherwise not
            used by this function.
        size: number of nodes to generate, at least 1.

    Returns:
        The generated model.Environment.

    Raises:
        ValueError: if `name` is empty or `size` is smaller than 1.
    """
    if not name:
        raise ValueError("Please supply a non empty string for the name")

    if size < 1:
        raise ValueError("Please supply a positive non zero integer for the size of the environment")

    graph = nx.DiGraph()

    # merge the linux and windows vulnerability dictionaries
    local_vuln_lib: Dict[model.VulnerabilityID, model.VulnerabilityInfo] = {**potential_windows_vulns, **potential_linux_vulns}

    os_types: List[str] = ["Linux", "Windows"]
    nodes: Dict[str, model.NodeInfo] = {
        str(i): create_random_node(random.choice(os_types), potential_ports) for i in range(size)
    }

    # The first node is the agent's starting point.
    nodes["0"].agent_installed = True

    graph.add_nodes_from([(node_id, {"data": node_info}) for node_id, node_info in nodes.items()])

    return model.Environment(network=graph, vulnerability_library=local_vuln_lib, identifiers=ENV_IDENTIFIERS)


def create_random_node(os_type: str, end_points: List[model.PortName]) -> model.NodeInfo:
    """
    Create a random node for the given OS type.

    The node listens on every endpoint in `end_points`, carries a random non empty
    selection of the vulnerabilities available for its OS, and gets the properties
    that those vulnerabilities' preconditions mention.

    Args:
        os_type: "Linux" or "Windows".
        end_points: port names the node listens on and allows through its firewall.

    Returns:
        A model.NodeInfo with agent_installed set to False.

    Raises:
        ValueError: if `end_points` is empty or `os_type` is not supported.
    """
    if not end_points:
        raise ValueError("No endpoints supplied")

    if os_type not in ("Windows", "Linux"):
        raise ValueError("Unsupported OS Type please enter Linux or Windows")

    # pick between one and all of the vulnerabilities defined for this OS
    os_vulns = potential_linux_vulns if os_type == "Linux" else potential_windows_vulns
    vulnerabilities: model.VulnerabilityLibrary = select_random_vulnerabilities(os_type, random.randint(1, len(os_vulns)))

    firewall: model.FirewallConfiguration = create_firewall_rules(end_points)
    properties: List[model.PropertyName] = get_properties_from_vulnerabilities(os_type, vulnerabilities)

    return model.NodeInfo(
        services=[model.ListeningService(name=p) for p in end_points],
        vulnerabilities=vulnerabilities,
        # int(random.random()) was used here before, which always evaluates to 0.
        value=random.randint(0, _MAX_RANDOM_NODE_VALUE),
        properties=properties,
        firewall=firewall,
        agent_installed=False,
    )


def select_random_vulnerabilities(os_type: str, num_vulns: int) -> Dict[str, model.VulnerabilityInfo]:
    """
    Select `num_vulns` distinct vulnerabilities at random for an operating system.

    Args:
        os_type: "Linux" or "Windows"; selects the vulnerability table to draw from.
        num_vulns: number of vulnerabilities to select, at least 1.

    Returns:
        A dictionary mapping the selected vulnerability IDs to their
        VulnerabilityInfo objects.

    Raises:
        ValueError: if `num_vulns` is smaller than 1, if `os_type` is not supported,
            or (raised by random.sample) if `num_vulns` exceeds the number of
            vulnerabilities defined for that OS.
    """
    if num_vulns < 1:
        raise ValueError("Expected a positive value for num_vulns in select_random_vulnerabilities")

    if os_type == "Linux":
        os_vulns = potential_linux_vulns
    elif os_type == "Windows":
        os_vulns = potential_windows_vulns
    else:
        raise ValueError("Invalid Operating System supplied to select_random_vulnerabilities")

    keys: List[str] = random.sample(list(os_vulns), num_vulns)
    return {k: os_vulns[k] for k in keys}


def get_properties_from_vulnerabilities(os_type: str, vulns: Dict[model.VulnerabilityID, model.VulnerabilityInfo]) -> List[model.PropertyName]:
    """
    Collect the node properties mentioned by the preconditions of `vulns`.

    Every property in the node-state list of `os_type` whose name occurs in the text
    of at least one vulnerability's precondition expression is returned.

    NOTE(review): the match is purely textual, so a property that a precondition
    negates (e.g. "~" followed by the admin tag) appears to be selected as well -
    confirm that this is intended.

    Args:
        os_type: "Linux" or "Windows"; any other value yields an empty list.
        vulns: vulnerabilities whose preconditions are inspected.

    Returns:
        The matching property names, without duplicates, in the order in which they
        are declared in the node-state list (so the result is reproducible).
    """
    candidates: List[model.PropertyName] = []
    if os_type == "Linux":
        candidates = potential_linux_node_states
    elif os_type == "Windows":
        candidates = potential_windows_node_states

    expressions: List[str] = [str(vuln.precondition.expression) for vuln in vulns.values()]

    matched: Set[model.PropertyName] = set()
    for expression in expressions:
        # re.escape makes sure the property name is matched literally
        matched.update(prop for prop in candidates if re.search(re.escape(prop), expression))

    # Walk the declared list instead of the set to get a stable order.
    return [prop for prop in candidates if prop in matched]


def create_firewall_rules(end_points: List[model.PortName]) -> model.FirewallConfiguration:
    """
    Build a FirewallConfiguration from a list of endpoints.

    It iterates through the list of potential ports and if they're in the list passed
    to the function it adds a firewall rule allowing that port.
    Otherwise it adds a rule blocking that port. The same rules are applied to
    incoming and outgoing traffic.

    Args:
        end_points: port names that should be allowed.

    Returns:
        A model.FirewallConfiguration with one incoming and one outgoing rule for
        every port in potential_ports.
    """
    ret_val: model.FirewallConfiguration = model.FirewallConfiguration()
    # drop whatever rules a fresh configuration starts with
    ret_val.incoming.clear()
    ret_val.outgoing.clear()

    for protocol in potential_ports:
        if protocol in end_points:
            permission = model.RulePermission.ALLOW
        else:
            permission = model.RulePermission.BLOCK
        ret_val.incoming.append(model.FirewallRule(protocol, permission))
        ret_val.outgoing.append(model.FirewallRule(protocol, permission))

    return ret_val