GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/member.c
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "midcomms.h"
#include "lowcomms.h"
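
/* a header minor version below DLM_HEADER_SLOTS means the sending node
   predates (and does not support) slot assignment */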
int dlm_slots_version(const struct dlm_header *h)
{
        if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
                return 0;
        return 1;
}
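
/* save the slot and generation that a remote node reported for itself
   in its rcom_config, if that node supports slots */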
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
                   struct dlm_member *memb)
{
        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

        if (!dlm_slots_version(&rc->rc_header))
                return;

        memb->slot = le16_to_cpu(rf->rf_our_slot);
        memb->generation = le32_to_cpu(rf->rf_generation);
}
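
/* pack our slot assignments into the rcom_slot area that follows
   rcom_config in an outgoing rcom message */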
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
        struct dlm_slot *slot;
        struct rcom_slot *ro;
        int i;

        ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

        /* ls_slots array is sparse, but not rcom_slots */

        for (i = 0; i < ls->ls_slots_size; i++) {
                slot = &ls->ls_slots[i];
                if (!slot->nodeid)
                        continue;
                ro->ro_nodeid = cpu_to_le32(slot->nodeid);
                ro->ro_slot = cpu_to_le16(slot->slot);
                ro++;
        }
}

#define SLOT_DEBUG_LINE 128
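
/* format " slot:nodeid" pairs into a single log line, from either a
   dlm_slot array or an rcom_slot array */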
static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
                      struct rcom_slot *ro0, struct dlm_slot *array,
                      int array_size)
{
        char line[SLOT_DEBUG_LINE];
        int len = SLOT_DEBUG_LINE - 1;
        int pos = 0;
        int ret, i;

        memset(line, 0, sizeof(line));

        if (array) {
                for (i = 0; i < array_size; i++) {
                        if (!array[i].nodeid)
                                continue;

                        ret = snprintf(line + pos, len - pos, " %d:%d",
                                       array[i].slot, array[i].nodeid);
                        if (ret >= len - pos)
                                break;
                        pos += ret;
                }
        } else if (ro0) {
                for (i = 0; i < num_slots; i++) {
                        ret = snprintf(line + pos, len - pos, " %d:%d",
                                       ro0[i].ro_slot, ro0[i].ro_nodeid);
                        if (ret >= len - pos)
                                break;
                        pos += ret;
                }
        }

        log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
}
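
/* import the slot assignments received in ls_recover_buf: record each
   member's slot, and verify our own slot never changes once set */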
int dlm_slots_copy_in(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        struct dlm_rcom *rc = ls->ls_recover_buf;
        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
        struct rcom_slot *ro0, *ro;
        int our_nodeid = dlm_our_nodeid();
        int i, num_slots;
        uint32_t gen;

        if (!dlm_slots_version(&rc->rc_header))
                return -1;

        gen = le32_to_cpu(rf->rf_generation);
        if (gen <= ls->ls_generation) {
                log_error(ls, "dlm_slots_copy_in gen %u old %u",
                          gen, ls->ls_generation);
        }
        ls->ls_generation = gen;

        num_slots = le16_to_cpu(rf->rf_num_slots);
        if (!num_slots)
                return -1;

        ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

        log_slots(ls, gen, num_slots, ro0, NULL, 0);

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
                        if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
                                continue;
                        memb->slot = le16_to_cpu(ro->ro_slot);
                        memb->slot_prev = memb->slot;
                        break;
                }

                if (memb->nodeid == our_nodeid) {
                        if (ls->ls_slot && ls->ls_slot != memb->slot) {
                                log_error(ls, "dlm_slots_copy_in our slot "
                                          "changed %d %d", ls->ls_slot,
                                          memb->slot);
                                return -1;
                        }

                        if (!ls->ls_slot)
                                ls->ls_slot = memb->slot;
                }

                if (!memb->slot) {
                        log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
                                  memb->nodeid);
                        return -1;
                }
        }

        return 0;
}

/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */
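
/* build a new slots array: members keep any slot they already have,
   unassigned members get the lowest free offsets, and the generation
   is bumped */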
int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
                     struct dlm_slot **slots_out, uint32_t *gen_out)
{
        struct dlm_member *memb;
        struct dlm_slot *array;
        int our_nodeid = dlm_our_nodeid();
        int array_size, max_slots, i;
        int need = 0;
        int max = 0;
        int num = 0;
        uint32_t gen = 0;

        /* our own memb struct will have slot -1 gen 0 */

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (memb->nodeid == our_nodeid) {
                        memb->slot = ls->ls_slot;
                        memb->generation = ls->ls_generation;
                        break;
                }
        }

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (memb->generation > gen)
                        gen = memb->generation;

                /* node doesn't support slots */

                if (memb->slot == -1)
                        return -1;

                /* node needs a slot assigned */

                if (!memb->slot)
                        need++;

                /* node has a slot assigned */

                num++;

                if (!max || max < memb->slot)
                        max = memb->slot;

                /* sanity check, once slot is assigned it shouldn't change */

                if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
                        log_error(ls, "nodeid %d slot changed %d %d",
                                  memb->nodeid, memb->slot_prev, memb->slot);
                        return -1;
                }
                memb->slot_prev = memb->slot;
        }

        array_size = max + need;
        array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
        if (!array)
                return -ENOMEM;

        num = 0;

        /* fill in slots (offsets) that are used */

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (!memb->slot)
                        continue;

                if (memb->slot > array_size) {
                        log_error(ls, "invalid slot number %d", memb->slot);
                        kfree(array);
                        return -1;
                }

                array[memb->slot - 1].nodeid = memb->nodeid;
                array[memb->slot - 1].slot = memb->slot;
                num++;
        }

        /* assign new slots from unused offsets */

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (memb->slot)
                        continue;

                for (i = 0; i < array_size; i++) {
                        if (array[i].nodeid)
                                continue;

                        memb->slot = i + 1;
                        memb->slot_prev = memb->slot;
                        array[i].nodeid = memb->nodeid;
                        array[i].slot = memb->slot;
                        num++;

                        if (!ls->ls_slot && memb->nodeid == our_nodeid)
                                ls->ls_slot = memb->slot;
                        break;
                }

                if (!memb->slot) {
                        log_error(ls, "no free slot found");
                        kfree(array);
                        return -1;
                }
        }

        gen++;

        log_slots(ls, gen, num, NULL, array, array_size);
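
        /* all the slot info must fit in one rcom message: the app buffer
           size less the dlm_rcom and rcom_config headers, divided by the
           size of one rcom_slot entry */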
        max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
                     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

        if (num > max_slots) {
                log_error(ls, "num_slots %d exceeds max_slots %d",
                          num, max_slots);
                kfree(array);
                return -1;
        }

        *gen_out = gen;
        *slots_out = array;
        *slots_size = array_size;
        *num_slots = num;
        return 0;
}
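
/* insert the new member so that ls_nodes stays sorted by nodeid */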
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
        struct dlm_member *memb = NULL;
        struct list_head *tmp;
        struct list_head *newlist = &new->list;
        struct list_head *head = &ls->ls_nodes;

        list_for_each(tmp, head) {
                memb = list_entry(tmp, struct dlm_member, list);
                if (new->nodeid < memb->nodeid)
                        break;
        }

        if (!memb)
                list_add_tail(newlist, head);
        else
                /* list_add_tail() inserts newlist just before tmp, the
                   first entry with a larger nodeid */
                list_add_tail(newlist, tmp);
}

static int add_remote_member(int nodeid)
{
        int error;

        if (nodeid == dlm_our_nodeid())
                return 0;

        error = dlm_lowcomms_connect_node(nodeid);
        if (error < 0)
                return error;

        dlm_midcomms_add_member(nodeid);
        return 0;
}

static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
        struct dlm_member *memb;
        int error;

        memb = kzalloc(sizeof(*memb), GFP_NOFS);
        if (!memb)
                return -ENOMEM;

        memb->nodeid = node->nodeid;
        memb->weight = node->weight;
        memb->comm_seq = node->comm_seq;

        error = add_remote_member(node->nodeid);
        if (error < 0) {
                kfree(memb);
                return error;
        }

        add_ordered_member(ls, memb);
        ls->ls_num_nodes++;
        return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
        struct dlm_member *memb;

        list_for_each_entry(memb, head, list) {
                if (memb->nodeid == nodeid)
                        return memb;
        }
        return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
        if (find_memb(&ls->ls_nodes, nodeid))
                return 1;
        return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
        WARN_ON_ONCE(!nodeid || nodeid == -1);

        if (find_memb(&ls->ls_nodes_gone, nodeid))
                return 1;
        return 0;
}

static void clear_memb_list(struct list_head *head,
                            void (*after_del)(int nodeid))
{
        struct dlm_member *memb;

        while (!list_empty(head)) {
                memb = list_entry(head->next, struct dlm_member, list);
                list_del(&memb->list);
                if (after_del)
                        after_del(memb->nodeid);
                kfree(memb);
        }
}

static void remove_remote_member(int nodeid)
{
        if (nodeid == dlm_our_nodeid())
                return;

        dlm_midcomms_remove_member(nodeid);
}

void dlm_clear_members(struct dlm_ls *ls)
{
        clear_memb_list(&ls->ls_nodes, remove_remote_member);
        ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
        clear_memb_list(&ls->ls_nodes_gone, NULL);
}

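/* build ls_node_array, in which each member's nodeid appears 'weight'
   times (or once each if every weight is zero), for weight-proportional
   selection among the nodes */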
static void make_member_array(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        int i, w, x = 0, total = 0, all_zero = 0, *array;

        kfree(ls->ls_node_array);
        ls->ls_node_array = NULL;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (memb->weight)
                        total += memb->weight;
        }

        /* all nodes revert to weight of 1 if all have weight 0 */

        if (!total) {
                total = ls->ls_num_nodes;
                all_zero = 1;
        }

        ls->ls_total_weight = total;
        array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
        if (!array)
                return;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (!all_zero && !memb->weight)
                        continue;

                if (all_zero)
                        w = 1;
                else
                        w = memb->weight;

                DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

                for (i = 0; i < w; i++)
                        array[x++] = memb->nodeid;
        }

        ls->ls_node_array = array;
}

/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls, uint64_t seq)
{
        struct dlm_member *memb;
        int error = 0;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (dlm_recovery_stopped(ls)) {
                        error = -EINTR;
                        break;
                }
                error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
                if (error)
                        break;
        }
        if (error)
                log_rinfo(ls, "ping_members aborted %d last nodeid %d",
                          error, ls->ls_recover_nodeid);
        return error;
}
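
/* the dlm_lsop_* helpers invoke the recovery callbacks, if any, that
   the lockspace user registered in ls_ops */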
static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
        if (!ls->ls_ops || !ls->ls_ops->recover_prep)
                return;
        ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb,
                                  unsigned int release_recover)
{
        struct dlm_slot slot;
        uint32_t seq;
        int error;

        if (!ls->ls_ops || !ls->ls_ops->recover_slot)
                return;

        /* if there is no comms connection with this node
           or the present comms connection is newer
           than the one when this member was added, then
           we consider the node to have failed (versus
           being removed due to dlm_release_lockspace) */

        error = dlm_comm_seq(memb->nodeid, &seq, false);

        if (!release_recover && !error && seq == memb->comm_seq)
                return;

        slot.nodeid = memb->nodeid;
        slot.slot = memb->slot;

        ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}
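
/* report the final membership (a dense nodeid/slot array), our slot and
   the generation to the user's recover_done callback */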
void dlm_lsop_recover_done(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        struct dlm_slot *slots;
        int i, num;

        if (!ls->ls_ops || !ls->ls_ops->recover_done)
                return;

        num = ls->ls_num_nodes;
        slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
        if (!slots)
                return;

        i = 0;
        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (i == num) {
                        log_error(ls, "dlm_lsop_recover_done bad num %d", num);
                        goto out;
                }
                slots[i].nodeid = memb->nodeid;
                slots[i].slot = memb->slot;
                i++;
        }

        ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
                                 ls->ls_slot, ls->ls_generation);
 out:
        kfree(slots);
}

static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
                                                int nodeid)
{
        int i;

        for (i = 0; i < rv->nodes_count; i++) {
                if (rv->nodes[i].nodeid == nodeid)
                        return &rv->nodes[i];
        }
        return NULL;
}
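
/* reconcile ls_nodes with the new config: move departed members to
   ls_nodes_gone, add new members, recompute ls_low_nodeid and the
   weighted node array, then ping all members */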
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
        struct dlm_member *memb, *safe;
        struct dlm_config_node *node;
        int i, error, neg = 0, low = -1;
        unsigned int release_recover;

        /* Previously removed members that we've not finished removing need to
         * count as a negative change so the "neg" recovery steps will happen.
         *
         * This function must report all member changes to the lsops and
         * midcomms layers, and must never return before doing so.
         */

        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
                log_rinfo(ls, "prev removed member %d", memb->nodeid);
                neg++;
        }

        /* move departed members from ls_nodes to ls_nodes_gone */

        list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
                node = find_config_node(rv, memb->nodeid);
                if (!node) {
                        log_error(ls, "remove member %d invalid",
                                  memb->nodeid);
                        return -EFAULT;
                }

                if (!node->new && !node->gone)
                        continue;

                release_recover = 0;

                if (node->gone) {
                        release_recover = node->release_recover;
                        log_rinfo(ls, "remove member %d%s", memb->nodeid,
                                  release_recover ? " (release_recover)" : "");
                } else {
                        /* removed and re-added */
                        log_rinfo(ls, "remove member %d comm_seq %u %u",
                                  memb->nodeid, memb->comm_seq, node->comm_seq);
                }

                neg++;
                list_move(&memb->list, &ls->ls_nodes_gone);
                remove_remote_member(memb->nodeid);
                ls->ls_num_nodes--;
                dlm_lsop_recover_slot(ls, memb, release_recover);
        }

        /* add new members to ls_nodes */

        for (i = 0; i < rv->nodes_count; i++) {
                node = &rv->nodes[i];
                if (node->gone)
                        continue;

                if (dlm_is_member(ls, node->nodeid))
                        continue;
                error = dlm_add_member(ls, node);
                if (error)
                        return error;

                log_rinfo(ls, "add member %d", node->nodeid);
        }

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (low == -1 || memb->nodeid < low)
                        low = memb->nodeid;
        }
        ls->ls_low_nodeid = low;

        make_member_array(ls);
        *neg_out = neg;

        error = ping_members(ls, rv->seq);
        log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
        return error;
}

/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
        int new;

        /*
         * Prevent dlm_recv from being in the middle of something when we do
         * the stop. This includes ensuring dlm_recv isn't processing a
         * recovery message (rcom), while dlm_recoverd is aborting and
         * resetting things from an in-progress recovery. i.e. we want
         * dlm_recoverd to abort its recovery without worrying about dlm_recv
         * processing an rcom at the same time. Stopping dlm_recv also makes
         * it easy for dlm_receive_message() to check locking stopped and add a
         * message to the requestqueue without races.
         */

        write_lock_bh(&ls->ls_recv_active);

        /*
         * Abort any recovery that's in progress (see RECOVER_STOP,
         * dlm_recovery_stopped()) and tell any other threads running in the
         * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
         */

        spin_lock_bh(&ls->ls_recover_lock);
        set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
        new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
        if (new)
                timer_delete_sync(&ls->ls_scan_timer);
        ls->ls_recover_seq++;

        /* activate requestqueue and stop processing */
        write_lock_bh(&ls->ls_requestqueue_lock);
        set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
        write_unlock_bh(&ls->ls_requestqueue_lock);
        spin_unlock_bh(&ls->ls_recover_lock);

        /*
         * Let dlm_recv run again; from now on any normal messages will be
         * saved on the requestqueue for later.
         */

        write_unlock_bh(&ls->ls_recv_active);

        /*
         * This in_recovery lock does two things:
         * 1) Keeps this function from returning until all threads are out
         *    of locking routines and locking is truly stopped.
         * 2) Keeps any new requests from being processed until it's unlocked
         *    when recovery is complete.
         */

        if (new) {
                set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
                wake_up_process(ls->ls_recoverd_task);
                wait_event(ls->ls_recover_lock_wait,
                           test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
        }

        /*
         * The recoverd suspend/resume makes sure that dlm_recoverd (if
         * running) has noticed RECOVER_STOP above and quit processing the
         * previous recovery.
         */

        dlm_recoverd_suspend(ls);

        spin_lock_bh(&ls->ls_recover_lock);
        kfree(ls->ls_slots);
        ls->ls_slots = NULL;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_recover_status = 0;
        spin_unlock_bh(&ls->ls_recover_lock);

        dlm_recoverd_resume(ls);

        if (!ls->ls_recover_begin)
                ls->ls_recover_begin = jiffies;

        /* Call the recover_prep ops only once, not again on each further
         * dlm_ls_stop() made while recovery is already stopped.
         *
         * If we were able to clear the LSFL_RUNNING bit (i.e. it was set),
         * we know this is the first dlm_ls_stop() call.
         */
        if (new)
                dlm_lsop_recover_prep(ls);

        return 0;
}
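
/* package the current node config as a dlm_recover event and wake
   dlm_recoverd to run the new recovery */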
int dlm_ls_start(struct dlm_ls *ls)
{
        struct dlm_recover *rv, *rv_old;
        struct dlm_config_node *nodes = NULL;
        int error, count;

        rv = kzalloc(sizeof(*rv), GFP_NOFS);
        if (!rv)
                return -ENOMEM;

        error = dlm_config_nodes(ls->ls_name, &nodes, &count);
        if (error < 0)
                goto fail_rv;

        spin_lock_bh(&ls->ls_recover_lock);

        /* the lockspace needs to be stopped before it can be started */

        if (!dlm_locking_stopped(ls)) {
                spin_unlock_bh(&ls->ls_recover_lock);
                log_error(ls, "start ignored: lockspace running");
                error = -EINVAL;
                goto fail;
        }

        rv->nodes = nodes;
        rv->nodes_count = count;
        rv->seq = ++ls->ls_recover_seq;
        rv_old = ls->ls_recover_args;
        ls->ls_recover_args = rv;
        spin_unlock_bh(&ls->ls_recover_lock);

        if (rv_old) {
                log_error(ls, "unused recovery %llx %d",
                          (unsigned long long)rv_old->seq, rv_old->nodes_count);
                kfree(rv_old->nodes);
                kfree(rv_old);
        }

        set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
        wake_up_process(ls->ls_recoverd_task);
        return 0;

 fail:
        kfree(nodes);
 fail_rv:
        kfree(rv);
        return error;
}