Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/lockspace.c
29265 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/******************************************************************************
3
*******************************************************************************
4
**
5
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
7
**
8
**
9
*******************************************************************************
10
******************************************************************************/
11
12
#include <linux/module.h>
#include <linux/sysfs.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"
27
28
static int ls_count;
29
static struct mutex ls_lock;
30
static struct list_head lslist;
31
static spinlock_t lslist_lock;
32
33
/*
 * sysfs "control" store handler.
 *
 * Parses an integer from @buf: 0 stops the lockspace, 1 starts it, any
 * other value is rejected.
 *
 * Returns @len on success, the kstrtoint() error for unparsable input, or
 * -EINVAL for an unknown command or a failed lockspace lookup.
 */
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int cmd;
	int err;

	err = kstrtoint(buf, 0, &cmd);
	if (err)
		return err;

	/* hold a reference on the lockspace while acting on it */
	ls = dlm_find_lockspace_local(ls);
	if (!ls)
		return -EINVAL;

	if (cmd == 0)
		dlm_ls_stop(ls);
	else if (cmd == 1)
		dlm_ls_start(ls);
	else
		ret = -EINVAL;

	dlm_put_lockspace(ls);
	return ret;
}
58
59
/*
 * sysfs "event_done" store handler.
 *
 * Stores the integer parsed from @buf in ls_uevent_result, sets
 * LSFL_UEVENT_WAIT and wakes everybody sleeping on ls_uevent_wait.
 *
 * Returns @len on success or the kstrtoint() error.
 */
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int err;

	err = kstrtoint(buf, 0, &ls->ls_uevent_result);
	if (!err) {
		set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
		wake_up(&ls->ls_uevent_wait);
		return len;
	}

	return err;
}
69
70
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71
{
72
return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73
}
74
75
/*
 * sysfs "id" store handler: parses an unsigned integer from @buf directly
 * into ls_global_id.
 *
 * Returns @len on success or the kstrtouint() error.
 */
static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int err;

	err = kstrtouint(buf, 0, &ls->ls_global_id);
	if (!err)
		return len;

	return err;
}
83
84
/*
 * sysfs "nodir" show handler: prints the value of dlm_no_directory() for
 * this lockspace as an unsigned decimal followed by a newline.
 *
 * Returns the number of bytes written to @buf.
 */
static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	/* sysfs_emit() is the formatter sysfs show() callbacks must use */
	return sysfs_emit(buf, "%u\n", dlm_no_directory(ls));
}
88
89
/*
 * sysfs "nodir" store handler: writing 1 sets LSFL_NODIR. Every other
 * integer is accepted but has no effect; the flag is never cleared here.
 *
 * Returns @len on success or the kstrtoint() error.
 */
static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int enable;
	int err;

	err = kstrtoint(buf, 0, &enable);
	if (err)
		return err;

	if (enable == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);

	return len;
}
100
101
static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102
{
103
uint32_t status = dlm_recover_status(ls);
104
return snprintf(buf, PAGE_SIZE, "%x\n", status);
105
}
106
107
static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108
{
109
return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110
}
111
112
struct dlm_attr {
113
struct attribute attr;
114
ssize_t (*show)(struct dlm_ls *, char *);
115
ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116
};
117
118
static struct dlm_attr dlm_attr_control = {
119
.attr = {.name = "control", .mode = S_IWUSR},
120
.store = dlm_control_store
121
};
122
123
static struct dlm_attr dlm_attr_event = {
124
.attr = {.name = "event_done", .mode = S_IWUSR},
125
.store = dlm_event_store
126
};
127
128
static struct dlm_attr dlm_attr_id = {
129
.attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130
.show = dlm_id_show,
131
.store = dlm_id_store
132
};
133
134
static struct dlm_attr dlm_attr_nodir = {
135
.attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136
.show = dlm_nodir_show,
137
.store = dlm_nodir_store
138
};
139
140
static struct dlm_attr dlm_attr_recover_status = {
141
.attr = {.name = "recover_status", .mode = S_IRUGO},
142
.show = dlm_recover_status_show
143
};
144
145
static struct dlm_attr dlm_attr_recover_nodeid = {
146
.attr = {.name = "recover_nodeid", .mode = S_IRUGO},
147
.show = dlm_recover_nodeid_show
148
};
149
150
static struct attribute *dlm_attrs[] = {
151
&dlm_attr_control.attr,
152
&dlm_attr_event.attr,
153
&dlm_attr_id.attr,
154
&dlm_attr_nodir.attr,
155
&dlm_attr_recover_status.attr,
156
&dlm_attr_recover_nodeid.attr,
157
NULL,
158
};
159
ATTRIBUTE_GROUPS(dlm);
160
161
static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162
char *buf)
163
{
164
struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
165
struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166
return a->show ? a->show(ls, buf) : 0;
167
}
168
169
/*
 * Generic sysfs store entry point: maps @kobj to its lockspace and @attr
 * to its struct dlm_attr, then calls the attribute's store handler.
 *
 * Returns the handler's result, or @len (input silently consumed) when the
 * attribute has no handler.
 */
