Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/CyberBattleSim
Path: blob/main/cyberbattle/_env/defender.py
597 views
1
# Copyright (c) Microsoft Corporation.
2
# Licensed under the MIT License.
3
4
"""
5
Defines stock defender agents for the CyberBattle simulation.
6
"""
7
8
import random
9
import numpy
10
from abc import abstractmethod
11
from cyberbattle.simulation.model import Environment
12
from cyberbattle.simulation.actions import DefenderAgentActions
13
from ..simulation import model
14
15
import logging
16
17
18
class DefenderAgent:
    """Base interface for defender agents.

    The ``step`` method gets called after each step executed by the
    attacker agent.

    Note: ``step`` is decorated with ``abstractmethod``, but this class does
    not use ``abc.ABCMeta``, so the decorator is advisory only; the class
    can still be instantiated and the base ``step`` does nothing.
    """

    @abstractmethod
    def step(self, environment: Environment, actions: DefenderAgentActions, t: int):
        """Perform the defender's turn for simulation step ``t``.

        environment -- the simulation environment the defender operates on
        actions -- the defender actions available to the agent
        t -- the current simulation step

        The base implementation performs no action and returns ``None``.
        """
25
26
27
class ScanAndReimageCompromisedMachines(DefenderAgent):
    """Defender that periodically scans random nodes and re-images infected ones.

    On every step ``t`` that is a multiple of ``scan_frequency`` the agent
    draws ``scan_capacity`` node ids at random from the network. The draw is
    made with ``random.choices``, i.e. with replacement, so the same node may
    be picked more than once within a single scan. For each picked node that
    is running with the attacker agent installed, the attacker is detected
    with the configured probability. A detected node is re-imaged when it is
    flagged as reimagable; otherwise the detection is only logged.

    probability -- probability that an attacker agent is detected when scanned given that the attacker agent is present
    scan_capacity -- maximum number of machines that a defender agent can scan in one simulation step
    scan_frequency -- frequency of the scan in simulation steps
    """

    def __init__(self, probability: float, scan_capacity: int, scan_frequency: int):
        self.probability = probability
        self.scan_capacity = scan_capacity
        self.scan_frequency = scan_frequency

    def step(self, environment: Environment, actions: DefenderAgentActions, t: int):
        """Scan the network (if a scan is due at step ``t``) and re-image detected nodes."""
        # A scan only happens on steps that are a multiple of the scan frequency.
        if t % self.scan_frequency != 0:
            return

        # Pick the nodes to scan at random (with replacement).
        candidates = list(environment.network.nodes)
        for target_id in random.choices(candidates, k=self.scan_capacity):
            target = environment.get_node(target_id)

            # Only a running machine hosting the attacker agent can yield a detection.
            compromised = target.status == model.MachineStatus.Running and target.agent_installed
            if not compromised:
                continue

            # Detection of a present attacker is itself probabilistic.
            detected = numpy.random.random() <= self.probability
            if not detected:
                continue

            if target.reimagable:
                logging.info(f"Defender detected malware, reimaging node {target_id}")
                actions.reimage_node(target_id)
            else:
                logging.info(f"Defender detected malware, but node cannot be reimaged {target_id}")
