Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/CyberBattleSim
Path: blob/main/cyberbattle/samples/chainpattern/chainpattern.py
597 views
1
# Copyright (c) Microsoft Corporation.
2
# Licensed under the MIT License.
3
4
"""Defines a set of networks following a speficic pattern
5
learnable from the properties associated with the nodes.
6
7
The network pattern is:
8
Start ---> (Linux ---> Windows ---> ... Linux ---> Windows)* ---> Linux[Flag]
9
10
The network is parameterized by the length of the central Linux-Windows chain.
11
The start node leaks the credentials to connect to all other nodes:
12
13
For each `XXX ---> Windows` section, the XXX node has:
14
- a local vulnerability exposing the RDP password to the Windows machine
15
- a bunch of other trap vulnerabilities (high cost with no outcome)
16
For each `XXX ---> Linux` section,
17
- the Windows node has a local vulnerability exposing the SSH password to the Linux machine
18
- a bunch of other trap vulnerabilities (high cost with no outcome)
19
20
The chain is terminated by one node with a flag (reward).
21
22
A Node-Property matrix would be three-valued (0,1,?) and look like this:
23
24
===== Initial state
25
Properties
26
Nodes L W SQL
27
1 1 0 0
28
2 ? ? ?
29
3 ? ? ?
30
...
31
10
32
======= After discovering node 2
33
Properties
34
Nodes L W SQL
35
1 1 0 0
36
2 0 1 1
37
3 ? ? ?
38
...
39
10
40
===========================
41
42
"""
43
44
from cyberbattle.simulation.model import Identifiers, NodeID, NodeInfo
45
from ...simulation import model as m
46
from typing import Dict
47
48
DEFAULT_ALLOW_RULES = [
49
m.FirewallRule("RDP", m.RulePermission.ALLOW),
50
m.FirewallRule("SSH", m.RulePermission.ALLOW),
51
m.FirewallRule("HTTPS", m.RulePermission.ALLOW),
52
m.FirewallRule("HTTP", m.RulePermission.ALLOW),
53
]
54
55
# Environment constants used for all instances of the chain network
56
ENV_IDENTIFIERS = Identifiers(
57
properties=[
58
"Windows",
59
"Linux",
60
"ApacheWebSite",
61
"IIS_2019",
62
"IIS_2020_patched",
63
"MySql",
64
"Ubuntu",
65
"nginx/1.10.3",
66
"SMB_vuln",
67
"SMB_vuln_patched",
68
"SQLServer",
69
"Win10",
70
"Win10Patched",
71
"FLAG:Linux",
72
],
73
ports=["HTTPS", "GIT", "SSH", "RDP", "PING", "MySQL", "SSH-key", "su"],
74
local_vulnerabilities=["ScanBashHistory", "ScanExplorerRecentFiles", "SudoAttempt", "CrackKeepPassX", "CrackKeepPass"],
75
remote_vulnerabilities=["ProbeLinux", "ProbeWindows"],
76
)
77
78
79
def prefix(x: int, name: str):
80
"""Prefix node name with an instance"""
81
return f"{x}_{name}"
82
83
84
def rdp_password(index):
85
"""Generate RDP password for the specified chain link"""
86
return f"WindowsPassword!{index}"
87
88
89
def ssh_password(index):
90
"""Generate SSH password for the specified chain link"""
91
return f"LinuxPassword!{index}"
92
93
94
def create_network_chain_link(n: int) -> Dict[NodeID, NodeInfo]:
95
"""Instantiate one link of the network chain with associated index n"""
96
97
def current(name):
98
return prefix(n, name)
99
100
def next(name):
101
return prefix(n + 1, name)
102
103
return {
104
current("LinuxNode"): m.NodeInfo(
105
services=[m.ListeningService("HTTPS"), m.ListeningService("SSH", allowedCredentials=[ssh_password(n)])],
106
firewall=m.FirewallConfiguration(incoming=DEFAULT_ALLOW_RULES, outgoing=DEFAULT_ALLOW_RULES),
107
value=100,
108
properties=["MySql", "Ubuntu", "nginx/1.10.3"],
109
owned_string="Intermediate chain node owned, no intrinsic value",
110
vulnerabilities=dict(
111
ProbeLinux=m.VulnerabilityInfo(
112
description="Probe to check if the node runs Linux",
113
type=m.VulnerabilityType.REMOTE,
114
outcome=m.ProbeSucceeded(["Ubuntu"]),
115
reward_string="Remote machine is running Linux",
116
cost=5.0,
117
),
118
ProbeWindows=m.VulnerabilityInfo(
119
description="Probe to check if the node runs Windows", type=m.VulnerabilityType.REMOTE, outcome=m.ProbeFailed(), reward_string="Remote machine is not running Windows", cost=5.0
120
),
121
ScanBashHistory=m.VulnerabilityInfo(
122
description="Scan bash history for possible references to other machines",
123
type=m.VulnerabilityType.LOCAL,
124
outcome=m.LeakedNodesId([next("WindowsNode")]),
125
reward_string="Found a reference to a remote Windows node in bash history",
126
cost=1.0,
127
),
128
ScanExplorerRecentFiles=m.VulnerabilityInfo(
129
description="Scan Windows Explorer recent files for possible references to other machines",
130
type=m.VulnerabilityType.LOCAL,
131
outcome=m.ExploitFailed(),
132
reward_string="Trap: feature not supported on Linux",
133
cost=10.0,
134
),
135
SudoAttempt=m.VulnerabilityInfo(
136
description="Attempt to sudo into admin user", type=m.VulnerabilityType.LOCAL, outcome=m.ExploitFailed(), reward_string="Trap: suspicious attempt to run sudo", cost=100.0
137
),
138
CrackKeepPassX=m.VulnerabilityInfo(
139
description="Attempt to crack KeepPassX and look for credentials",
140
type=m.VulnerabilityType.LOCAL,
141
outcome=m.LeakedCredentials(credentials=[m.CachedCredential(node=next("WindowsNode"), port="RDP", credential=rdp_password(n + 1))]),
142
reward_string=f"Discovered password to Windows machine {n+1}",
143
cost=1.0,
144
),
145
),
146
),
147
next("WindowsNode"): m.NodeInfo(
148
services=[m.ListeningService("HTTPS"), m.ListeningService("RDP", allowedCredentials=[rdp_password(n + 1)])],
149
value=100,
150
properties=["Windows", "Win10", "Win10Patched"],
151
vulnerabilities=dict(
152
ProbeLinux=m.VulnerabilityInfo(
153
description="Probe to check if the node runs Linux", type=m.VulnerabilityType.REMOTE, outcome=m.ProbeFailed(), reward_string="Remote machine is not running Linux", cost=1.0
154
),
155
ProbeWindows=m.VulnerabilityInfo(
156
description="Probe to check if the node runs Windows",
157
type=m.VulnerabilityType.REMOTE,
158
outcome=m.ProbeSucceeded(["Windows"]),
159
reward_string="Remote machine is running Windows",
160
cost=1.0,
161
),
162
ScanBashHistory=m.VulnerabilityInfo(
163
description="Scan bash history for possible references to other machines",
164
type=m.VulnerabilityType.LOCAL,
165
outcome=m.ExploitFailed(),
166
reward_string="Trap: feature not supported on Windows!",
167
cost=100.0,
168
),
169
ScanExplorerRecentFiles=m.VulnerabilityInfo(
170
description="Scan Windows Explorer recent files for possible references to other machines",
171
type=m.VulnerabilityType.LOCAL,
172
outcome=m.LeakedNodesId([prefix(n + 2, "LinuxNode")]),
173
reward_string="Found a reference to a remote Linux node in bash history",
174
cost=1.0,
175
),
176
SudoAttempt=m.VulnerabilityInfo(
177
description="Attempt to sudo into admin user", type=m.VulnerabilityType.LOCAL, outcome=m.ExploitFailed(), reward_string="Trap: feature not supported on Windows!", cost=100.0
178
),
179
CrackKeepPassX=m.VulnerabilityInfo(
180
description="Attempt to crack KeepPassX and look for credentials",
181
type=m.VulnerabilityType.LOCAL,
182
outcome=m.ExploitFailed(),
183
reward_string="Trap: feature not supported on Windows!",
184
cost=100.0,
185
),
186
CrackKeepPass=m.VulnerabilityInfo(
187
description="Attempt to crack KeepPass and look for credentials",
188
type=m.VulnerabilityType.LOCAL,
189
outcome=m.LeakedCredentials(credentials=[m.CachedCredential(node=prefix(n + 2, "LinuxNode"), port="SSH", credential=ssh_password(n + 2))]),
190
reward_string=f"Discovered password to Linux machine {n+2}",
191
cost=1.0,
192
),
193
),
194
),
195
}
196
197
198
def create_chain_network(size: int) -> Dict[NodeID, NodeInfo]:
199
"""Create a chain network with the chain section of specified size.
200
Size must be an even number
201
The number of nodes in the network is `size + 2` to account for the start node (0)
202
and final node (size + 1).
203
"""
204
205
if size % 2 == 1:
206
raise ValueError(f"Chain size must be even: {size}")
207
208
final_node_index = size + 1
209
210
nodes = {
211
"start": m.NodeInfo(
212
services=[],
213
value=0,
214
vulnerabilities=dict(
215
ScanExplorerRecentFiles=m.VulnerabilityInfo(
216
description="Scan Windows Explorer recent files for possible references to other machines",
217
type=m.VulnerabilityType.LOCAL,
218
outcome=m.LeakedCredentials(credentials=[m.CachedCredential(node=prefix(1, "LinuxNode"), port="SSH", credential=ssh_password(1))]),
219
reward_string="Found a reference to a remote Linux node in bash history",
220
cost=1.0,
221
)
222
),
223
agent_installed=True,
224
reimagable=False,
225
),
226
prefix(final_node_index, "LinuxNode"): m.NodeInfo(
227
services=[m.ListeningService("HTTPS"), m.ListeningService("SSH", allowedCredentials=[ssh_password(final_node_index)])],
228
value=1000,
229
owned_string="FLAG: flag discovered!",
230
properties=["MySql", "Ubuntu", "nginx/1.10.3", "FLAG:Linux"],
231
vulnerabilities=dict(),
232
),
233
}
234
235
# Add chain links
236
for i in range(1, size, 2):
237
nodes.update(create_network_chain_link(i))
238
239
return nodes
240
241
242
def new_environment(size) -> m.Environment:
243
return m.Environment(network=m.create_network(create_chain_network(size)), vulnerability_library=dict([]), identifiers=ENV_IDENTIFIERS)
244
245