Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/CyberBattleSim
Path: blob/main/cyberbattle/samples/active_directory/generate_ad.py
597 views
1
"""Generating random active directory networks"""
2
3
import random
4
from typing import Any
5
from cyberbattle.simulation.model import FirewallConfiguration, FirewallRule, Identifiers, RulePermission
6
from cyberbattle.simulation import model as m
7
import networkx as nx
8
9
# Names of the node properties, ports and vulnerabilities that the generated
# Active Directory networks in this module refer to.
ENV_IDENTIFIERS = Identifiers(
    properties=[
        "breach_node",
        "domain_controller",
        "admin",  # whether or not the users of this machine are admins
    ],
    ports=[
        "SMB",
        "AD",
        "SHELL",
    ],
    local_vulnerabilities=[
        "FindDomainControllers",
        "EnumerateFileShares",
        "AuthorizationSpoofAndCrack",
        "ScanForCreds",
        "DumpNTDS",
        "ProbeAdmin",
    ],
    remote_vulnerabilities=[
        "PasswordSpray",
    ],
)
19
20
21
def create_network_from_smb_traffic(n_clients: int, n_servers: int, n_users: int) -> nx.DiGraph:
    """Build a randomized Active Directory style network as a directed graph.

    The graph holds ``n_clients`` workstations (``workstation_0`` is the entry
    node with the agent installed), ``n_servers`` SMB shares and a single
    domain controller (``domain_controller_1``). Each node's attribute dict is
    reset to a single ``"data"`` entry containing its ``m.NodeInfo``. No edges
    are added by this function.

    Random choices are drawn from the module-level ``random`` generator, so
    seed it before calling to get a reproducible network.

    Args:
        n_clients: Number of workstation nodes; must be at least 1 because
            ``workstation_0`` is always used as the entry node.
        n_servers: Number of share nodes (``share_0`` ... ``share_{n-1}``).
        n_users: Number of user credentials (``user_0`` ... ``user_{n-1}``);
            must be at least 1.

    Returns:
        The populated ``nx.DiGraph``.

    Raises:
        ValueError: If ``n_clients`` or ``n_users`` is smaller than 1.
    """
    # Fail early with a clear message instead of an obscure KeyError on the
    # missing entry node or an "empty range" error from random.randrange.
    if n_clients < 1:
        raise ValueError(f"n_clients must be at least 1, got {n_clients}")
    if n_users < 1:
        raise ValueError(f"n_users must be at least 1, got {n_users}")

    graph = nx.DiGraph()
    graph.add_nodes_from([f"workstation_{i}" for i in range(0, n_clients)])
    graph.add_nodes_from([f"share_{i}" for i in range(0, n_servers)])
    graph.add_node("domain_controller_1")
    # NOTE: No edges are added here on purpose. The AgentActions code will
    # annotate our network based on dynamic discovery.

    # A single firewall configuration object, shared by every node, whose two
    # rule lists both allow the SMB, AD and SHELL ports.
    open_ports = ("SMB", "AD", "SHELL")
    firewall_conf = FirewallConfiguration(
        [FirewallRule(port, RulePermission.ALLOW) for port in open_ports],
        [FirewallRule(port, RulePermission.ALLOW) for port in open_ports],
    )

    def default_vulnerabilities() -> m.VulnerabilityLibrary:
        """Return a fresh library with the vulnerabilities every node starts with."""
        lib = {}
        lib["FindDomainControllers"] = m.VulnerabilityInfo(
            description="Search for valid domain controllers in the current machines environment.",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedNodesId(nodes=["domain_controller_1"]),
            reward_string="Found domain controllers",
        )
        lib["EnumerateFileShares"] = m.VulnerabilityInfo(
            description="Find all SMB shares this machine knows about.",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedNodesId(nodes=[f"share_{i}" for i in range(0, n_servers)]),
            reward_string="Found shares",
        )
        # Default probe outcome is a failure; admin_vulnerabilities() replaces
        # this entry on machines flagged as admin.
        lib["ProbeAdmin"] = m.VulnerabilityInfo(
            description="Probe a workstation to see if you have admin creds on it",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.ProbeFailed(),
            reward_string="No admin creds.",
        )
        lib["ScanForCreds"] = m.VulnerabilityInfo(
            description="Scan the local security managers for credentials. Need to be admin on the box.",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedCredentials(credentials=[m.CachedCredential(node="domain_controller_1", port="AD", credential="dc_1")]),
            precondition=m.Precondition("admin"),
            rates=m.Rates(successRate=0.9),
            reward_string="DA credentials found",
        )
        return lib

    def breach_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
        """Add the entry node's credential-leaking vulnerability to ``lib`` and return it."""
        # NOTE: The randomization here does not support random credential leakage...
        # This is kind of a major limitation for tooling like responder
        #
        # The upper bound is clamped to 4 so that networks with three or fewer
        # workstations no longer fail with an empty-range ValueError; for
        # n_clients >= 4 the draw is unchanged.
        n_draws = random.randrange(3, max(n_clients, 4))
        # Duplicate draws collapse, so the set may hold fewer than n_draws users.
        leaked_user_ids = {random.randrange(0, n_users) for _ in range(n_draws)}
        share_creds = [
            m.CachedCredential(node=f"share_{shareid}", port="SMB", credential=f"user_{credind}")
            for credind in leaked_user_ids
            for shareid in range(0, n_servers)
        ]
        workstation_creds = [
            m.CachedCredential(node=f"workstation_{credind % n_clients}", port="SHELL", credential=f"user_{credind}")
            for credind in leaked_user_ids
        ]
        lib["AuthorizationSpoofAndCrack"] = m.VulnerabilityInfo(
            description="Spoof an authoritative source on the network to get a crackable hash, then try to crack it",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedCredentials(credentials=share_creds + workstation_creds),
        )
        return lib

    def admin_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
        """Replace ``ProbeAdmin`` in ``lib`` with a succeeding probe and return ``lib``."""
        lib["ProbeAdmin"] = m.VulnerabilityInfo(
            description="Probe a workstation to see if you have admin creds on it",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.ProbeSucceeded(discovered_properties=["admin"]),
            reward_string="Admin creds verified.",
        )
        return lib

    def dc_vulnerabilities(lib: m.VulnerabilityLibrary) -> m.VulnerabilityLibrary:
        """Add the domain controller's ``DumpNTDS`` vulnerability to ``lib`` and return it."""
        # Leaks every user credential against every share and every workstation.
        lib["DumpNTDS"] = m.VulnerabilityInfo(
            description="Dump the NTDS file from AD",
            type=m.VulnerabilityType.LOCAL,
            outcome=m.LeakedCredentials(
                [m.CachedCredential(node=f"share_{shareind}", port="SMB", credential=f"user_{credind}") for credind in range(0, n_users) for shareind in range(0, n_servers)]
                + [m.CachedCredential(node=f"workstation_{wkid}", port="SHELL", credential=f"user_{uid}") for wkid in range(0, n_clients) for uid in range(0, n_users)]
            ),
            precondition=m.Precondition("domain_controller"),
            reward_string="Dumped all user hashes. Get crackin'",
        )
        return lib

    def set_node_info(node_id: str, info: m.NodeInfo) -> None:
        """Reset the node's attributes so that they contain only its NodeInfo."""
        attrs = graph.nodes[node_id]
        attrs.clear()
        attrs["data"] = info

    # workstation_0 is our entry node: no services, agent already installed.
    entry_node_id = "workstation_0"
    set_node_info(
        entry_node_id,
        m.NodeInfo(
            services=[],
            value=0,
            properties=["breach_node"],
            vulnerabilities=breach_vulnerabilities(default_vulnerabilities()),
            agent_installed=True,
            firewall=firewall_conf,
            reimagable=False,
        ),
    )

    # Remaining workstations: each accepts SHELL logins from the users whose
    # index maps onto it modulo n_clients.
    for i in range(1, n_clients):
        props = []
        vulns = default_vulnerabilities()
        # TODO: make this value rarer as network size increases? Kinda wonky
        # considering we only have one exploit path really
        if random.random() > 0.2:
            props = ["admin"]
            vulns = admin_vulnerabilities(vulns)
        set_node_info(
            f"workstation_{i}",
            m.NodeInfo(
                services=[m.ListeningService(name="SHELL", allowedCredentials=[f"user_{uid}" for uid in range(0, n_users) if uid % n_clients == i])],
                properties=props,
                value=1,
                firewall=firewall_conf,
                vulnerabilities=vulns,
            ),
        )

    # File shares: each accepts SMB logins from the users whose index maps
    # onto it modulo n_servers.
    for i in range(0, n_servers):
        set_node_info(
            f"share_{i}",
            m.NodeInfo(
                services=[m.ListeningService(name="SMB", allowedCredentials=[f"user_{sid}" for sid in range(0, n_users) if sid % n_servers == i])],
                properties=[],
                value=5,
                firewall=firewall_conf,
                vulnerabilities=default_vulnerabilities(),
            ),
        )

    # The domain controller is the highest-value node and only accepts "dc_1".
    set_node_info(
        "domain_controller_1",
        m.NodeInfo(
            services=[m.ListeningService(name="AD", allowedCredentials=["dc_1"])],
            properties=["domain_controller"],
            value=1000,
            firewall=firewall_conf,
            vulnerabilities=dc_vulnerabilities(default_vulnerabilities()),
        ),
    )

    return graph
159
160
161
def new_random_environment(seed: Any) -> m.Environment:
    """Create a new simulation environment based on a randomly generated
    network topology for SMB shares.

    The module-level ``random`` generator is seeded with ``seed`` and then
    used to pick the number of workstations, shares and users before the
    network itself is generated.

    Args:
        seed: Value passed to ``random.seed``.

    Returns:
        An ``m.Environment`` wrapping the generated network, with an empty
        environment-level vulnerability library and ``ENV_IDENTIFIERS``.
    """
    random.seed(seed)
    # The three draws are made in this fixed order: clients, servers, users.
    n_clients = random.randrange(5, 10)
    # NOTE(review): randrange(1, 2) can only ever return 1, so there is always
    # exactly one share. The call is kept as-is; confirm whether a wider range
    # was intended.
    n_servers = random.randrange(1, 2)
    n_users = random.randrange(20, 100)

    return m.Environment(
        network=create_network_from_smb_traffic(n_clients, n_servers, n_users),
        vulnerability_library={},
        identifiers=ENV_IDENTIFIERS,
    )
172
173