56
57
58
class ExternalRandomEvents(DefenderAgent):
    """A 'defender' that randomly alters network node configuration.

    On every simulation step each kind of event below is rolled
    independently for every node (default probability 0.1 per node and per
    kind of event): patching a vulnerability, stopping a service, planting a
    vulnerability from the environment's library, removing a firewall rule
    and adding a firewall rule.
    """

    def step(self, environment: Environment, actions: DefenderAgentActions, t: int):
        """Apply each kind of random event once, always in the same order."""
        self.patch_vulnerabilities_at_random(environment)
        self.stop_service_at_random(environment, actions)
        self.plant_vulnerabilities_at_random(environment)
        self.firewall_change_remove(environment)
        self.firewall_change_add(environment)

    def patch_vulnerabilities_at_random(self, environment: Environment, probability: float = 0.1) -> None:
        """Remove one randomly chosen vulnerability from each node selected by the roll.

        environment -- environment whose nodes are altered in place
        probability -- per-node probability that a vulnerability gets removed
        """
        for _, node_data in environment.nodes():
            remove_vulnerability = numpy.random.random() <= probability
            if remove_vulnerability and node_data.vulnerabilities:
                choice = random.choice(list(node_data.vulnerabilities))
                node_data.vulnerabilities.pop(choice)

    def stop_service_at_random(self, environment: Environment, actions: DefenderAgentActions, probability: float = 0.1) -> None:
        """Stop one randomly chosen service on each node selected by the roll.

        environment -- environment whose nodes are inspected
        actions -- defender actions used to actually stop the service
        probability -- per-node probability that a service gets stopped
        """
        for node_id, node_data in environment.nodes():
            remove_service = numpy.random.random() <= probability
            if remove_service and node_data.services:
                service = random.choice(node_data.services)
                actions.stop_service(node_id, service.name)

    def plant_vulnerabilities_at_random(self, environment: Environment, probability: float = 0.1) -> None:
        """Add one vulnerability from the global library to each node selected by the roll.

        Only vulnerabilities that the node does not already have are
        candidates; nothing happens when the node already has all of them.

        environment -- environment providing the nodes and the vulnerability library
        probability -- per-node probability that a vulnerability gets planted
        """
        for _, node_data in environment.nodes():
            add_vulnerability = numpy.random.random() <= probability
            if not add_vulnerability:
                # Skip the set difference entirely when nothing is to be planted.
                continue

            # Library vulnerabilities that are not yet present on the node.
            # ``tolist()`` converts the numpy scalars back to native Python
            # values so that no numpy type ends up as a dictionary key.
            new_vulnerabilities = numpy.setdiff1d(
                list(environment.vulnerability_library.keys()),
                list(node_data.vulnerabilities.keys()),
            ).tolist()
            if new_vulnerabilities:
                new_vulnerability = random.choice(new_vulnerabilities)
                node_data.vulnerabilities[new_vulnerability] = environment.vulnerability_library[new_vulnerability]

    # TODO: Not sure how to access global (environment) services.
    # def serviceChangeAdd(self, probability: float) -> None:
    #     # Iterate through every node.
    #     for node_id, node_data in self.__environment.nodes():
    #         # Have a boolean addService decide if we will add one.
    #         addService = numpy.random.random() <= probability
    #         # List all new services we can add.
    #         newServices = numpy.setdiff1d(self.__environment.services, node_data.services)
    #         # If we have decided to add a service and there are new services to add, go ahead and add them.
    #         if addService and len(newServices) > 0:
    #             newService = random.choice(newServices)
    #             node_data.services.append(newService)
    #     return None

    def firewall_change_remove(self, environment: Environment, probability: float = 0.1) -> None:
        """Remove one randomly chosen firewall rule from each node selected by the roll.

        When the node has both incoming and outgoing rules, the direction is
        picked with a fair coin flip; when only one direction has rules, the
        rule is removed from that direction; when there are no rules at all,
        nothing happens.

        environment -- environment whose nodes are altered in place
        probability -- per-node probability that a rule gets removed
        """
        for _, node_data in environment.nodes():
            remove_rule = numpy.random.random() <= probability
            if not remove_rule:
                continue

            firewall = node_data.firewall
            if firewall.incoming and firewall.outgoing:
                # Both directions have rules: choose the direction at random.
                incoming = numpy.random.random() <= 0.5
                rules = firewall.incoming if incoming else firewall.outgoing
            else:
                # At most one direction has rules: use whichever is non-empty.
                rules = firewall.incoming or firewall.outgoing

            if rules:
                rules.remove(random.choice(rules))

    def firewall_change_add(self, environment: Environment, probability: float = 0.1) -> None:
        """Add an ALLOW rule for a random sample port to each node selected by the roll.

        The rule is added to either the incoming or the outgoing rule list
        (fair coin flip) unless that same list already contains an equal rule.

        environment -- environment whose nodes are altered in place
        probability -- per-node probability that a rule gets added
        """
        for _, node_data in environment.nodes():
            add_rule = numpy.random.random() <= probability
            if not add_rule:
                continue

            rule_to_add = model.FirewallRule(
                port=random.choice(model.SAMPLE_IDENTIFIERS.ports),
                permission=model.RulePermission.ALLOW,
            )
            # Randomly decide if we will add an incoming or outgoing rule.
            incoming = numpy.random.random() <= 0.5
            firewall = node_data.firewall
            rules = firewall.incoming if incoming else firewall.outgoing
            # Check for duplicates in the list being modified (the outgoing
            # case previously looked at the incoming list by mistake).
            if rule_to_add not in rules:
                rules.append(rule_to_add)
149
150