static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_attr *da = container_of(attr, struct dlm_attr, attr);
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	if (!da->store)
		return len;

	return da->store(ls, buf, len);
}
176
177
static const struct sysfs_ops dlm_attr_ops = {
178
.show = dlm_attr_show,
179
.store = dlm_attr_store,
180
};
181
182
static struct kobj_type dlm_ktype = {
183
.default_groups = dlm_groups,
184
.sysfs_ops = &dlm_attr_ops,
185
};
186
187
static struct kset *dlm_kset;
188
189
static int do_uevent(struct dlm_ls *ls, int in, unsigned int release_recover)
190
{
191
char message[512] = {};
192
char *envp[] = { message, NULL };
193
194
if (in) {
195
kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
196
} else {
197
snprintf(message, 511, "RELEASE_RECOVER=%u", release_recover);
198
kobject_uevent_env(&ls->ls_kobj, KOBJ_OFFLINE, envp);
199
}
200
201
log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
202
203
/* dlm_controld will see the uevent, do the necessary group management
204
and then write to sysfs to wake us */
205
206
wait_event(ls->ls_uevent_wait,
207
test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
208
209
log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
210
211
return ls->ls_uevent_result;
212
}
213
214
static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
215
{
216
const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
217
218
add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
219
return 0;
220
}
221
222
static const struct kset_uevent_ops dlm_uevent_ops = {
223
.uevent = dlm_uevent,
224
};
225
226
/*
 * Initialize the file-scope lockspace bookkeeping and create the "dlm"
 * kset below kernel_kobj.
 *
 * Returns 0 on success or -ENOMEM when the kset cannot be created.
 */
int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	spin_lock_init(&lslist_lock);
	INIT_LIST_HEAD(&lslist);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (dlm_kset)
		return 0;

	printk(KERN_WARNING "%s: can not create kset\n", __func__);
	return -ENOMEM;
}
240
241
void dlm_lockspace_exit(void)
242
{
243
kset_unregister(dlm_kset);
244
}
245
246
/*
 * Look up a lockspace by its global id.
 *
 * On a match the lockspace's ls_count is incremented while lslist_lock is
 * held; the caller drops that reference with dlm_put_lockspace().
 *
 * Returns the lockspace or NULL when no lockspace has id @id.
 */
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *found = NULL;
	struct dlm_ls *pos;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(pos, &lslist, ls_list) {
		if (pos->ls_global_id != id)
			continue;

		atomic_inc(&pos->ls_count);
		found = pos;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	return found;
}
263
264
/*
 * Turn an opaque lockspace handle into a referenced struct dlm_ls.
 *
 * Increments ls_count; the caller drops the reference with
 * dlm_put_lockspace().
 *
 * Returns the lockspace, or NULL when @lockspace is NULL. Callers in this
 * file already test the result for NULL, so a NULL handle is reported to
 * them as an error instead of being dereferenced.
 */
struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls = lockspace;

	if (!ls)
		return NULL;

	atomic_inc(&ls->ls_count);
	return ls;
}
271
272
struct dlm_ls *dlm_find_lockspace_device(int minor)
273
{
274
struct dlm_ls *ls;
275
276
spin_lock_bh(&lslist_lock);
277
list_for_each_entry(ls, &lslist, ls_list) {
278
if (ls->ls_device.minor == minor) {
279
atomic_inc(&ls->ls_count);
280
goto out;
281
}
282
}
283
ls = NULL;
284
out:
285
spin_unlock_bh(&lslist_lock);
286
return ls;
287
}
288
289
void dlm_put_lockspace(struct dlm_ls *ls)
290
{
291
if (atomic_dec_and_test(&ls->ls_count))
292
wake_up(&ls->ls_count_wait);
293
}
294
295
static void remove_lockspace(struct dlm_ls *ls)
296
{
297
retry:
298
wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
299
300
spin_lock_bh(&lslist_lock);
301
if (atomic_read(&ls->ls_count) != 0) {
302
spin_unlock_bh(&lslist_lock);
303
goto retry;
304
}
305
306
WARN_ON(ls->ls_create_count != 0);
307
list_del(&ls->ls_list);
308
spin_unlock_bh(&lslist_lock);
309
}
310
311
/*
 * Start midcomms, the thread for sending/receiving messages for all
 * lockspaces. A failure is logged.
 *
 * Returns the result of dlm_midcomms_start().
 */
