Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/CyberBattleSim
Path: blob/main/cyberbattle/samples/active_directory/tiny_ad.py
597 views
1
from cyberbattle.simulation.model import FirewallConfiguration, FirewallRule, RulePermission
2
from cyberbattle.simulation import model as m
3
from typing import Dict
4
5
6
# Firewall configuration shared by every node in this sample: the same three
# ports ("SMB", "AD", "SHELL") are allowed in both rule lists.
# NOTE(review): presumably one list per traffic direction -- confirm the
# positional order against FirewallConfiguration.
firewall_conf = FirewallConfiguration(
    [FirewallRule(port, RulePermission.ALLOW) for port in ("SMB", "AD", "SHELL")],
    [FirewallRule(port, RulePermission.ALLOW) for port in ("SMB", "AD", "SHELL")],
)
10
11
12
def default_vulnerabilities() -> m.VulnerabilityLibrary:
    """Create a new library holding the four vulnerabilities common to all nodes.

    A fresh dictionary is built on every call, so a caller may add or
    override entries without affecting libraries returned by other calls.

    Entries, in insertion order (all of type ``LOCAL``):
      * ``FindDomainControllers`` -- leaks the node id ``domain_controller_1``.
      * ``EnumerateFileShares`` -- outcome is ``ExploitFailed``.
      * ``ProbeAdmin`` -- outcome is ``ProbeFailed``.
      * ``ScanForCreds`` -- has the ``admin`` precondition and leaks the
        ``dc_1`` credential for port ``AD`` on ``domain_controller_1``.
    """
    local = m.VulnerabilityType.LOCAL

    find_domain_controllers = m.VulnerabilityInfo(
        description="Search for valid domain controllers in the current machines environment.",
        type=local,
        outcome=m.LeakedNodesId(nodes=["domain_controller_1"]),
        reward_string="Found domain controllers",
    )

    enumerate_file_shares = m.VulnerabilityInfo(
        description="Find all SMB shares this machine knows about.",
        type=local,
        outcome=m.ExploitFailed(),
        reward_string="Found shares",
    )

    # Default probe result; admin_vulnerabilities() replaces this entry.
    probe_admin = m.VulnerabilityInfo(
        description="Probe a workstation to see if you have admin creds on it",
        type=local,
        outcome=m.ProbeFailed(),
        reward_string="No admin creds.",
    )

    dc_credential = m.CachedCredential(node="domain_controller_1", port="AD", credential="dc_1")
    scan_for_creds = m.VulnerabilityInfo(
        description="Scan the local security managers for credentials. Need to be admin on the box.",
        type=local,
        outcome=m.LeakedCredentials(credentials=[dc_credential]),
        precondition=m.Precondition("admin"),
        reward_string="DA credentials found",
    )

    return {
        "FindDomainControllers": find_domain_controllers,
        "EnumerateFileShares": enumerate_file_shares,
        "ProbeAdmin": probe_admin,
        "ScanForCreds": scan_for_creds,
    }
34
35
36
def breach_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
    """Add the ``AuthorizationSpoofAndCrack`` vulnerability to ``lib``.

    The new entry is a ``LOCAL`` vulnerability whose outcome leaks the
    ``user_1`` credential for port ``SHELL`` on ``workstation_1``.

    Args:
        lib: Library to extend. It is modified in place.

    Returns:
        The same ``lib`` object, to allow call chaining.
    """
    cracked_credential = m.CachedCredential(node="workstation_1", port="SHELL", credential="user_1")
    spoof_and_crack = m.VulnerabilityInfo(
        description="Spoof an authoritative source on the network to get a crackable hash, then try to crack it",
        type=m.VulnerabilityType.LOCAL,
        outcome=m.LeakedCredentials(credentials=[cracked_credential]),
    )
    lib["AuthorizationSpoofAndCrack"] = spoof_and_crack
    return lib
43
44
45
def admin_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
    """Set the ``ProbeAdmin`` entry of ``lib`` to a succeeding probe.

    Any existing ``ProbeAdmin`` entry is overwritten with a ``LOCAL``
    vulnerability whose outcome is ``ProbeSucceeded`` and which discovers the
    ``admin`` property.

    Args:
        lib: Library to update. It is modified in place.

    Returns:
        The same ``lib`` object, to allow call chaining.
    """
    probe_outcome = m.ProbeSucceeded(discovered_properties=["admin"])
    successful_probe = m.VulnerabilityInfo(
        description="Probe a workstation to see if you have admin creds on it",
        type=m.VulnerabilityType.LOCAL,
        outcome=probe_outcome,
        reward_string="Admin creds verified.",
    )
    lib["ProbeAdmin"] = successful_probe
    return lib
