Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
matterport
GitHub Repository: matterport/Mask_RCNN
Path: blob/master/mrcnn/parallel_model.py
239 views
1
"""
2
Mask R-CNN
3
Multi-GPU Support for Keras.
4
5
Copyright (c) 2017 Matterport, Inc.
6
Licensed under the MIT License (see LICENSE for details)
7
Written by Waleed Abdulla
8
9
Ideas and small code snippets from these sources:
10
https://github.com/fchollet/keras/issues/2436
11
https://medium.com/@kuza55/transparent-multi-gpu-training-on-tensorflow-with-keras-8b0016fd9012
12
https://github.com/avolkov1/keras_experiments/blob/master/keras_exp/multigpu/
13
https://github.com/fchollet/keras/blob/master/keras/utils/training_utils.py
14
"""
15
16
import tensorflow as tf
17
import keras.backend as K
18
import keras.layers as KL
19
import keras.models as KM
20
21
22
class ParallelModel(KM.Model):
    """Subclasses the standard Keras Model and adds multi-GPU support.

    It works by running a replica of the wrapped model on each GPU. The
    inputs are split into `gpu_count` slices along the first axis, one slice
    is sent to each replica, and the replica outputs are merged back
    together so the loss is applied on the combined outputs.
    """

    def __init__(self, keras_model, gpu_count):
        """Class constructor.

        keras_model: The Keras model to parallelize.
        gpu_count: Number of GPUs. Must be > 1.
        """
        # Both attributes are read by make_parallel(), and make_parallel()
        # has to run before the base-class constructor because its return
        # value is the wrapper model's output list.
        self.inner_model = keras_model
        self.gpu_count = gpu_count
        merged_outputs = self.make_parallel()
        super(ParallelModel, self).__init__(inputs=self.inner_model.inputs,
                                            outputs=merged_outputs)

    def __getattribute__(self, attrname):
        """Redirect loading and saving methods to the inner model. That's
        where the weights are stored.

        Any attribute whose name contains 'load' or 'save' is looked up on
        `inner_model`; every other attribute is resolved normally.
        """
        if 'load' in attrname or 'save' in attrname:
            return getattr(self.inner_model, attrname)
        return super(ParallelModel, self).__getattribute__(attrname)

    def summary(self, *args, **kwargs):
        """Override summary() to display summaries of both the wrapper
        and the inner model. Arguments are forwarded unchanged to both."""
        super(ParallelModel, self).summary(*args, **kwargs)
        self.inner_model.summary(*args, **kwargs)

    def make_parallel(self):
        """Creates the wrapper graph that consists of multiple replicas of
        the original model placed on different GPUs.

        Returns: list of merged output tensors, one per output of the
        inner model, in the same order as `inner_model.output_names`.
        """
        inner = self.inner_model

        # Split every input once, up front, into gpu_count slices so each
        # replica only consumes its own slice instead of the full input.
        # NOTE(review): the original comment says this slicing happens on
        # the CPU, but no explicit device scope is set here - confirm.
        input_slices = {name: tf.split(x, self.gpu_count)
                        for name, x in zip(inner.input_names, inner.inputs)}

        def select_slice(tensor_slice):
            """Return a Lambda function that always yields `tensor_slice`.

            Binding the slice through this factory (instead of closing over
            the loop variables) keeps each Lambda layer tied to its own
            slice even if the layer function is invoked again after the
            loops below have finished.
            """
            return lambda _: tensor_slice

        output_names = inner.output_names
        # One bucket per model output; each collects that output from
        # every replica.
        outputs_all = [[] for _ in inner.outputs]

        # Run the model call() on each GPU to place the ops there
        for gpu_id in range(self.gpu_count):
            with tf.device('/gpu:%d' % gpu_id):
                with tf.name_scope('tower_%d' % gpu_id):
                    # Run a slice of inputs through this replica. The Lambda
                    # is applied to the original input tensor so the layer
                    # is connected to the model inputs, but it returns the
                    # pre-computed slice for this GPU.
                    inputs = [
                        KL.Lambda(select_slice(input_slices[name][gpu_id]),
                                  output_shape=lambda s: (None,) + s[1:])(tensor)
                        for name, tensor in zip(inner.input_names,
                                                inner.inputs)]
                    # Create the model replica and get the outputs
                    outputs = inner(inputs)
                    if not isinstance(outputs, list):
                        outputs = [outputs]
                    # Save the outputs for merging back together later
                    for index, replica_output in enumerate(outputs):
                        outputs_all[index].append(replica_output)

        # Merge outputs on CPU
        with tf.device('/cpu:0'):
            merged = []
            for outputs, name in zip(outputs_all, output_names):
                # Concatenate or average outputs?
                # Outputs usually have a batch dimension and we concatenate
                # across it. If they don't, then the output is likely a loss
                # or a metric value that gets averaged across the batch.
                # Keras expects losses and metrics to be scalars.
                if K.int_shape(outputs[0]) == ():
                    # Average. len(o) is the number of tensors handed to
                    # the layer, so the function does not depend on the
                    # enclosing loop variable.
                    m = KL.Lambda(lambda o: tf.add_n(o) / len(o),
                                  name=name)(outputs)
                else:
                    # Concatenate
                    m = KL.Concatenate(axis=0, name=name)(outputs)
                merged.append(m)
        return merged