static int threads_start(void)
{
	int rv = dlm_midcomms_start();

	if (rv)
		log_print("cannot start dlm midcomms %d", rv);

	return rv;
}
322
323
static int lkb_idr_free(struct dlm_lkb *lkb)
324
{
325
if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
326
dlm_free_lvb(lkb->lkb_lvbptr);
327
328
dlm_free_lkb(lkb);
329
return 0;
330
}
331
332
/*
 * Per-element callback for rhashtable_free_and_destroy(): @ptr is the
 * struct dlm_rsb to free, @arg is unused.
 */
static void rhash_free_rsb(void *ptr, void *arg)
{
	dlm_free_rsb((struct dlm_rsb *)ptr);
}
338
339
static void free_lockspace(struct work_struct *work)
340
{
341
struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work);
342
struct dlm_lkb *lkb;
343
unsigned long id;
344
345
/*
346
* Free all lkb's in xa
347
*/
348
xa_for_each(&ls->ls_lkbxa, id, lkb) {
349
lkb_idr_free(lkb);
350
}
351
xa_destroy(&ls->ls_lkbxa);
352
353
/*
354
* Free all rsb's on rsbtbl
355
*/
356
rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
357
358
kfree(ls);
359
}
360
361
static int new_lockspace(const char *name, const char *cluster,
362
uint32_t flags, int lvblen,
363
const struct dlm_lockspace_ops *ops, void *ops_arg,
364
int *ops_result, dlm_lockspace_t **lockspace)
365
{
366
struct dlm_ls *ls;
367
int namelen = strlen(name);
368
int error;
369
370
if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
371
return -EINVAL;
372
373
if (lvblen % 8)
374
return -EINVAL;
375
376
if (!try_module_get(THIS_MODULE))
377
return -EINVAL;
378
379
if (!dlm_user_daemon_available()) {
380
log_print("dlm user daemon not available");
381
error = -EUNATCH;
382
goto out;
383
}
384
385
if (ops && ops_result) {
386
if (!dlm_config.ci_recover_callbacks)
387
*ops_result = -EOPNOTSUPP;
388
else
389
*ops_result = 0;
390
}
391
392
if (!cluster)
393
log_print("dlm cluster name '%s' is being used without an application provided cluster name",
394
dlm_config.ci_cluster_name);
395
396
if (dlm_config.ci_recover_callbacks && cluster &&
397
strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
398
log_print("dlm cluster name '%s' does not match "
399
"the application cluster name '%s'",
400
dlm_config.ci_cluster_name, cluster);
401
error = -EBADR;
402
goto out;
403
}
404
405
error = 0;
406
407
spin_lock_bh(&lslist_lock);
408
list_for_each_entry(ls, &lslist, ls_list) {
409
WARN_ON(ls->ls_create_count <= 0);
410
if (ls->ls_namelen != namelen)
411
continue;
412
if (memcmp(ls->ls_name, name, namelen))
413
continue;
414
if (flags & DLM_LSFL_NEWEXCL) {
415
error = -EEXIST;
416
break;
417
}
418
ls->ls_create_count++;
419
*lockspace = ls;
420
error = 1;
421
break;
422
}
423
spin_unlock_bh(&lslist_lock);
424
425
if (error)
426
goto out;
427
428
error = -ENOMEM;
429
430
ls = kzalloc(sizeof(*ls), GFP_NOFS);
431
if (!ls)
432
goto out;
433
memcpy(ls->ls_name, name, namelen);
434
ls->ls_namelen = namelen;
435
ls->ls_lvblen = lvblen;
436
atomic_set(&ls->ls_count, 0);
437
init_waitqueue_head(&ls->ls_count_wait);
438
ls->ls_flags = 0;
439
440
if (ops && dlm_config.ci_recover_callbacks) {
441
ls->ls_ops = ops;
442
ls->ls_ops_arg = ops_arg;
443
}
444
445
if (flags & DLM_LSFL_SOFTIRQ)
446
set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
447
448
/* ls_exflags are forced to match among nodes, and we don't
449
* need to require all nodes to have some flags set
450
*/
451
ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
452
DLM_LSFL_SOFTIRQ));
453
454
INIT_LIST_HEAD(&ls->ls_slow_inactive);
455
INIT_LIST_HEAD(&ls->ls_slow_active);
456
rwlock_init(&ls->ls_rsbtbl_lock);
457
458
error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
459
if (error)
460
goto out_lsfree;
461
462
xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
463
rwlock_init(&ls->ls_lkbxa_lock);
464
465
INIT_LIST_HEAD(&ls->ls_waiters);
466
spin_lock_init(&ls->ls_waiters_lock);
467
INIT_LIST_HEAD(&ls->ls_orphans);
468
spin_lock_init(&ls->ls_orphans_lock);
469
470
INIT_LIST_HEAD(&ls->ls_nodes);
471
INIT_LIST_HEAD(&ls->ls_nodes_gone);
472
ls->ls_num_nodes = 0;
473
ls->ls_low_nodeid = 0;
474
ls->ls_total_weight = 0;
475
ls->ls_node_array = NULL;
476
477
memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
478
ls->ls_local_rsb.res_ls = ls;
479
480
ls->ls_debug_rsb_dentry = NULL;
481
ls->ls_debug_waiters_dentry = NULL;
482
483
init_waitqueue_head(&ls->ls_uevent_wait);
484
ls->ls_uevent_result = 0;
485
init_completion(&ls->ls_recovery_done);
486
ls->ls_recovery_result = -1;
487
488
spin_lock_init(&ls->ls_cb_lock);
489
INIT_LIST_HEAD(&ls->ls_cb_delay);
490
491
INIT_WORK(&ls->ls_free_work, free_lockspace);
492
493
ls->ls_recoverd_task = NULL;
494
mutex_init(&ls->ls_recoverd_active);
495
spin_lock_init(&ls->ls_recover_lock);
496
spin_lock_init(&ls->ls_rcom_spin);
497
get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
498
ls->ls_recover_status = 0;
499
ls->ls_recover_seq = get_random_u64();
500
ls->ls_recover_args = NULL;
501
init_rwsem(&ls->ls_in_recovery);
502
rwlock_init(&ls->ls_recv_active);
503
INIT_LIST_HEAD(&ls->ls_requestqueue);
504
rwlock_init(&ls->ls_requestqueue_lock);
505
spin_lock_init(&ls->ls_clear_proc_locks);
506
507
/* Due backwards compatibility with 3.1 we need to use maximum
508
* possible dlm message size to be sure the message will fit and
509
* not having out of bounds issues. However on sending side 3.2
510
* might send less.
511
*/
512
ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
513
if (!ls->ls_recover_buf) {
514
error = -ENOMEM;
515
goto out_lkbxa;
516
}
517
518
ls->ls_slot = 0;
519
ls->ls_num_slots = 0;
520
ls->ls_slots_size = 0;
521
ls->ls_slots = NULL;
522
523
INIT_LIST_HEAD(&ls->ls_recover_list);
524
spin_lock_init(&ls->ls_recover_list_lock);
525
xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
526
spin_lock_init(&ls->ls_recover_xa_lock);
527
ls->ls_recover_list_count = 0;
528
init_waitqueue_head(&ls->ls_wait_general);
529
INIT_LIST_HEAD(&ls->ls_masters_list);
530
rwlock_init(&ls->ls_masters_lock);
531
INIT_LIST_HEAD(&ls->ls_dir_dump_list);
532
rwlock_init(&ls->ls_dir_dump_lock);
533
534
INIT_LIST_HEAD(&ls->ls_scan_list);
535
spin_lock_init(&ls->ls_scan_lock);
536
timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
537
538
spin_lock_bh(&lslist_lock);
539
ls->ls_create_count = 1;
540
list_add(&ls->ls_list, &lslist);
541
spin_unlock_bh(&lslist_lock);
542
543
if (flags & DLM_LSFL_FS)
544
set_bit(LSFL_FS, &ls->ls_flags);
545
546
error = dlm_callback_start(ls);
547
if (error) {
548
log_error(ls, "can't start dlm_callback %d", error);
549
goto out_delist;
550
}
551
552
init_waitqueue_head(&ls->ls_recover_lock_wait);
553
554
/*
555
* Once started, dlm_recoverd first looks for ls in lslist, then
556
* initializes ls_in_recovery as locked in "down" mode. We need
557
* to wait for the wakeup from dlm_recoverd because in_recovery
558
* has to start out in down mode.
559
*/
560
561
error = dlm_recoverd_start(ls);
562
if (error) {
563
log_error(ls, "can't start dlm_recoverd %d", error);
564
goto out_callback;
565
}
566
567
wait_event(ls->ls_recover_lock_wait,
568
test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
569
570
ls->ls_kobj.kset = dlm_kset;
571
error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
572
"%s", ls->ls_name);
573
if (error)
574
goto out_recoverd;
575
kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
576
577
/* This uevent triggers dlm_controld in userspace to add us to the
578
group of nodes that are members of this lockspace (managed by the
579
cluster infrastructure.) Once it's done that, it tells us who the
580
current lockspace members are (via configfs) and then tells the
581
lockspace to start running (via sysfs) in dlm_ls_start(). */
582
583
error = do_uevent(ls, 1, 0);
584
if (error < 0)
585
goto out_recoverd;
586
587
/* wait until recovery is successful or failed */
588
wait_for_completion(&ls->ls_recovery_done);
589
error = ls->ls_recovery_result;
590
if (error)
591
goto out_members;
592
593
dlm_create_debug_file(ls);
594
595
log_rinfo(ls, "join complete");
596
*lockspace = ls;
597
return 0;
598
599
out_members:
600
do_uevent(ls, 0, 0);
601
dlm_clear_members(ls);
602
kfree(ls->ls_node_array);
603
out_recoverd:
604
dlm_recoverd_stop(ls);
605
out_callback:
606
dlm_callback_stop(ls);
607
out_delist:
608
spin_lock_bh(&lslist_lock);
609
list_del(&ls->ls_list);
610
spin_unlock_bh(&lslist_lock);
611
xa_destroy(&ls->ls_recover_xa);
612
kfree(ls->ls_recover_buf);
613
out_lkbxa:
614
xa_destroy(&ls->ls_lkbxa);
615
rhashtable_destroy(&ls->ls_rsbtbl);
616
out_lsfree:
617
kobject_put(&ls->ls_kobj);
618
kfree(ls);
619
out:
620
module_put(THIS_MODULE);
621
return error;
622
}
623
624
static int __dlm_new_lockspace(const char *name, const char *cluster,
625
uint32_t flags, int lvblen,
626
const struct dlm_lockspace_ops *ops,
627
void *ops_arg, int *ops_result,
628
dlm_lockspace_t **lockspace)
629
{
630
int error = 0;
631
632
mutex_lock(&ls_lock);
633
if (!ls_count)
634
error = threads_start();
635
if (error)
636
goto out;
637
638
error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
639
ops_result, lockspace);
640
if (!error)
641
ls_count++;
642
if (error > 0)
643
error = 0;
644
if (!ls_count) {
645
dlm_midcomms_shutdown();
646
dlm_midcomms_stop();
647
}
648
out:
649
mutex_unlock(&ls_lock);
650
return error;
651
}
652
653
int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
654
int lvblen, const struct dlm_lockspace_ops *ops,
655
void *ops_arg, int *ops_result,
656
dlm_lockspace_t **lockspace)
657
{
658
return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
659
ops, ops_arg, ops_result, lockspace);
660
}
661
662
int dlm_new_user_lockspace(const char *name, const char *cluster,
663
uint32_t flags, int lvblen,
664
const struct dlm_lockspace_ops *ops,
665
void *ops_arg, int *ops_result,
666
dlm_lockspace_t **lockspace)
667
{
668
if (flags & DLM_LSFL_SOFTIRQ)
669
return -EINVAL;
670
671
return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
672
ops_arg, ops_result, lockspace);
673
}
674
675
/* NOTE: We check the lkbxa here rather than the resource table.
676
This is because there may be LKBs queued as ASTs that have been unlinked
677
from their RSBs and are pending deletion once the AST has been delivered */
678
679
static int lockspace_busy(struct dlm_ls *ls, unsigned int release_option)
680
{
681
struct dlm_lkb *lkb;
682
unsigned long id;
683
int rv = 0;
684
685
read_lock_bh(&ls->ls_lkbxa_lock);
686
if (release_option == DLM_RELEASE_NO_LOCKS) {
687
xa_for_each(&ls->ls_lkbxa, id, lkb) {
688
rv = 1;
689
break;
690
}
691
} else if (release_option == DLM_RELEASE_UNUSED) {
692
/* TODO: handle this UNUSED option as NO_LOCKS in later patch */
693
xa_for_each(&ls->ls_lkbxa, id, lkb) {
694
if (lkb->lkb_nodeid == 0 &&
695
lkb->lkb_grmode != DLM_LOCK_IV) {
696
rv = 1;
697
break;
698
}
699
}
700
} else {
701
rv = 0;
702
}
703
read_unlock_bh(&ls->ls_lkbxa_lock);
704
return rv;
705
}
706
707
static int release_lockspace(struct dlm_ls *ls, unsigned int release_option)
708
{
709
int busy, rv;
710
711
busy = lockspace_busy(ls, release_option);
712
713
spin_lock_bh(&lslist_lock);
714
if (ls->ls_create_count == 1) {
715
if (busy) {
716
rv = -EBUSY;
717
} else {
718
/* remove_lockspace takes ls off lslist */
719
ls->ls_create_count = 0;
720
rv = 0;
721
}
722
} else if (ls->ls_create_count > 1) {
723
rv = --ls->ls_create_count;
724
} else {
725
rv = -EINVAL;
726
}
727
spin_unlock_bh(&lslist_lock);
728
729
if (rv) {
730
log_debug(ls, "release_lockspace no remove %d", rv);
731
return rv;
732
}
733
734
if (ls_count == 1)
735
dlm_midcomms_version_wait();
736
737
dlm_device_deregister(ls);
738
739
if (release_option != DLM_RELEASE_NO_EVENT &&
740
dlm_user_daemon_available())
741
do_uevent(ls, 0, (release_option == DLM_RELEASE_RECOVER));
742
743
dlm_recoverd_stop(ls);
744
745
/* clear the LSFL_RUNNING flag to fast up
746
* time_shutdown_sync(), we don't care anymore
747
*/
748
clear_bit(LSFL_RUNNING, &ls->ls_flags);
749
timer_shutdown_sync(&ls->ls_scan_timer);
750
751
if (ls_count == 1) {
752
dlm_clear_members(ls);
753
dlm_midcomms_shutdown();
754
}
755
756
dlm_callback_stop(ls);
757
758
remove_lockspace(ls);
759
760
dlm_delete_debug_file(ls);
761
762
kobject_put(&ls->ls_kobj);
763
764
xa_destroy(&ls->ls_recover_xa);
765
kfree(ls->ls_recover_buf);
766
767
/*
768
* Free structures on any other lists
769
*/
770
771
dlm_purge_requestqueue(ls);
772
kfree(ls->ls_recover_args);
773
dlm_clear_members(ls);
774
dlm_clear_members_gone(ls);
775
kfree(ls->ls_node_array);
776
777
log_rinfo(ls, "%s final free", __func__);
778
779
/* delayed free of data structures see free_lockspace() */
780
queue_work(dlm_wq, &ls->ls_free_work);
781
module_put(THIS_MODULE);
782
return 0;
783
}
784
785
/*
786
* Called when a system has released all its locks and is not going to use the
787
* lockspace any longer. We free everything we're managing for this lockspace.
788
* Remaining nodes will go through the recovery process as if we'd died. The
789
* lockspace must continue to function as usual, participating in recoveries,
790
* until this returns.
791
*
792
* See DLM_RELEASE defines for release_option values and their meaning.
793
*/
794
795
int dlm_release_lockspace(void *lockspace, unsigned int release_option)
796
{
797
struct dlm_ls *ls;
798
int error;
799
800
if (release_option > __DLM_RELEASE_MAX)
801
return -EINVAL;
802
803
ls = dlm_find_lockspace_local(lockspace);
804
if (!ls)
805
return -EINVAL;
806
dlm_put_lockspace(ls);
807
808
mutex_lock(&ls_lock);
809
error = release_lockspace(ls, release_option);
810
if (!error)
811
ls_count--;
812
if (!ls_count)
813
dlm_midcomms_stop();
814
mutex_unlock(&ls_lock);
815
816
return error;
817
}
818
819
void dlm_stop_lockspaces(void)
820
{
821
struct dlm_ls *ls;
822
int count;
823
824
restart:
825
count = 0;
826
spin_lock_bh(&lslist_lock);
827
list_for_each_entry(ls, &lslist, ls_list) {
828
if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
829
count++;
830
continue;
831
}
832
spin_unlock_bh(&lslist_lock);
833
log_error(ls, "no userland control daemon, stopping lockspace");
834
dlm_ls_stop(ls);
835
goto restart;
836
}
837
spin_unlock_bh(&lslist_lock);
838
839
if (count)
840
log_print("dlm user daemon left %d lockspaces", count);
841
}
842
843