Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/CyberBattleSim
Path: blob/main/cyberbattle/simulation/environment_generation.py
597 views
1
# Copyright (c) Microsoft Corporation.
2
# Licensed under the MIT License.
3
4
"""
5
environment_generation.py: this module generates a semi-random environment for
the loonshot simulation v0.
7
"""
8
9
from typing import List, Dict, Set
10
import random
11
import re
12
import networkx as nx
13
from . import model
14
15
# These two dictionaries hold the potential vulnerabilities. They are split into Linux vulnerabilities
# and Windows vulnerabilities so that each node can be given vulnerabilities matching its OS.
17
18
# Property tags taken from the privilege-escalation outcome objects. They are
# interpolated into the precondition expressions of the vulnerabilities below.
ADMINTAG = model.AdminEscalation().tag
SYSTEMTAG = model.SystemEscalation().tag
20
21
# Vulnerabilities that may be assigned to Windows nodes, keyed by vulnerability ID.
# Every precondition is a boolean expression over node property names.
potential_windows_vulns = {
    # --- UAC bypasses: only applicable while the node is not yet admin/system ---
    "UACME43": model.VulnerabilityInfo(
        description="UACME UAC bypass #43",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&(Win10|Win7)&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "UACME45": model.VulnerabilityInfo(
        description="UACME UAC bypass #45",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&Win10&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "UACME52": model.VulnerabilityInfo(
        description="UACME UAC bypass #52",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&(Win10|Win7)&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "UACME55": model.VulnerabilityInfo(
        description="UACME UAC bypass #55",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&(Win10|Win7)&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "UACME61": model.VulnerabilityInfo(
        description="UACME UAC bypass #61",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/hfiref0x/UACME",
        precondition=model.Precondition(f"Windows&Win10&(~({ADMINTAG}|{SYSTEMTAG}))"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    # --- Credential theft: requires the admin or system tag on the node ---
    "MimikatzLogonpasswords": model.VulnerabilityInfo(
        description="Mimikatz sekurlsa::logonpasswords.",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/gentilkiwi/mimikatz",
        precondition=model.Precondition(f"Windows&({ADMINTAG}|{SYSTEMTAG})"),
        outcome=model.LeakedCredentials([]),
        rates=model.Rates(0, 1.0, 1.0),
    ),
    "MimikatzKerberosExport": model.VulnerabilityInfo(
        # The two sentences used to be joined without a separating space.
        description="Mimikatz Kerberos::list /export. Exports .kirbi files to be used with pass the ticket",
        type=model.VulnerabilityType.LOCAL,
        URL="https://github.com/gentilkiwi/mimikatz",
        precondition=model.Precondition(f"Windows&DomainJoined&({ADMINTAG}|{SYSTEMTAG})"),
        outcome=model.LeakedCredentials([]),
        rates=model.Rates(0, 1.0, 1.0),
    ),
    "PassTheTicket": model.VulnerabilityInfo(
        # NOTE(review): apart from the command name this description repeats the
        # MimikatzKerberosExport text -- confirm whether it should describe ticket injection.
        description="Mimikatz Kerberos::ptt /export. Exports .kirbi files to be used with pass the ticket",
        type=model.VulnerabilityType.REMOTE,
        URL="https://github.com/gentilkiwi/mimikatz",
        precondition=model.Precondition(f"Windows&DomainJoined&KerberosTicketsDumped&({ADMINTAG}|{SYSTEMTAG})"),
        outcome=model.LeakedCredentials([]),
        rates=model.Rates(0, 1.0, 1.0),
    ),
    # --- Brute-force lateral movement over an open port ---
    "RDPBF": model.VulnerabilityInfo(
        description="RDP Brute Force",
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1110/",
        precondition=model.Precondition("Windows&PortRDPOpen"),
        outcome=model.LateralMove(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "SMBBF": model.VulnerabilityInfo(
        # Was mislabelled "SSH Brute Force"; the precondition targets the SMB port.
        description="SMB Brute Force",
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1110/",
        precondition=model.Precondition("(Windows|Linux)&PortSMBOpen"),
        outcome=model.LateralMove(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
}
103
104
# Vulnerabilities that may be assigned to Linux nodes, keyed by vulnerability ID.
# "SMBBF" intentionally appears here as well as in the Windows dictionary because
# its precondition accepts either operating system.
potential_linux_vulns = {
    "SudoCaching": model.VulnerabilityInfo(
        description="Escalating privileges from poorly configured sudo on linux/unix machines",
        # NOTE(review): declared REMOTE although the description reads like a local
        # privilege escalation -- confirm the intended type before changing it.
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1206/",
        precondition=model.Precondition(f"Linux&(~{ADMINTAG})"),
        outcome=model.AdminEscalation(),
        rates=model.Rates(0, 1.0, 1.0),
    ),
    "SSHBF": model.VulnerabilityInfo(
        description="SSH Brute Force",
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1110/",
        precondition=model.Precondition("Linux&PortSSHOpen"),
        outcome=model.LateralMove(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
    "SMBBF": model.VulnerabilityInfo(
        # Was mislabelled "SSH Brute Force"; the precondition targets the SMB port.
        description="SMB Brute Force",
        type=model.VulnerabilityType.REMOTE,
        URL="https://attack.mitre.org/techniques/T1110/",
        precondition=model.Precondition("(Windows|Linux)&PortSMBOpen"),
        outcome=model.LateralMove(),
        rates=model.Rates(0, 0.2, 1.0),
    ),
}
130
131
# Endpoints that may be open in a generated game. To support another endpoint,
# simply append its protocol name to this list.
# Entries are protocol names rather than port numbers, since some protocols
# (like SMB) have multiple official ports.
potential_ports: List[model.PortName] = [
    "RDP",
    "SSH",
    "HTTP",
    "HTTPs",
    "SMB",
    "SQL",
    "FTP",
    "WMI",
]
136
137
# Candidate node properties, kept separately per operating system so that the
# generated graphs look realistic instead of being totally random.
potential_linux_node_states: List[model.PropertyName] = [
    "Linux",
    ADMINTAG,
    "PortRDPOpen",
    "PortHTTPOpen",
    "PortHTTPsOpen",
    "PortSSHOpen",
    "PortSMBOpen",
    "PortFTPOpen",
    "DomainJoined",
]
potential_windows_node_states: List[model.PropertyName] = [
    "Windows",
    "Win10",
    "PortRDPOpen",
    "PortHTTPOpen",
    "PortHTTPsOpen",
    "PortSSHOpen",
    "PortSMBOpen",
    "PortFTPOpen",
    "BITSEnabled",
    "Win7",
    "DomainJoined",
]
153
154
# Union of every vulnerability a generated node can carry. This mirrors the library
# that create_random_environment hands to the Environment (Windows entries first,
# Linux entries overriding on a shared key such as "SMBBF").
_all_potential_vulns = {**potential_windows_vulns, **potential_linux_vulns}

# Identifiers advertised for randomly generated environments.
# Previously both vulnerability lists were set to *all* Windows vulnerability IDs,
# so the Linux-only vulnerabilities were never registered and local/remote were
# indistinguishable. Each ID is now listed once, under its declared type.
# NOTE(review): `properties` is kept as the plain concatenation of both state
# lists (which repeats the shared port/domain properties) to avoid changing the
# property list -- confirm whether duplicates are acceptable to model.Identifiers.
ENV_IDENTIFIERS = model.Identifiers(
    ports=potential_ports,
    properties=potential_linux_node_states + potential_windows_node_states,
    local_vulnerabilities=[
        vuln_id for vuln_id, info in _all_potential_vulns.items() if info.type == model.VulnerabilityType.LOCAL
    ],
    remote_vulnerabilities=[
        vuln_id for vuln_id, info in _all_potential_vulns.items() if info.type == model.VulnerabilityType.REMOTE
    ],
)
160
161
162
def create_random_environment(name: str, size: int) -> model.Environment:
    """
    Build a randomly generated environment made of `size` nodes.

    Each node is independently assigned Linux or Windows and populated through
    create_random_node. Nodes are keyed "0" .. str(size - 1); node "0" is marked
    as having the agent installed. The resulting graph contains nodes only (no
    edges are added here).

    `name` is only validated to be non-empty; it is not otherwise used.

    Note this does not currently support generating credentials.

    Raises ValueError if `name` is empty or `size` is smaller than 1.
    """
    if not name:
        raise ValueError("Please supply a non empty string for the name")

    if size < 1:
        # The previous message was built from two adjacent literals without a
        # separating space ("...positiveinteger...").
        raise ValueError("Please supply a positive non-zero integer for the size of the environment")

    # merge the linux and windows vulnerability dictionaries
    local_vuln_lib: Dict[model.VulnerabilityID, model.VulnerabilityInfo] = {**potential_windows_vulns, **potential_linux_vulns}

    os_types: List[str] = ["Linux", "Windows"]
    nodes: Dict[str, model.NodeInfo] = {}
    for i in range(size):
        nodes[str(i)] = create_random_node(random.choice(os_types), potential_ports)

    # The first node is the one the agent starts from.
    nodes["0"].agent_installed = True

    graph = nx.DiGraph()
    graph.add_nodes_from((node_id, {"data": node_info}) for node_id, node_info in nodes.items())

    return model.Environment(network=graph, vulnerability_library=local_vuln_lib, identifiers=ENV_IDENTIFIERS)
191
192
193
def create_random_node(os_type: str, end_points: List[model.PortName]) -> model.NodeInfo:
    """
    Create a single randomly configured node.

    os_type    -- "Linux" or "Windows"; selects which vulnerability pool is sampled.
    end_points -- port names the node listens on; each becomes a ListeningService
                  and is allowed by the node's firewall.

    The node receives a random non-empty subset of the OS-specific vulnerabilities,
    the properties referenced by those vulnerabilities' preconditions, and a random
    integer value. The agent is never installed by this function.

    Raises ValueError if `end_points` is empty or `os_type` is unsupported.
    """
    if not end_points:
        raise ValueError("No endpoints supplied")

    if os_type not in ("Windows", "Linux"):
        raise ValueError("Unsupported OS Type please enter Linux or Windows")

    # pick between 1 and all of the vulnerabilities available for this OS
    vuln_pool = potential_linux_vulns if os_type == "Linux" else potential_windows_vulns
    vulnerabilities: model.VulnerabilityLibrary = select_random_vulnerabilities(os_type, random.randint(1, len(vuln_pool)))

    firewall: model.FirewallConfiguration = create_firewall_rules(end_points)
    properties: List[model.PropertyName] = get_properties_from_vulnerabilities(os_type, vulnerabilities)

    # The previous int(random.random()) truncated a float in [0, 1) and therefore
    # always produced 0, so no node ever had a random value.
    # NOTE(review): the 0..100 range is an assumption about the valid node value
    # range -- confirm against the model definition.
    max_node_value = 100
    node_value = random.randint(0, max_node_value)

    return model.NodeInfo(
        services=[model.ListeningService(name=p) for p in end_points],
        vulnerabilities=vulnerabilities,
        value=node_value,
        properties=properties,
        firewall=firewall,
        agent_installed=False,
    )
220
221
222
def select_random_vulnerabilities(os_type: str, num_vulns: int) -> Dict[str, model.VulnerabilityInfo]:
    """
    Randomly pick `num_vulns` distinct vulnerabilities for the given operating system.

    os_type   -- "Linux" or "Windows"; decides which module-level vulnerability
                 dictionary is sampled.
    num_vulns -- how many vulnerabilities to draw (must be at least 1).

    Returns a dictionary mapping the chosen vulnerability IDs to their
    VulnerabilityInfo objects.

    Raises ValueError when `num_vulns` is below 1 or `os_type` is neither
    "Linux" nor "Windows".
    """
    if num_vulns < 1:
        raise ValueError("Expected a positive value for num_vulns in select_random_vulnerabilities")

    # Resolve the pool first so that the sampling code is written only once.
    if os_type == "Linux":
        pool = potential_linux_vulns
    elif os_type == "Windows":
        pool = potential_windows_vulns
    else:
        raise ValueError("Invalid Operating System supplied to select_random_vulnerabilities")

    chosen_ids: List[str] = random.sample(list(pool.keys()), num_vulns)
    return {vuln_id: pool[vuln_id] for vuln_id in chosen_ids}
246
247
248
def get_properties_from_vulnerabilities(os_type: str, vulns: Dict[model.VulnerabilityID, model.VulnerabilityInfo]) -> List[model.PropertyName]:
    """
    Derive the node properties implied by a set of vulnerabilities.

    os_type -- "Linux" or "Windows"; selects the list of candidate properties.
               Any other value yields an empty list.
    vulns   -- the vulnerabilities assigned to the node.

    Returns every candidate property whose name occurs in the textual form of at
    least one vulnerability precondition. The result follows the order of the
    candidate list, so it is stable across runs (it used to come from iterating
    a set).

    NOTE(review): a property is selected whenever its name appears anywhere in a
    precondition, including inside a negation (e.g. the Linux candidate list
    contains ADMINTAG and "SudoCaching" requires ~ADMINTAG) -- confirm that
    this is intended.
    """
    candidates: List[model.PropertyName]
    if os_type == "Linux":
        candidates = potential_linux_node_states
    elif os_type == "Windows":
        candidates = potential_windows_node_states
    else:
        candidates = []

    # Stringify each precondition once instead of once per candidate property.
    expressions: List[str] = [str(vuln.precondition.expression) for vuln in vulns.values()]

    # re.escape makes the property name match literally even if it ever contains
    # regex metacharacters; dict.fromkeys de-duplicates while preserving order.
    matched = dict.fromkeys(
        prop for prop in candidates if any(re.search(re.escape(prop), text) for text in expressions)
    )
    return list(matched)
267
268
269
def create_firewall_rules(end_points: List[model.PortName]) -> model.FirewallConfiguration:
    """
    Build the firewall configuration matching a node's endpoints.

    Every entry of the module-level `potential_ports` list gets one incoming and
    one outgoing rule: ALLOW when the port is among `end_points`, BLOCK
    otherwise. Endpoints that are not in `potential_ports` produce no rule.
    """
    config: model.FirewallConfiguration = model.FirewallConfiguration()

    # Start from empty rule lists regardless of what the constructor provides.
    config.incoming.clear()
    config.outgoing.clear()

    for port_name in potential_ports:
        permission = model.RulePermission.ALLOW if port_name in end_points else model.RulePermission.BLOCK
        # Separate rule objects are created for each direction.
        config.incoming.append(model.FirewallRule(port_name, permission))
        config.outgoing.append(model.FirewallRule(port_name, permission))

    return config
290
291