105
106
107
if __name__ == "__main__":
    # Self-test. Builds a small MNIST classifier, wraps it in ParallelModel
    # for 2 GPUs and trains it briefly. The graph is written to the log
    # directory so it can be inspected in TensorBoard. Run it as:
    #
    # python3 parallel_model.py

    import os
    import numpy as np
    import keras.optimizers
    from keras.datasets import mnist
    from keras.preprocessing.image import ImageDataGenerator

    GPU_COUNT = 2

    # Root directory of the project (relative to the working directory)
    ROOT_DIR = os.path.abspath("../")

    # Directory to save logs and trained model
    MODEL_DIR = os.path.join(ROOT_DIR, "logs")

    def build_model(x_train, num_classes):
        """Return a small conv-net classifier sized for `x_train` samples.

        x_train: array of training images; only its per-sample shape
            (`x_train.shape[1:]`) is used, to define the input layer.
        num_classes: number of units in the final softmax layer.
        """
        # Start from an empty default graph. Keras leaves old ops in the
        # graph; they are ignored for execution but clutter the graph
        # visualization in TensorBoard.
        tf.reset_default_graph()

        inputs = KL.Input(shape=x_train.shape[1:], name="input_image")
        x = inputs
        # Two 3x3 same-padded ReLU convolutions: conv1 (32), conv2 (64)
        for filters, layer_name in ((32, "conv1"), (64, "conv2")):
            x = KL.Conv2D(filters, (3, 3), activation='relu',
                          padding="same", name=layer_name)(x)
        x = KL.MaxPooling2D(pool_size=(2, 2), name="pool1")(x)
        x = KL.Flatten(name="flat1")(x)
        x = KL.Dense(128, activation='relu', name="dense1")(x)
        x = KL.Dense(num_classes, activation='softmax', name="dense2")(x)

        return KM.Model(inputs, x, "digit_classifier_model")

    # Load MNIST data, append a trailing channel axis and scale the pixel
    # values by 1/255 as float32.
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = np.expand_dims(x_train, -1).astype('float32') / 255
    x_test = np.expand_dims(x_test, -1).astype('float32') / 255

    print('x_train shape:', x_train.shape)
    print('x_test shape:', x_test.shape)

    # Build data generator and the single-device model
    datagen = ImageDataGenerator()
    base_model = build_model(x_train, 10)

    # Add multi-GPU support.
    model = ParallelModel(base_model, GPU_COUNT)

    optimizer = keras.optimizers.SGD(lr=0.01, momentum=0.9, clipnorm=5.0)

    model.compile(loss='sparse_categorical_crossentropy',
                  optimizer=optimizer, metrics=['accuracy'])

    model.summary()

    # Train
    train_batches = datagen.flow(x_train, y_train, batch_size=64)
    tensorboard_logger = keras.callbacks.TensorBoard(log_dir=MODEL_DIR,
                                                     write_graph=True)
    model.fit_generator(
        train_batches,
        steps_per_epoch=50, epochs=10, verbose=1,
        validation_data=(x_test, y_test),
        callbacks=[tensorboard_logger]
    )
176
177