53
54
55
def dc_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
    """Add the ``DumpNTDS`` vulnerability to ``lib``.

    The new entry is a ``LOCAL`` vulnerability with the ``domain_controller``
    precondition. Its outcome leaks ``SHELL`` credentials ``user_0`` through
    ``user_19``.

    Args:
        lib: Library to extend. It is modified in place.

    Returns:
        The same ``lib`` object, to allow call chaining.
    """
    dumped_credentials = []
    # The workstation range has a single element, so every leaked credential
    # targets "workstation_0".
    for wid in range(1):
        for uid in range(20):
            dumped_credentials.append(
                m.CachedCredential(node=f"workstation_{wid}", port="SHELL", credential=f"user_{uid}")
            )

    lib["DumpNTDS"] = m.VulnerabilityInfo(
        description="Dump the NTDS file from AD",
        type=m.VulnerabilityType.LOCAL,
        outcome=m.LeakedCredentials(credentials=dumped_credentials),
        precondition=m.Precondition("domain_controller"),
        reward_string="Dumped all user hashes. Get crackin'",
    )
    return lib
64
65
66
# The three machines of this sample network, keyed by node id. Every node
# uses the shared firewall configuration and starts from a fresh copy of the
# default vulnerability library, extended by a role-specific helper.
nodes = {
    # Domain controller: highest value (100); its AD service accepts "dc_1".
    "domain_controller_1": m.NodeInfo(
        services=[m.ListeningService(name="AD", allowedCredentials=["dc_1"])],
        firewall=firewall_conf,
        value=100,
        properties=["domain_controller"],
        vulnerabilities=dc_vulnerabilities(default_vulnerabilities()),
    ),
    # Workstation flagged "breach_node": it already has an agent installed
    # and is not reimagable.
    "workstation_0": m.NodeInfo(
        services=[m.ListeningService(name="SHELL", allowedCredentials=[f"user_{uid}" for uid in range(20)])],
        firewall=firewall_conf,
        value=0,
        properties=["breach_node"],
        vulnerabilities=breach_vulnerabilities(default_vulnerabilities()),
        agent_installed=True,
        reimagable=False,
    ),
    # Workstation flagged "admin": its ProbeAdmin entry is the succeeding one.
    "workstation_1": m.NodeInfo(
        services=[m.ListeningService(name="SHELL", allowedCredentials=[f"user_{uid}" for uid in range(20)])],
        firewall=firewall_conf,
        value=1,
        properties=["admin"],
        vulnerabilities=admin_vulnerabilities(default_vulnerabilities()),
    ),
}
91
92
# Library passed to the environment as its `vulnerability_library`; it is
# empty here, all vulnerabilities in this sample are attached to the nodes.
global_vulnerability_library: Dict[m.VulnerabilityID, m.VulnerabilityInfo] = {}
93
94
# Environment constants: the property names, port names and vulnerability ids
# handed to the environment as its identifiers.
# NOTE(review): "PasswordSpray" is listed as a remote vulnerability, but no
# vulnerability with that id is defined in the visible part of this file.
ENV_IDENTIFIERS = m.Identifiers(
    properties=[
        "breach_node",
        "domain_controller",
        "admin",  # whether or not the users of this machine are admins
    ],
    ports=[
        "SMB",
        "AD",
        "SHELL",
    ],
    local_vulnerabilities=[
        "FindDomainControllers",
        "EnumerateFileShares",
        "AuthorizationSpoofAndCrack",
        "ScanForCreds",
        "DumpNTDS",
        "ProbeAdmin",
    ],
    remote_vulnerabilities=[
        "PasswordSpray",
    ],
)
105
106
107
def new_environment() -> m.Environment:
    """Build the environment for this sample.

    The network is created from the module-level ``nodes`` mapping on every
    call; the vulnerability library and identifiers are the module-level
    ``global_vulnerability_library`` and ``ENV_IDENTIFIERS`` objects.

    Returns:
        A new ``m.Environment`` instance.
    """
    network = m.create_network(nodes)
    return m.Environment(
        network=network,
        vulnerability_library=global_vulnerability_library,
        identifiers=ENV_IDENTIFIERS,
    )
109
110