GitHub Repository: torvalds/linux
Path: fs/ceph/mds_client.c
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

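/*
 * Parse the quota section of an inode's reply info: a versioned
 * struct header followed by the directory's max_bytes and max_files
 * quota values.
 */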
static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
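/*
 * Note: features == (u64)-1 throughout this file means the sender
 * used the current, self-describing (versioned) encoding; otherwise
 * we fall back to the legacy layout gated by individual feature bits
 * (see the "legacy (unversioned) struct" branch below).
 */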
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		if (struct_v >= 5) {
			u32 alen;

			ceph_decode_32_safe(p, end, alen, bad);

			while (alen--) {
				u32 len;

				/* key */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
				/* value */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
			}
		}

		/* fscrypt flag -- ignore */
		if (struct_v >= 6)
			ceph_decode_skip_8(p, end, bad);

		info->fscrypt_auth = NULL;
		info->fscrypt_auth_len = 0;
		info->fscrypt_file = NULL;
		info->fscrypt_file_len = 0;
		if (struct_v >= 7) {
			ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
			if (info->fscrypt_auth_len) {
				info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
							     GFP_KERNEL);
				if (!info->fscrypt_auth)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_auth,
						      info->fscrypt_auth_len, bad);
			}
			ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
			if (info->fscrypt_file_len) {
				info->fscrypt_file = kmalloc(info->fscrypt_file_len,
							     GFP_KERNEL);
				if (!info->fscrypt_file)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_file,
						      info->fscrypt_file_len, bad);
			}
		}
		*p = end;
	} else {
		/* legacy (unversioned) struct */
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

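/*
 * Parse a dirfrag record from a reply: the fragment of the directory
 * this entry covers, followed by the list of MDS ranks (ndist) that
 * hold replicas of it.
 */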
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

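/*
 * Parse a dentry lease. For the versioned encoding (v2 and later)
 * this may be followed by an alternate name (altname), which carries
 * the encrypted form of the dentry name.
 */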
static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features, u32 *altname_len, u8 **altname)
{
	u8 struct_v;
	u32 struct_len;
	void *lend;

	if (features == (u64)-1) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;

		ceph_decode_32_safe(p, end, struct_len, bad);
	} else {
		struct_len = sizeof(**lease);
		*altname_len = 0;
		*altname = NULL;
	}

	lend = *p + struct_len;
	ceph_decode_need(p, end, struct_len, bad);
	*lease = *p;
	*p += sizeof(**lease);

	if (features == (u64)-1) {
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, *altname_len, bad);
			ceph_decode_need(p, end, *altname_len, bad);
			*altname = *p;
			*p += *altname_len;
		} else {
			*altname = NULL;
			*altname_len = 0;
		}
	}
	*p = lend;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features,
					     &info->altname_len, &info->altname);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_request *req,
				    u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err_client(cl, "dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct inode *inode = d_inode(req->r_dentry);
		struct ceph_inode_info *ci = ceph_inode(inode);
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		struct fscrypt_str tname = FSTR_INIT(NULL, 0);
		struct fscrypt_str oname = FSTR_INIT(NULL, 0);
		struct ceph_fname fname;
		u32 altname_len, _name_len;
		u8 *altname, *_name;

		/* dentry */
		ceph_decode_32_safe(p, end, _name_len, bad);
		ceph_decode_need(p, end, _name_len, bad);
		_name = *p;
		*p += _name_len;
		doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);

		if (info->hash_order)
			rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
						      _name, _name_len);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features,
					     &altname_len, &altname);
		if (err)
			goto out_bad;

		/*
		 * Try to decrypt the dentry names and update them
		 * in the ceph_mds_reply_dir_entry struct.
		 */
		fname.dir = inode;
		fname.name = _name;
		fname.name_len = _name_len;
		fname.ctext = altname;
		fname.ctext_len = altname_len;
		/*
		 * _name_len may be larger than altname_len, such as
		 * when the human-readable name length is in the range
		 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
		 * and then the copy in ceph_fname_to_usr would corrupt
		 * the data if there is no encryption key.
		 *
		 * Just set the no_copy flag; then, if there is no
		 * encryption key, oname.name will always be assigned
		 * to _name.
		 */
		fname.no_copy = true;
		if (altname_len == 0) {
			/*
			 * Set tname to _name, and this will be used
			 * to do the base64_decode in-place. It's
			 * safe because the decoded string is always
			 * shorter -- roughly 3/4 of the original
			 * string's length.
			 */
			tname.name = _name;

			/*
			 * Set oname to _name too, and this will be
			 * used to do the decryption in-place.
			 */
			oname.name = _name;
			oname.len = _name_len;
		} else {
			/*
			 * This will do the decryption only in-place
			 * from altname cryptext directly.
			 */
			oname.name = altname;
			oname.len = altname_len;
		}
		rde->is_nokey = false;
		err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
		if (err) {
			pr_err_client(cl, "unable to decode %.*s, got %d\n",
				      _name_len, _name, err);
			goto out_bad;
		}
		rde->name = oname.name;
		rde->name_len = oname.len;

		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "problem parsing dir contents %d\n", err);
	return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE xa_mk_value(1)

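/*
 * Decode the sets of inode number ranges the MDS has delegated to
 * this session (used for async creates), and record each one as
 * available in the session's s_delegated_inos xarray.
 */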
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	doutc(cl, "got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);

		/* Don't accept a delegation of system inodes */
		if (start < CEPH_INO_SYSTEM_BASE) {
			pr_warn_ratelimited_client(cl,
				"ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
				start, len);
			continue;
		}
		while (len--) {
			int err = xa_insert(&s->s_delegated_inos, start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				doutc(cl, "added delegated inode 0x%llx\n", start - 1);
			} else if (err == -EBUSY) {
				pr_warn_client(cl,
					"MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

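/*
 * Parse a getvxattr reply: skip the version header, then take the
 * length-prefixed xattr value, which must occupy the remainder of
 * the payload. Returns the value length on success.
 */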
static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
		info->xattr_info.xattr_value = *p;
		info->xattr_info.xattr_value_len = value_len;
		*p = end;
		return value_len;
	}
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_request *req,
				  u64 features, struct ceph_mds_session *s)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, req, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else if (op == CEPH_MDS_OP_GETVXATTR)
		return parse_reply_info_getvxattr(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_request *req, u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, req, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "mds parse_reply err %d\n", err);
	ceph_msg_dump(msg);
	return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	int i;

	kfree(info->diri.fscrypt_auth);
	kfree(info->diri.fscrypt_file);
	kfree(info->targeti.fscrypt_auth);
	kfree(info->targeti.fscrypt_file);
	if (!info->dir_entries)
		return;

	for (i = 0; i < info->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

		kfree(rde->inode.fscrypt_auth);
		kfree(rde->inode.fscrypt_file);
	}
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS: it just drops all the links, unhashes the dentry and
 * then succeeds immediately.
 *
 * For any new create/link/rename/etc. request that reuses the same
 * file name, we must wait for the first reply of the in-flight async
 * unlink request, or the MDS may fail the new request with -EEXIST
 * if the in-flight async unlink request gets delayed for some reason.
 *
 * In the worst case, a non-async create (openc) request will
 * successfully open the file if the CDentry hasn't been unlinked yet,
 * but the previously delayed async unlink request may later remove
 * the CDentry. That means the just-created file may be deleted later
 * by accident.
 *
 * We need to wait for the in-flight async unlink requests to finish
 * when creating new files/directories that use the same file names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
	struct ceph_client *cl = fsc->client;
	struct dentry *pdentry = dentry->d_parent;
	struct dentry *udentry, *found = NULL;
	struct ceph_dentry_info *di;
	struct qstr dname;
	u32 hash = dentry->d_name.hash;
	int err;

	dname.name = dentry->d_name.name;
	dname.len = dentry->d_name.len;

	rcu_read_lock();
	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
				   hnode, hash) {
		udentry = di->dentry;

		spin_lock(&udentry->d_lock);
		if (udentry->d_name.hash != hash)
			goto next;
		if (unlikely(udentry->d_parent != pdentry))
			goto next;
		if (!hash_hashed(&di->hnode))
			goto next;

		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
			pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
				       dentry, dentry);

		if (!d_same_name(udentry, pdentry, &dname))
			goto next;

		found = dget_dlock(udentry);
		spin_unlock(&udentry->d_lock);
		break;
next:
		spin_unlock(&udentry->d_lock);
	}
	rcu_read_unlock();

	if (likely(!found))
		return 0;

	doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
	      found, found);

	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
			  TASK_KILLABLE);
	dput(found);
	return err;
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref))
		return s;
	return NULL;
}

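/*
 * Drop a session reference. On the final put, tear down the
 * authorizer, the delegated-ino xarray, and the session itself.
 */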
void ceph_put_mds_session(struct ceph_mds_session *s)
{
	if (IS_ERR_OR_NULL(s))
		return;

	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *s;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return ERR_PTR(-EIO);

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		doutc(cl, "realloc to %d\n", newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	doutc(cl, "mds%d\n", mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	atomic_set(&s->s_cap_gen, 1);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	iput(req->r_new_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid. Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err_client(cl, "%p failed to reserve caps: %d\n",
				      req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	doutc(cl, "%p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_cred = get_current_cred();
	if (!req->r_mnt_idmap)
		req->r_mnt_idmap = &nop_mnt_idmap;

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		ihold(dir);
		req->r_unsafe_dir = dir;
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

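/*
 * Unregister an in-flight request, dropping it from the request tree
 * and any unsafe lists. Like __register_request(), this is expected
 * to be called with mdsc->mutex held.
 */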
static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}

/*
 * Choose mds to send request to next. If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds. If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
	struct ceph_client *cl = mdsc->fsc->client;

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try? ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			doutc(cl, "using snapdir's parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			doutc(cl, "using nonsnap parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	if (!inode)
		goto random;

	doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
	      ceph_vinop(inode), (int)is_hash, hash, mode);
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
				      inode, ceph_vinop(inode), frag.frag,
				      mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, look for the authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
				      inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
	      ceph_vinop(inode), mds,
	      cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	doutc(cl, "chose random mds%d\n", mds);
	return mds;
}


/*
 * session messages
 */
struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("ENOMEM creating session %s msg\n",
		       ceph_session_op_name(op));
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
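/*
 * FEATURE_BYTES(c) rounds the highest supported feature bit up to a
 * whole number of 64-bit words, expressed in bytes (this assumes
 * feature_bits[] is sorted ascending, so feature_bits[c - 1] is the
 * highest bit). For example, a highest bit of 63 yields an 8-byte
 * bitmap, while a highest bit of 64 yields 16 bytes.
 */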
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);
		unsigned long bit;

		if (WARN_ON_ONCE(*p + 4 + size > end))
			return -ERANGE;

		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++) {
			bit = feature_bits[i];
			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
		}
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 > end))
			return -ERANGE;

		ceph_encode_32(p, 0);
	}

	return 0;
}

static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
static int encode_metric_spec(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(metric_bits);

	/* header */
	if (WARN_ON_ONCE(*p + 2 > end))
		return -ERANGE;

	ceph_encode_8(p, 1); /* version */
	ceph_encode_8(p, 1); /* compat */

	if (count > 0) {
		size_t i;
		size_t size = METRIC_BYTES(count);

		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4 + size);

		/* metric spec */
		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++)
			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 + 4 > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4);
		/* metric spec */
		ceph_encode_32(p, 0);
	}

	return 0;
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *
create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	struct ceph_client *cl = mdsc->fsc->client;
	size_t size, count;
	void *p, *end;
	int ret;

	const char *metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* metric spec */
	size = 0;
	count = ARRAY_SIZE(metric_bits);
	if (count > 0)
		size = METRIC_BYTES(count);
	extra_bytes += 2 + 4 + 4 + size;

	/* flags, mds auth caps and oldest_client_tid */
	extra_bytes += 4 + 4 + 8;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err_client(cl, "ENOMEM creating session open msg\n");
		return ERR_PTR(-ENOMEM);
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v7
	 */
	msg->hdr.version = cpu_to_le16(7);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	ret = encode_supported_features(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_supported_features failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	ret = encode_metric_spec(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_metric_spec failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	/* version == 5, flags */
	ceph_encode_32(&p, 0);

	/* version == 6, mds auth caps */
	ceph_encode_32(&p, 0);

	/* version == 7, oldest_client_tid */
	ceph_encode_64(&p, mdsc->oldest_tid);

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return -EIO;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
	      ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
				      session->s_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	int ret;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
		ret = __open_session(mdsc, session);
		if (ret)
			return ERR_PTR(ret);
	}

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;
	struct ceph_client *cl = mdsc->fsc->client;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
	      mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		ceph_put_mds_session(ts);
	}
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	doutc(cl, "mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}

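/*
 * Tear down the requests still tied to a dying session: drop its
 * unsafe (unacknowledged) requests, marking the affected mappings
 * with -EIO, and zero r_attempts on its remaining requests so they
 * will be resent.
 */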
static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p;

	doutc(cl, "mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
					   req->r_tid);
		if (req->r_target_inode)
			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
		if (req->r_unsafe_dir)
			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, int mds, void *),
			      void *arg)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	doutc(cl, "%p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		int mds;

		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->netfs.inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		mds = cap->mds;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, mds, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			doutc(cl, "finishing cap %p removal\n", cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			atomic64_dec(&session->s_mdsc->metric.total_caps);
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap;  /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}

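/*
 * Iteration callback for remove_session_caps(): purge this session's
 * cap on the inode, then wake any cap waiters and queue pagecache
 * invalidation and the deferred iputs that the purge requested.
 */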
static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	bool invalidate = false;
	struct ceph_cap *cap;
	int iputs = 0;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (cap) {
		doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
		      cap, ci, &ci->netfs.inode);

		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (cap)
		wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	while (iputs--)
		iput(inode);
	return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	doutc(fsc->client, "on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, so we need to wait until deletions are
		 * complete. __wait_on_freeing_inode() is designed for
		 * the job, but it is not exported, so use the inode
		 * lookup function to reach it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	// drop cap expires and unlock s_cap_lock
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps. if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		struct ceph_cap *cap;

		spin_lock(&ci->i_ceph_lock);
		cap = __get_cap_for_mds(ci, mds);
		/* mds did not re-issue stale cap */
		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
			cap->issued = cap->implemented = CEPH_CAP_PIN;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == FORCE_RO) {
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	doutc(cl, "session %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}
2007
2008
/*
2009
* Send periodic message to MDS renewing all currently held caps. The
2010
* ack will reset the expiration for all caps from this session.
2011
*
2012
* caller holds s_mutex
2013
*/
2014
static int send_renew_caps(struct ceph_mds_client *mdsc,
2015
struct ceph_mds_session *session)
2016
{
2017
struct ceph_client *cl = mdsc->fsc->client;
2018
struct ceph_msg *msg;
2019
int state;
2020
2021
if (time_after_eq(jiffies, session->s_cap_ttl) &&
2022
time_after_eq(session->s_cap_ttl, session->s_renew_requested))
2023
pr_info_client(cl, "mds%d caps stale\n", session->s_mds);
2024
session->s_renew_requested = jiffies;
2025
2026
/* do not try to renew caps until a recovering mds has reconnected
2027
* with its clients. */
2028
state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
2029
if (state < CEPH_MDS_STATE_RECONNECT) {
2030
doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
2031
ceph_mds_state_name(state));
2032
return 0;
2033
}
2034
2035
doutc(cl, "to mds%d (%s)\n", session->s_mds,
2036
ceph_mds_state_name(state));
2037
msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS,
2038
++session->s_renew_seq);
2039
if (IS_ERR(msg))
2040
return PTR_ERR(msg);
2041
ceph_con_send(&session->s_con, msg);
2042
return 0;
2043
}
2044
2045
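/*
 * Acknowledge an MDS FLUSHMSG by echoing its sequence number back.
 */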
static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;

	doutc(cl, "to mds%d (%s) seq %lld\n", session->s_mds,
	      ceph_session_state_name(session->s_state), seq);
	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info_client(cl, "mds%d caps renewed\n",
				       session->s_mds);
			wake = 1;
		} else {
			pr_info_client(cl, "mds%d caps still stale\n",
				       session->s_mds);
		}
	}
	doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds,
	      session->s_cap_ttl, was_stale ? "stale" : "fresh",
	      time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_session *session)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_msg *msg;

	doutc(cl, "mds%d state %s seq %lld\n", session->s_mds,
	      ceph_session_state_name(session->s_state), session->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
				      session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(session);
}

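/*
 * If every child of this directory dentry is negative, prune them so
 * the dentry itself can be released.  Returns true if only negative
 * children were found.
 */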
static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	spin_lock(&dentry->d_lock);
	hlist_for_each_entry(child, &dentry->d_children, d_sib) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_client *cl = mdsc->fsc->client;
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;
	struct ceph_cap *cap;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		return 0;
	}
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
	      inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
	      ceph_cap_string(oissued), ceph_cap_string(used),
	      ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * We can safely drop it. */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out; /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		ceph_remove_cap(mdsc, cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = icount_read(inode);
			if (count == 1)
				(*remaining)--;
			doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
			      inode, ceph_vinop(inode), cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
 */
int ceph_trim_caps(struct ceph_mds_client *mdsc,
		   struct ceph_mds_session *session,
		   int max_caps)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int trim_caps = session->s_nr_caps - max_caps;

	doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
	      session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		int remaining = trim_caps;

		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
		doutc(cl, "mds%d done: %d / %d, trimmed %d\n",
		      session->s_mds, session->s_nr_caps, max_caps,
		      trim_caps - remaining);
	}

	ceph_flush_session_cap_releases(mdsc, session);
	return 0;
}

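/*
 * Check whether all cap flushes up to want_flush_tid have completed:
 * returns 0 if a flush with tid <= want_flush_tid is still in flight,
 * nonzero otherwise.  Used as the condition for wait_caps_flush().
 */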
static int check_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 1;

	spin_lock(&mdsc->cap_dirty_lock);
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		if (cf->tid <= want_flush_tid) {
			doutc(cl, "still flushing tid %llu <= %llu\n",
			      cf->tid, want_flush_tid);
			ret = 0;
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	return ret;
}

/*
 * flush all dirty inode data to disk.
 *
 * Returns once we've flushed through want_flush_tid.
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "want %llu\n", want_flush_tid);

	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	doutc(cl, "ok, flushed thru %llu\n", want_flush_tid);
}

/*
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					   PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
				       session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->issue_seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			doutc(cl, "mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	if (msg) {
		// Append cap_barrier field
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		doutc(cl, "mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err_client(cl, "mds%d, failed to allocate message\n",
		      session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}

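/*
 * Work function: send any queued cap releases for this session while
 * it is still open (or hung).
 */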
static void ceph_cap_release_work(struct work_struct *work)
{
	struct ceph_mds_session *session =
		container_of(work, struct ceph_mds_session, s_cap_release_work);

	mutex_lock(&session->s_mutex);
	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
	    session->s_state == CEPH_MDS_SESSION_HUNG)
		ceph_send_cap_releases(session->s_mdsc, session);
	mutex_unlock(&session->s_mutex);
	ceph_put_mds_session(session);
}

void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	if (mdsc->stopping)
		return;

	ceph_get_mds_session(session);
	if (queue_work(mdsc->fsc->cap_wq,
		       &session->s_cap_release_work)) {
		doutc(cl, "cap release work queued\n");
	} else {
		ceph_put_mds_session(session);
		doutc(cl, "failed to queue cap release work\n");
	}
}

/*
 * caller holds session->s_cap_lock
 */
void __ceph_queue_cap_release(struct ceph_mds_session *session,
			      struct ceph_cap *cap)
{
	list_add_tail(&cap->session_caps, &session->s_cap_releases);
	session->s_num_cap_releases++;

	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
		ceph_flush_session_cap_releases(session->s_mdsc, session);
}

static void ceph_cap_reclaim_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_reclaim_work);
	int ret = ceph_trim_dentries(mdsc);
	if (ret == -EAGAIN)
		ceph_queue_cap_reclaim_work(mdsc);
}

void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	if (mdsc->stopping)
		return;

	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
		doutc(cl, "caps reclaim work queued\n");
	} else {
		doutc(cl, "failed to queue caps reclaim work\n");
	}
}

void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
	int val;
	if (!nr)
		return;
	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
		atomic_set(&mdsc->cap_reclaim_pending, 0);
		ceph_queue_cap_reclaim_work(mdsc);
	}
}

void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	if (mdsc->stopping)
		return;

	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) {
		doutc(cl, "caps unlink work queued\n");
	} else {
		doutc(cl, "failed to queue caps unlink work\n");
	}
}

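/*
 * Work function: flush caps for every inode queued on the
 * cap_unlink_delay_list, dropping cap_delay_lock around each
 * ceph_check_caps() call.
 */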
static void ceph_cap_unlink_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_unlink_work);
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "begin\n");
	spin_lock(&mdsc->cap_delay_lock);
	while (!list_empty(&mdsc->cap_unlink_delay_list)) {
		struct ceph_inode_info *ci;
		struct inode *inode;

		ci = list_first_entry(&mdsc->cap_unlink_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		list_del_init(&ci->i_cap_delay_list);

		inode = igrab(&ci->netfs.inode);
		if (inode) {
			spin_unlock(&mdsc->cap_delay_lock);
			doutc(cl, "on %p %llx.%llx\n", inode,
			      ceph_vinop(inode));
			ceph_check_caps(ci, CHECK_CAPS_FLUSH);
			iput(inode);
			spin_lock(&mdsc->cap_delay_lock);
		}
	}
	spin_unlock(&mdsc->cap_delay_lock);
	doutc(cl, "done\n");
}

/*
 * requests
 */

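/*
 * Size the readdir reply buffer from the directory's expected entry
 * count (bounded by the max_readdir mount option), falling back to
 * smaller page orders if the allocation fails.
 */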
int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
				    struct inode *dir)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
	unsigned int num_entries;
	int order;

	spin_lock(&ci->i_ceph_lock);
	num_entries = ci->i_files + ci->i_subdirs;
	spin_unlock(&ci->i_ceph_lock);
	num_entries = max(num_entries, 1U);
	num_entries = min(num_entries, opt->max_readdir);

	order = get_order(size * num_entries);
	while (order >= 0) {
		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
							     __GFP_NOWARN |
							     __GFP_ZERO,
							     order);
		if (rinfo->dir_entries)
			break;
		order--;
	}
	if (!rinfo->dir_entries)
		return -ENOMEM;

	num_entries = (PAGE_SIZE << order) / size;
	num_entries = min(num_entries, opt->max_readdir);

	rinfo->dir_buf_size = PAGE_SIZE << order;
	req->r_num_caps = num_entries + 1;
	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
	return 0;
}

/*
 * Create an mds request.
 */
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
	struct ceph_mds_request *req;

	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
	if (!req)
		return ERR_PTR(-ENOMEM);

	mutex_init(&req->r_fill_mutex);
	req->r_mdsc = mdsc;
	req->r_started = jiffies;
	req->r_start_latency = ktime_get();
	req->r_resend_mds = -1;
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	INIT_LIST_HEAD(&req->r_unsafe_target_item);
	req->r_fmode = -1;
	req->r_feature_needed = -1;
	kref_init(&req->r_kref);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_wait);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	ktime_get_coarse_real_ts64(&req->r_stamp);

	req->r_op = op;
	req->r_direct_mode = mode;
	return req;
}

/*
 * return the oldest (lowest tid) request in the request tree, or
 * NULL if none.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
{
	if (RB_EMPTY_ROOT(&mdsc->request_tree))
		return NULL;
	return rb_entry(rb_first(&mdsc->request_tree),
			struct ceph_mds_request, r_node);
}

static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
	return mdsc->oldest_tid;
}

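/*
 * For a dentry in an encrypted directory, return the encrypted form of
 * the name to be sent as the request's "altname", or NULL when no
 * alternate name is needed (no parent/dentry, not encrypted, no key,
 * or the name is short enough to go in the path itself).
 */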
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
{
	struct inode *dir = req->r_parent;
	struct dentry *dentry = req->r_dentry;
	const struct qstr *name = req->r_dname;
	u8 *cryptbuf = NULL;
	u32 len = 0;
	int ret = 0;

	/* only encode if we have parent and dentry */
	if (!dir || !dentry)
		goto success;

	/* No-op unless this is encrypted */
	if (!IS_ENCRYPTED(dir))
		goto success;

	ret = ceph_fscrypt_prepare_readdir(dir);
	if (ret < 0)
		return ERR_PTR(ret);

	/* No key? Just ignore it. */
	if (!fscrypt_has_encryption_key(dir))
		goto success;

	if (!name)
		name = &dentry->d_name;

	if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) {
		WARN_ON_ONCE(1);
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* No need to append altname if name is short enough */
	if (len <= CEPH_NOHASH_NAME_MAX) {
		len = 0;
		goto success;
	}

	cryptbuf = kmalloc(len, GFP_KERNEL);
	if (!cryptbuf)
		return ERR_PTR(-ENOMEM);

	ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len);
	if (ret) {
		kfree(cryptbuf);
		return ERR_PTR(ret);
	}
success:
	*plen = len;
	return cryptbuf;
}
#else
static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
{
	*plen = 0;
	return NULL;
}
#endif

/**
 * ceph_mdsc_build_path - build a path string to a given dentry
 * @mdsc: mds client
 * @dentry: dentry to which path should be built
 * @path_info: output path, length, base ino+snap, and freepath ownership flag
 * @for_wire: is this path going to be sent to the MDS?
 *
 * Build a string that represents the path to the dentry. This is mostly called
 * for two different purposes:
 *
 * 1) we need to build a path string to send to the MDS (for_wire == true)
 * 2) we need a path string for local presentation (e.g. debugfs)
 *    (for_wire == false)
 *
 * The path is built in reverse, starting with the dentry. Walk back up toward
 * the root, building the path until the first non-snapped inode is reached
 * (for_wire) or the root inode is reached (!for_wire).
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 */
char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			   struct ceph_path_info *path_info, int for_wire)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct dentry *cur;
	struct inode *inode;
	char *path;
	int pos;
	unsigned seq;
	u64 base;

	if (!dentry)
		return ERR_PTR(-EINVAL);

	path = __getname();
	if (!path)
		return ERR_PTR(-ENOMEM);
retry:
	pos = PATH_MAX - 1;
	path[pos] = '\0';

	seq = read_seqbegin(&rename_lock);
	cur = dget(dentry);
	for (;;) {
		struct dentry *parent;

		spin_lock(&cur->d_lock);
		inode = d_inode(cur);
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else if (for_wire && inode && dentry != cur &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			spin_unlock(&cur->d_lock);
			pos++; /* get rid of any prepended '/' */
			break;
		} else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
			pos -= cur->d_name.len;
			if (pos < 0) {
				spin_unlock(&cur->d_lock);
				break;
			}
			memcpy(path + pos, cur->d_name.name, cur->d_name.len);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else {
			int len, ret;
			char buf[NAME_MAX];

			/*
			 * Proactively copy name into buf, in case we need to
			 * present it as-is.
			 */
			memcpy(buf, cur->d_name.name, cur->d_name.len);
			len = cur->d_name.len;
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);

			ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
			if (ret < 0) {
				dput(parent);
				dput(cur);
				return ERR_PTR(ret);
			}

			if (fscrypt_has_encryption_key(d_inode(parent))) {
				len = ceph_encode_encrypted_dname(d_inode(parent),
								  buf, len);
				if (len < 0) {
					dput(parent);
					dput(cur);
					return ERR_PTR(len);
				}
			}
			pos -= len;
			if (pos < 0) {
				dput(parent);
				break;
			}
			memcpy(path + pos, buf, len);
		}
		dput(cur);
		cur = parent;

		/* Are we at the root? */
		if (IS_ROOT(cur))
			break;

		/* Are we out of buffer? */
		if (--pos < 0)
			break;

		path[pos] = '/';
	}
	inode = d_inode(cur);
	base = inode ? ceph_ino(inode) : 0;
	dput(cur);

	if (read_seqretry(&rename_lock, seq))
		goto retry;

	if (pos < 0) {
		/*
		 * The path is longer than PATH_MAX and this function
		 * cannot ever succeed.  Creating paths that long is
		 * possible with Ceph, but Linux cannot use them.
		 */
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	path_info->vino.ino = base;
	path_info->pathlen = PATH_MAX - 1 - pos;
	path_info->path = path + pos;
	path_info->freepath = true;

	/* Set snap from dentry if available */
	if (d_inode(dentry))
		path_info->vino.snap = ceph_snap(d_inode(dentry));
	else
		path_info->vino.snap = CEPH_NOSNAP;

	doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry),
	      base, PATH_MAX - 1 - pos, path + pos);
	return path + pos;
}

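/*
 * Encode a dentry as parent ino + name when that is safe (parent known
 * and locked, not snapped, not encrypted); otherwise fall back to a
 * full path built by ceph_mdsc_build_path().
 */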
static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			     struct inode *dir, struct ceph_path_info *path_info,
			     bool parent_locked)
{
	char *path;

	rcu_read_lock();
	if (!dir)
		dir = d_inode_rcu(dentry->d_parent);
	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
	    !IS_ENCRYPTED(dir)) {
		path_info->vino.ino = ceph_ino(dir);
		path_info->vino.snap = ceph_snap(dir);
		rcu_read_unlock();
		path_info->path = dentry->d_name.name;
		path_info->pathlen = dentry->d_name.len;
		path_info->freepath = false;
		return 0;
	}
	rcu_read_unlock();
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap handling.
	 */
	return 0;
}

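/*
 * Encode an inode either as a bare ino (the common CEPH_NOSNAP case)
 * or, for snapped inodes, as a full path built from one of its
 * aliases.
 */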
static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		path_info->vino.ino = ceph_ino(inode);
		path_info->vino.snap = ceph_snap(inode);
		path_info->pathlen = 0;
		path_info->freepath = false;
		return 0;
	}
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap from dentry.
	 * Override with inode's snap since that's what this function is for.
	 */
	path_info->vino.snap = ceph_snap(inode);
	return 0;
}

/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 */
static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode,
				 struct dentry *rdentry, struct inode *rdiri,
				 const char *rpath, u64 rino,
				 struct ceph_path_info *path_info,
				 bool parent_locked)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int r = 0;

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	if (rinode) {
		r = build_inode_path(rinode, path_info);
		doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		      ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked);
		doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino,
		      path_info->pathlen, path_info->path);
	} else if (rpath || rino) {
		path_info->vino.ino = rino;
		path_info->vino.snap = CEPH_NOSNAP;
		path_info->path = rpath;
		path_info->pathlen = rpath ? strlen(rpath) : 0;
		path_info->freepath = false;

		doutc(cl, " path %.*s\n", path_info->pathlen, rpath);
	}

	return r;
}

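/*
 * Encode the MClientRequest tail: timestamp, gid list (v4), altname
 * (v5) and the fscrypt_auth/fscrypt_file blobs (v6).
 */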
static void encode_mclientrequest_tail(void **p,
				       const struct ceph_mds_request *req)
{
	struct ceph_timespec ts;
	int i;

	ceph_encode_timespec64(&ts, &req->r_stamp);
	ceph_encode_copy(p, &ts, sizeof(ts));

	/* v4: gid_list */
	ceph_encode_32(p, req->r_cred->group_info->ngroups);
	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
		ceph_encode_64(p, from_kgid(&init_user_ns,
					    req->r_cred->group_info->gid[i]));

	/* v5: altname */
	ceph_encode_32(p, req->r_altname_len);
	ceph_encode_copy(p, req->r_altname, req->r_altname_len);

	/* v6: fscrypt_auth and fscrypt_file */
	if (req->r_fscrypt_auth) {
		u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);

		ceph_encode_32(p, authlen);
		ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
	} else {
		ceph_encode_32(p, 0);
	}
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
		ceph_encode_32(p, sizeof(__le64));
		ceph_encode_64(p, req->r_fscrypt_file);
	} else {
		ceph_encode_32(p, 0);
	}
}

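/*
 * Pick the highest request head version the session's MDS understands,
 * based on the feature bits it advertised.
 */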
static inline u16 mds_supported_head_version(struct ceph_mds_session *session)
{
	if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features))
		return 1;

	if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features))
		return 2;

	return CEPH_MDS_REQUEST_HEAD_VERSION;
}

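/*
 * Return the legacy portion of the request head: the buffer itself for
 * legacy peers, otherwise the legacy fields embedded within struct
 * ceph_mds_request_head.
 */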
static struct ceph_mds_request_head_legacy *
find_legacy_request_head(void *p, u64 features)
{
	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
	struct ceph_mds_request_head *head;

	if (legacy)
		return (struct ceph_mds_request_head_legacy *)p;
	head = (struct ceph_mds_request_head *)p;
	return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid;
}

/*
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
					       struct ceph_mds_request *req,
					       bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_path_info path_info1 = {0};
	struct ceph_path_info path_info2 = {0};
	struct dentry *old_dentry = NULL;
	int len;
	u16 releases;
	void *p, *end;
	int ret;
	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
	u16 request_head_version = mds_supported_head_version(session);
	kuid_t caller_fsuid = req->r_cred->fsuid;
	kgid_t caller_fsgid = req->r_cred->fsgid;
	bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);

	ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry,
				    req->r_parent, req->r_path1, req->r_ino1.ino,
				    &path_info1, parent_locked);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/*
	 * When the parent directory's i_rwsem is *not* locked, req->r_parent may
	 * have become stale (e.g. after a concurrent rename) between the time the
	 * dentry was looked up and now.  If we detect that the stored r_parent
	 * does not match the inode number we just encoded for the request, switch
	 * to the correct inode so that the MDS receives a valid parent reference.
	 */
	if (!parent_locked && req->r_parent && path_info1.vino.ino &&
	    ceph_ino(req->r_parent) != path_info1.vino.ino) {
		struct inode *old_parent = req->r_parent;
		struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL);
		if (!IS_ERR(correct_dir)) {
			WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
				  ceph_ino(old_parent), path_info1.vino.ino);
			/*
			 * Transfer CEPH_CAP_PIN from the old parent to the new one.
			 * The pin was taken earlier in ceph_mdsc_submit_request().
			 */
			ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
			iput(old_parent);
			req->r_parent = correct_dir;
			ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		}
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	if (req->r_old_dentry &&
	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
		old_dentry = req->r_old_dentry;
	ret = set_request_path_attr(mdsc, NULL, old_dentry,
				    req->r_old_dentry_dir,
				    req->r_path2, req->r_ino2.ino,
				    &path_info2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
	if (IS_ERR(req->r_altname)) {
		msg = ERR_CAST(req->r_altname);
		req->r_altname = NULL;
		goto out_free2;
	}

	/*
	 * Old MDS versions without the 32-bit retry/fwd feature copy the
	 * raw memory directly when decoding requests, while new versions
	 * decode the head according to its version member, so we need to
	 * stay compatible with both.
	 */
	if (legacy)
		len = sizeof(struct ceph_mds_request_head_legacy);
	else if (request_head_version == 1)
		len = offsetofend(struct ceph_mds_request_head, args);
	else if (request_head_version == 2)
		len = offsetofend(struct ceph_mds_request_head, ext_num_fwd);
	else
		len = sizeof(struct ceph_mds_request_head);

	/* filepaths */
	len += 2 * (1 + sizeof(u32) + sizeof(u64));
	len += path_info1.pathlen + path_info2.pathlen;

	/* cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);

	if (req->r_dentry_drop)
		len += path_info1.pathlen;
	if (req->r_old_dentry_drop)
		len += path_info2.pathlen;

	/* MClientRequest tail */

	/* req->r_stamp */
	len += sizeof(struct ceph_timespec);

	/* gid list */
	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);

	/* alternate name */
	len += sizeof(u32) + req->r_altname_len;

	/* fscrypt_auth */
	len += sizeof(u32); // fscrypt_auth
	if (req->r_fscrypt_auth)
		len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);

	/* fscrypt_file */
	len += sizeof(u32);
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
		len += sizeof(__le64);

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	lhead = find_legacy_request_head(msg->front.iov_base,
					 session->s_con.peer_features);

	if ((req->r_mnt_idmap != &nop_mnt_idmap) &&
	    !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) {
		WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op));

		if (enable_unsafe_idmap) {
			pr_warn_once_client(cl,
				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
				" is not supported by MDS. UID/GID-based restrictions may"
				" not work properly.\n");

			caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
						   VFSUIDT_INIT(req->r_cred->fsuid));
			caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
						   VFSGIDT_INIT(req->r_cred->fsgid));
		} else {
			pr_err_ratelimited_client(cl,
				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
				" is not supported by MDS. Fail request with -EIO.\n");

			ret = -EIO;
			goto out_err;
		}
	}

	/*
	 * The ceph_mds_request_head_legacy didn't contain a version field, and
	 * one was added when we moved the message version from 3->4.
	 */
	if (legacy) {
		msg->hdr.version = cpu_to_le16(3);
		p = msg->front.iov_base + sizeof(*lhead);
	} else if (request_head_version == 1) {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(4);
		nhead->version = cpu_to_le16(1);
		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args);
	} else if (request_head_version == 2) {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(6);
		nhead->version = cpu_to_le16(2);

		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd);
	} else {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;
		kuid_t owner_fsuid;
		kgid_t owner_fsgid;

		msg->hdr.version = cpu_to_le16(6);
		nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
		nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head));

		if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) {
			owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
						  VFSUIDT_INIT(req->r_cred->fsuid));
			owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
						  VFSGIDT_INIT(req->r_cred->fsgid));
			nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid));
			nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid));
		} else {
			nhead->owner_uid = cpu_to_le32(-1);
			nhead->owner_gid = cpu_to_le32(-1);
		}

		p = msg->front.iov_base + sizeof(*nhead);
	}

	end = msg->front.iov_base + msg->front.iov_len;

	lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	lhead->op = cpu_to_le32(req->r_op);
	lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
						  caller_fsuid));
	lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
						  caller_fsgid));
	lhead->ino = cpu_to_le64(req->r_deleg_ino);
	lhead->args = req->r_args;

	ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path);
	ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
			req->r_inode ? req->r_inode : d_inode(req->r_dentry),
			mds, req->r_inode_drop, req->r_inode_unless,
			req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
			d_inode(req->r_old_dentry),
			mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	lhead->num_releases = cpu_to_le16(releases);

	encode_mclientrequest_tail(&p, req);

	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	ceph_mdsc_free_path_info(&path_info2);
out_free1:
	ceph_mdsc_free_path_info(&path_info1);
out:
	return msg;
out_err:
	ceph_msg_put(msg);
	msg = ERR_PTR(ret);
	goto out_free2;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}

/*
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_session *session,
				  struct ceph_mds_request *req,
				  bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_mds_request_head *nhead;
	struct ceph_msg *msg;
	int flags = 0, old_max_retry;
	bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
				     &session->s_features);

	/*
	 * Avoid infinite retrying after overflow.  The client keeps
	 * increasing the retry count; if the MDS is an old version that
	 * stores it in an 8-bit field, limit retries to at most 256.
	 */
	if (req->r_attempts) {
		old_max_retry = sizeof_field(struct ceph_mds_request_head,
					     num_retry);
		old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
		if ((old_version && req->r_attempts >= old_max_retry) ||
		    ((uint32_t)req->r_attempts >= U32_MAX)) {
			pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n",
						   req->r_tid);
			return -EMULTIHOP;
		}
	}

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid,
	      ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;

		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		lhead = find_legacy_request_head(msg->front.iov_base,
						 session->s_con.peer_features);

		flags = le32_to_cpu(lhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		lhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		lhead->num_retry = req->r_attempts - 1;
		if (!old_version) {
			nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
			nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
		}

		/* remove cap/dentry releases from message */
		lhead->num_releases = 0;

		p = msg->front.iov_base + req->r_request_release_offset;
		encode_mclientrequest_tail(&p, req);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(session, req, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	lhead = find_legacy_request_head(msg->front.iov_base,
					 session->s_con.peer_features);
	lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_ASYNC;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	lhead->flags = cpu_to_le32(flags);
	lhead->num_fwd = req->r_num_fwd;
	lhead->num_retry = req->r_attempts - 1;
	if (!old_version) {
		nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
		nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
		nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
	}

	doutc(cl, " r_parent = %p\n", req->r_parent);
	return 0;
}

/*
 * called under mdsc->mutex
 */
static int __send_request(struct ceph_mds_session *session,
			  struct ceph_mds_request *req,
			  bool drop_cap_releases)
{
	int err;

	err = __prepare_send_request(session, req, drop_cap_releases);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

	return err;
}

/*
 * send request, or put it on the appropriate wait list.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			 struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;
	bool random;

	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
		doutc(cl, "metadata corrupted\n");
		err = -EIO;
		goto finish;
	}
	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		doutc(cl, "timed out\n");
		err = -ETIMEDOUT;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		doutc(cl, "forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			doutc(cl, "mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			doutc(cl, "no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			goto finish;
		}
	}

	put_request_session(req);

	mds = __choose_mds(mdsc, req, &random);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto finish;
		}
		doutc(cl, "no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	req->r_session = ceph_get_mds_session(session);

	doutc(cl, "mds%d session %p state %s\n", mds, session,
	      ceph_session_state_name(session->s_state));

	/*
	 * Old MDS versions will crash if they see unknown ops.
	 */
	if (req->r_feature_needed > 0 &&
	    !test_bit(req->r_feature_needed, &session->s_features)) {
		err = -EOPNOTSUPP;
		goto out_session;
	}

	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		/*
		 * We cannot queue async requests since the caps and delegated
		 * inodes are bound to the session. Just return -EJUKEBOX and
		 * let the caller retry a sync request in that case.
		 */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto out_session;
		}

		/*
		 * If the session has been REJECTED, then return a hard error,
		 * unless it's a CLEANRECOVER mount, in which case we'll queue
		 * it to the mdsc queue.
		 */
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
				list_add(&req->r_wait, &mdsc->waiting_for_map);
			else
				err = -EACCES;
			goto out_session;
		}

		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			err = __open_session(mdsc, session);
			if (err)
				goto out_session;
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	/*
	 * For async create we will choose the auth MDS of the frag in the
	 * parent directory to send the request, and usually this works fine,
	 * but if the directory was migrated to another MDS before it could
	 * handle it the request will be forwarded.
	 *
	 * And then the auth cap will be changed.
	 */
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
		struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
		struct ceph_inode_info *ci;
		struct ceph_cap *cap;

		/*
		 * The request may be handled very quickly and the new inode
		 * hasn't been linked to the dentry yet.  We need to wait
		 * for the ceph_finish_async_create(), which shouldn't be
		 * stuck too long or fail in theory, to finish when forwarding
		 * the request.
		 */
		if (!d_inode(req->r_dentry)) {
			err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
					  TASK_KILLABLE);
			if (err) {
				mutex_lock(&req->r_fill_mutex);
				set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
				mutex_unlock(&req->r_fill_mutex);
				goto out_session;
			}
		}

		ci = ceph_inode(d_inode(req->r_dentry));

		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
			doutc(cl, "session changed for auth cap %d -> %d\n",
			      cap->session->s_mds, session->s_mds);

			/* Remove the auth cap from old session */
			spin_lock(&cap->session->s_cap_lock);
			cap->session->s_nr_caps--;
			list_del_init(&cap->session_caps);
			spin_unlock(&cap->session->s_cap_lock);

			/* Add the auth cap to the new session */
			cap->mds = mds;
			cap->session = session;
			spin_lock(&session->s_cap_lock);
			session->s_nr_caps++;
			list_add_tail(&cap->session_caps, &session->s_caps);
			spin_unlock(&session->s_cap_lock);

			change_auth_cap_ses(ci, session);
		}
		spin_unlock(&ci->i_ceph_lock);
	}

	err = __send_request(session, req, false);

out_session:
	ceph_put_mds_session(session);
finish:
	if (err) {
		doutc(cl, "early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}

/*
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		doutc(cl, " wake request %p tid %llu\n", req,
		      req->r_tid);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	doutc(cl, "kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			doutc(cl, " kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}

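/*
 * Take CAP_PIN references on the inodes involved, wait out any
 * conflicting async creates, then register the request and kick off
 * __do_request() under mdsc->mutex.
 */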
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			     struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err = 0;

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
		spin_lock(&ci->i_ceph_lock);
		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
		__ceph_touch_fmode(ci, mdsc, fmode);
		spin_unlock(&ci->i_ceph_lock);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	if (req->r_inode) {
		err = ceph_wait_on_async_create(req->r_inode);
		if (err) {
			doutc(cl, "wait for async create returned: %d\n", err);
			return err;
		}
	}

	if (!err && req->r_old_inode) {
		err = ceph_wait_on_async_create(req->r_old_inode);
		if (err) {
			doutc(cl, "wait for async create returned: %d\n", err);
			return err;
		}
	}

	doutc(cl, "submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

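/*
 * Wait for a submitted request to complete, via wait_func if given or
 * else a killable wait with optional timeout.  If the wait failed and
 * no real reply raced in, mark the request aborted.
 */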
int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3728
struct ceph_mds_request *req,
3729
ceph_mds_request_wait_callback_t wait_func)
3730
{
3731
struct ceph_client *cl = mdsc->fsc->client;
3732
int err;
3733
3734
/* wait */
3735
doutc(cl, "do_request waiting\n");
3736
if (wait_func) {
3737
err = wait_func(mdsc, req);
3738
} else {
3739
long timeleft = wait_for_completion_killable_timeout(
3740
&req->r_completion,
3741
ceph_timeout_jiffies(req->r_timeout));
3742
if (timeleft > 0)
3743
err = 0;
3744
else if (!timeleft)
3745
err = -ETIMEDOUT; /* timed out */
3746
else
3747
err = timeleft; /* killed */
3748
}
3749
doutc(cl, "do_request waited, got %d\n", err);
3750
mutex_lock(&mdsc->mutex);
3751
3752
/* only abort if we didn't race with a real reply */
3753
if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3754
err = le32_to_cpu(req->r_reply_info.head->result);
3755
} else if (err < 0) {
3756
doutc(cl, "aborted request %lld with %d\n", req->r_tid, err);
3757
3758
/*
3759
* ensure we aren't running concurrently with
3760
* ceph_fill_trace or ceph_readdir_prepopulate, which
3761
* rely on locks (dir mutex) held by our caller.
3762
*/
3763
mutex_lock(&req->r_fill_mutex);
3764
req->r_err = err;
3765
set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3766
mutex_unlock(&req->r_fill_mutex);
3767
3768
if (req->r_parent &&
3769
(req->r_op & CEPH_MDS_OP_WRITE))
3770
ceph_invalidate_dir_request(req);
3771
} else {
3772
err = req->r_err;
3773
}
3774
3775
mutex_unlock(&mdsc->mutex);
3776
return err;
3777
}
3778
3779
/*
3780
* Synchrously perform an mds request. Take care of all of the
3781
* session setup, forwarding, retry details.
3782
*/
3783
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3784
struct inode *dir,
3785
struct ceph_mds_request *req)
3786
{
3787
struct ceph_client *cl = mdsc->fsc->client;
3788
int err;
3789
3790
doutc(cl, "do_request on %p\n", req);
3791
3792
/* issue */
3793
err = ceph_mdsc_submit_request(mdsc, dir, req);
3794
if (!err)
3795
err = ceph_mdsc_wait_request(mdsc, req, NULL);
3796
doutc(cl, "do_request %p done, result %d\n", req, err);
3797
return err;
3798
}
3799
3800
/*
3801
* Invalidate dir's completeness, dentry lease state on an aborted MDS
3802
* namespace request.
3803
*/
3804
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3805
{
3806
struct inode *dir = req->r_parent;
3807
struct inode *old_dir = req->r_old_dentry_dir;
3808
struct ceph_client *cl = req->r_mdsc->fsc->client;
3809
3810
doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n",
3811
dir, old_dir);
3812
3813
ceph_dir_clear_complete(dir);
3814
if (old_dir)
3815
ceph_dir_clear_complete(old_dir);
3816
if (req->r_dentry)
3817
ceph_invalidate_dentry_lease(req->r_dentry);
3818
if (req->r_old_dentry)
3819
ceph_invalidate_dentry_lease(req->r_old_dentry);
3820
}

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	struct ceph_snap_realm *realm;
	u64 tid;
	int err, result;
	int mds = session->s_mds;
	bool close_sessions = false;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err_client(cl, "got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		doutc(cl, "on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	doutc(cl, "handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err_client(cl, "got %llu on session mds%d not mds%d\n",
			      tid, session->s_mds,
			      req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n",
			       head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
		pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n",
			       tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	if (head->safe) {
		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
		__unregister_request(mdsc, req);

		/* last request during umount? */
		if (mdsc->stopping && !__get_oldest_req(mdsc))
			complete_all(&mdsc->safe_umount_waiters);

		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			doutc(cl, "got safe reply %llu, mds%d\n", tid, mds);

			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
	}

	doutc(cl, "tid %lld result %d\n", tid, result);
	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
		err = parse_reply_info(session, msg, req, (u64)-1);
	else
		err = parse_reply_info(session, msg, req,
				       session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	/* Must find target inode outside of mutexes to avoid deadlocks */
	rinfo = &req->r_reply_info;
	if ((err >= 0) && rinfo->head->is_target) {
		struct inode *in = xchg(&req->r_new_inode, NULL);
		struct ceph_vino tvino = {
			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
		};

		/*
		 * If we ended up opening an existing inode, discard
		 * r_new_inode
		 */
		if (req->r_op == CEPH_MDS_OP_CREATE &&
		    !req->r_reply_info.has_create_ino) {
			/* This should never happen on an async create */
			WARN_ON_ONCE(req->r_deleg_ino);
			iput(in);
			in = NULL;
		}

		in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
		if (IS_ERR(in)) {
			err = PTR_ERR(in);
			mutex_lock(&session->s_mutex);
			goto out_err;
		}
		req->r_target_inode = in;
	}

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n",
			      mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	realm = NULL;
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
				rinfo->snapblob + rinfo->snapblob_len,
				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
				&realm);
		if (err) {
			up_write(&mdsc->snap_rwsem);
			close_sessions = true;
			if (err == -EIO)
				ceph_msg_dump(msg);
			goto out_err;
		}
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	current->journal_info = req;
	err = ceph_fill_trace(mdsc->fsc->sb, req);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			err = ceph_readdir_prepopulate(req, req->r_session);
	}
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0) {
		if (req->r_target_inode &&
		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			struct ceph_inode_info *ci =
				ceph_inode(req->r_target_inode);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_target_item,
				      &ci->i_unsafe_iops);
			spin_unlock(&ci->i_unsafe_lock);
		}

		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
out_err:
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = ceph_msg_get(msg);
			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		}
	} else {
		doutc(cl, "reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);

	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
				     req->r_end_latency, err);
out:
	ceph_mdsc_put_request(req);

	/* Defer closing the sessions until after the s_mutex lock is released */
	if (close_sessions)
		ceph_mdsc_close_sessions(mdsc);
	return;
}
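
/*
 * Note on safe vs. unsafe replies, handled above: the MDS may first
 * send an "unsafe" reply (the result is applied but not yet journaled)
 * and later a "safe" one (journaled).  The request is only
 * unregistered once the safe reply arrives; an unsafe reply parks the
 * request on the session's s_unsafe list.
 */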

/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	bool aborted = false;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		mutex_unlock(&mdsc->mutex);
		doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
		return;  /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
		/*
		 * Avoid infinite retrying after overflow.
		 *
		 * The MDS increases the fwd count with each forward; if the
		 * received num_fwd is less than the one saved in the
		 * request, the MDS is an old version whose 8-bit counter
		 * has overflowed.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = -EMULTIHOP;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);
		aborted = true;
		pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
					   tid);
	} else {
		/* resend. forward race not possible; mds would drop */
		doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
		req->r_attempts = 0;
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	mutex_unlock(&mdsc->mutex);

	/* kick calling process */
	if (aborted)
		complete_request(mdsc, req);
	ceph_mdsc_put_request(req);
	return;

bad:
	pr_err_client(cl, "decode error err=%d\n", err);
	ceph_msg_dump(msg);
}

static int __decode_session_metadata(void **p, void *end,
				     bool *blocklisted)
{
	/* map<string,string> */
	u32 n;
	bool err_str;
	ceph_decode_32_safe(p, end, n, bad);
	while (n-- > 0) {
		u32 len;
		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		err_str = !strncmp(*p, "error_string", len);
		*p += len;
		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		/*
		 * Match "blocklisted (blacklisted)" from newer MDSes,
		 * or "blacklisted" from older MDSes.
		 */
		if (err_str && strnstr(*p, "blacklisted", len))
			*blocklisted = true;
		*p += len;
	}
	return 0;
bad:
	return -1;
}
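
/*
 * Illustrative sketch of the map<string,string> wire format decoded
 * above:
 *	u32 n;				// number of key/value pairs
 *	n times:
 *		u32 klen; char key[klen];
 *		u32 vlen; char value[vlen];
 */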

/*
 * handle a mds session control message
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int mds = session->s_mds;
	int msg_version = le16_to_cpu(msg->hdr.version);
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mds_session_head *h;
	struct ceph_mds_cap_auth *cap_auths = NULL;
	u32 op, cap_auths_num = 0;
	u64 seq, features = 0;
	int wake = 0;
	bool blocklisted = false;
	u32 i;


	/* decode */
	ceph_decode_need(&p, end, sizeof(*h), bad);
	h = p;
	p += sizeof(*h);

	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	if (msg_version >= 3) {
		u32 len;
		/* version >= 2 and < 5, decode metadata; skip otherwise
		 * as it's handled via flags.
		 */
		if (msg_version >= 5)
			ceph_decode_skip_map(&p, end, string, string, bad);
		else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
			goto bad;

		/* version >= 3, feature bits */
		ceph_decode_32_safe(&p, end, len, bad);
		if (len) {
			ceph_decode_64_safe(&p, end, features, bad);
			p += len - sizeof(features);
		}
	}

	if (msg_version >= 5) {
		u32 flags, len;

		/* version >= 4 */
		ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
		ceph_decode_32_safe(&p, end, len, bad); /* len */
		ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */

		/* version >= 5, flags */
		ceph_decode_32_safe(&p, end, flags, bad);
		if (flags & CEPH_SESSION_BLOCKLISTED) {
			pr_warn_client(cl, "mds%d session blocklisted\n",
				       session->s_mds);
			blocklisted = true;
		}
	}

	if (msg_version >= 6) {
		ceph_decode_32_safe(&p, end, cap_auths_num, bad);
		doutc(cl, "cap_auths_num %d\n", cap_auths_num);

		if (cap_auths_num && op != CEPH_SESSION_OPEN) {
			WARN_ON_ONCE(op != CEPH_SESSION_OPEN);
			goto skip_cap_auths;
		}

		cap_auths = kcalloc(cap_auths_num,
				    sizeof(struct ceph_mds_cap_auth),
				    GFP_KERNEL);
		if (!cap_auths) {
			pr_err_client(cl, "No memory for cap_auths\n");
			return;
		}

		for (i = 0; i < cap_auths_num; i++) {
			u32 _len, j;

			/* struct_v, struct_compat, and struct_len in MDSCapAuth */
			ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);

			/* struct_v, struct_compat, and struct_len in MDSCapMatch */
			ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad);
			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				cap_auths[i].match.gids = kcalloc(_len, sizeof(u32),
								  GFP_KERNEL);
				if (!cap_auths[i].match.gids) {
					pr_err_client(cl, "No memory for gids\n");
					goto fail;
				}

				cap_auths[i].match.num_gids = _len;
				for (j = 0; j < _len; j++)
					ceph_decode_32_safe(&p, end,
							    cap_auths[i].match.gids[j],
							    bad);
			}

			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char),
								  GFP_KERNEL);
				if (!cap_auths[i].match.path) {
					pr_err_client(cl, "No memory for path\n");
					goto fail;
				}
				ceph_decode_copy(&p, cap_auths[i].match.path, _len);

				/* Remove the trailing '/' */
				while (_len && cap_auths[i].match.path[_len - 1] == '/') {
					cap_auths[i].match.path[_len - 1] = '\0';
					_len -= 1;
				}
			}

			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char),
								     GFP_KERNEL);
				if (!cap_auths[i].match.fs_name) {
					pr_err_client(cl, "No memory for fs_name\n");
					goto fail;
				}
				ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len);
			}

			ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad);
			ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad);
			ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad);
			doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n",
			      cap_auths[i].match.uid, cap_auths[i].match.num_gids,
			      cap_auths[i].match.path, cap_auths[i].match.fs_name,
			      cap_auths[i].match.root_squash,
			      cap_auths[i].readable, cap_auths[i].writeable);
		}
	}

skip_cap_auths:
	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_OPEN) {
		if (mdsc->s_cap_auths) {
			for (i = 0; i < mdsc->s_cap_auths_num; i++) {
				kfree(mdsc->s_cap_auths[i].match.gids);
				kfree(mdsc->s_cap_auths[i].match.path);
				kfree(mdsc->s_cap_auths[i].match.fs_name);
			}
			kfree(mdsc->s_cap_auths);
		}
		mdsc->s_cap_auths_num = cap_auths_num;
		mdsc->s_cap_auths = cap_auths;
	}
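
	/*
	 * For CEPH_SESSION_CLOSE below, take an extra session reference
	 * so the session outlives __unregister_session(); it is dropped
	 * at the bottom of this function.
	 */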
	if (op == CEPH_SESSION_CLOSE) {
		ceph_get_mds_session(session);
		__unregister_session(mdsc, session);
	}
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
	      ceph_session_op_name(op), session,
	      ceph_session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info_client(cl, "mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info_client(cl, "mds%d reconnect success\n",
				       session->s_mds);

		session->s_features = features;
		if (session->s_state == CEPH_MDS_SESSION_OPEN) {
			pr_notice_client(cl, "mds%d is already opened\n",
					 session->s_mds);
		} else {
			session->s_state = CEPH_MDS_SESSION_OPEN;
			renewed_caps(mdsc, session, 0);
			if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
				     &session->s_features))
				metric_schedule_delayed(&mdsc->metric);
		}

		/*
		 * The connection may be broken and the session on the
		 * client side may have been reinitialized, so update the
		 * seq anyway.
		 */
		if (!session->s_seq && seq)
			session->s_seq = seq;

		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info_client(cl, "mds%d reconnect denied\n",
				       session->s_mds);
		session->s_state = CEPH_MDS_SESSION_CLOSED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		wake = 2; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
		break;

	case CEPH_SESSION_STALE:
		pr_info_client(cl, "mds%d caps went stale, renewing\n",
			       session->s_mds);
		atomic_inc(&session->s_cap_gen);
		session->s_cap_ttl = jiffies - 1;
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	case CEPH_SESSION_FLUSHMSG:
		/* flush cap releases */
		spin_lock(&session->s_cap_lock);
		if (session->s_num_cap_releases)
			ceph_flush_session_cap_releases(mdsc, session);
		spin_unlock(&session->s_cap_lock);

		send_flushmsg_ack(mdsc, session, seq);
		break;

	case CEPH_SESSION_FORCE_RO:
		doutc(cl, "force_session_readonly %p\n", session);
		spin_lock(&session->s_cap_lock);
		session->s_readonly = true;
		spin_unlock(&session->s_cap_lock);
		wake_up_session_caps(session, FORCE_RO);
		break;

	case CEPH_SESSION_REJECT:
		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
		pr_info_client(cl, "mds%d rejected session\n",
			       session->s_mds);
		session->s_state = CEPH_MDS_SESSION_REJECTED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		if (blocklisted)
			mdsc->fsc->blocklisted = true;
		wake = 2; /* for good measure */
		break;

	default:
		pr_err_client(cl, "bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		if (wake == 2)
			kick_requests(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
	}
	if (op == CEPH_SESSION_CLOSE)
		ceph_put_mds_session(session);
	return;

bad:
	pr_err_client(cl, "corrupt message mds%d len %d\n", mds,
		      (int)msg->front.iov_len);
	ceph_msg_dump(msg);
fail:
	for (i = 0; i < cap_auths_num; i++) {
		kfree(cap_auths[i].match.gids);
		kfree(cap_auths[i].match.path);
		kfree(cap_auths[i].match.fs_name);
	}
	kfree(cap_auths);
	return;
}

void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
{
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	int dcaps;

	dcaps = xchg(&req->r_dir_caps, 0);
	if (dcaps) {
		doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
	}
}

void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req)
{
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	int dcaps;

	dcaps = xchg(&req->r_dir_caps, 0);
	if (dcaps) {
		doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
		ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps);
	}
}

/*
 * called under session->mutex.
 */
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_mds_request *req, *nreq;
	struct rb_node *p;

	doutc(mdsc->fsc->client, "mds%d\n", session->s_mds);

	mutex_lock(&mdsc->mutex);
	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
		__send_request(session, req, true);

	/*
	 * Also re-send old requests when the MDS enters the reconnect
	 * stage, so that the MDS can process completed requests during
	 * its clientreplay stage.
	 */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts == 0)
			continue; /* only old requests */
		if (!req->r_session)
			continue;
		if (req->r_session->s_mds != session->s_mds)
			continue;

		ceph_mdsc_release_dir_caps_async(req);

		__send_request(session, req, true);
	}
	mutex_unlock(&mdsc->mutex);
}
4498
4499
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
4500
{
4501
struct ceph_msg *reply;
4502
struct ceph_pagelist *_pagelist;
4503
struct page *page;
4504
__le32 *addr;
4505
int err = -ENOMEM;
4506
4507
if (!recon_state->allow_multi)
4508
return -ENOSPC;
4509
4510
/* can't handle message that contains both caps and realm */
4511
BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
4512
4513
/* pre-allocate new pagelist */
4514
_pagelist = ceph_pagelist_alloc(GFP_NOFS);
4515
if (!_pagelist)
4516
return -ENOMEM;
4517
4518
reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4519
if (!reply)
4520
goto fail_msg;
4521
4522
/* placeholder for nr_caps */
4523
err = ceph_pagelist_encode_32(_pagelist, 0);
4524
if (err < 0)
4525
goto fail;
4526
4527
if (recon_state->nr_caps) {
4528
/* currently encoding caps */
4529
err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
4530
if (err)
4531
goto fail;
4532
} else {
4533
/* placeholder for nr_realms (currently encoding relams) */
4534
err = ceph_pagelist_encode_32(_pagelist, 0);
4535
if (err < 0)
4536
goto fail;
4537
}
4538
4539
err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
4540
if (err)
4541
goto fail;
4542
4543
page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
4544
addr = kmap_atomic(page);
4545
if (recon_state->nr_caps) {
4546
/* currently encoding caps */
4547
*addr = cpu_to_le32(recon_state->nr_caps);
4548
} else {
4549
/* currently encoding relams */
4550
*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
4551
}
4552
kunmap_atomic(addr);
4553
4554
reply->hdr.version = cpu_to_le16(5);
4555
reply->hdr.compat_version = cpu_to_le16(4);
4556
4557
reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
4558
ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
4559
4560
ceph_con_send(&recon_state->session->s_con, reply);
4561
ceph_pagelist_release(recon_state->pagelist);
4562
4563
recon_state->pagelist = _pagelist;
4564
recon_state->nr_caps = 0;
4565
recon_state->nr_realms = 0;
4566
recon_state->msg_version = 5;
4567
return 0;
4568
fail:
4569
ceph_msg_put(reply);
4570
fail_msg:
4571
ceph_pagelist_release(_pagelist);
4572
return err;
4573
}
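
/*
 * Illustrative layout of each partial reconnect message built above
 * (msg_version 5):
 *	u32 nr_caps;	// patched into the first page just before sending
 *	u32 nr_realms;	// ditto, while realms are being encoded
 *	...cap or realm records...
 *	u8  more;	// 1 = more messages follow; the final message
 *			// (sent from send_mds_reconnect()) encodes 0
 */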
4574
4575
static struct dentry* d_find_primary(struct inode *inode)
4576
{
4577
struct dentry *alias, *dn = NULL;
4578
4579
if (hlist_empty(&inode->i_dentry))
4580
return NULL;
4581
4582
spin_lock(&inode->i_lock);
4583
if (hlist_empty(&inode->i_dentry))
4584
goto out_unlock;
4585
4586
if (S_ISDIR(inode->i_mode)) {
4587
alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
4588
if (!IS_ROOT(alias))
4589
dn = dget(alias);
4590
goto out_unlock;
4591
}
4592
4593
hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
4594
spin_lock(&alias->d_lock);
4595
if (!d_unhashed(alias) &&
4596
(ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
4597
dn = dget_dlock(alias);
4598
}
4599
spin_unlock(&alias->d_lock);
4600
if (dn)
4601
break;
4602
}
4603
out_unlock:
4604
spin_unlock(&inode->i_lock);
4605
return dn;
4606
}
4607
4608
/*
4609
* Encode information about a cap for a reconnect with the MDS.
4610
*/
4611
static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
4612
{
4613
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
4614
struct ceph_client *cl = ceph_inode_to_client(inode);
4615
union {
4616
struct ceph_mds_cap_reconnect v2;
4617
struct ceph_mds_cap_reconnect_v1 v1;
4618
} rec;
4619
struct ceph_inode_info *ci = ceph_inode(inode);
4620
struct ceph_reconnect_state *recon_state = arg;
4621
struct ceph_pagelist *pagelist = recon_state->pagelist;
4622
struct dentry *dentry;
4623
struct ceph_cap *cap;
4624
struct ceph_path_info path_info = {0};
4625
int err;
4626
u64 snap_follows;
4627
4628
dentry = d_find_primary(inode);
4629
if (dentry) {
4630
/* set pathbase to parent dir when msg_version >= 2 */
4631
char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info,
4632
recon_state->msg_version >= 2);
4633
dput(dentry);
4634
if (IS_ERR(path)) {
4635
err = PTR_ERR(path);
4636
goto out_err;
4637
}
4638
}
4639
4640
spin_lock(&ci->i_ceph_lock);
4641
cap = __get_cap_for_mds(ci, mds);
4642
if (!cap) {
4643
spin_unlock(&ci->i_ceph_lock);
4644
err = 0;
4645
goto out_err;
4646
}
4647
doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode,
4648
ceph_vinop(inode), cap, cap->cap_id,
4649
ceph_cap_string(cap->issued));
4650
4651
cap->seq = 0; /* reset cap seq */
4652
cap->issue_seq = 0; /* and issue_seq */
4653
cap->mseq = 0; /* and migrate_seq */
4654
cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
4655
4656
/* These are lost when the session goes away */
4657
if (S_ISDIR(inode->i_mode)) {
4658
if (cap->issued & CEPH_CAP_DIR_CREATE) {
4659
ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
4660
memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
4661
}
4662
cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
4663
}
4664
4665
if (recon_state->msg_version >= 2) {
4666
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
4667
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4668
rec.v2.issued = cpu_to_le32(cap->issued);
4669
rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4670
rec.v2.pathbase = cpu_to_le64(path_info.vino.ino);
4671
rec.v2.flock_len = (__force __le32)
4672
((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
4673
} else {
4674
struct timespec64 ts;
4675
4676
rec.v1.cap_id = cpu_to_le64(cap->cap_id);
4677
rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4678
rec.v1.issued = cpu_to_le32(cap->issued);
4679
rec.v1.size = cpu_to_le64(i_size_read(inode));
4680
ts = inode_get_mtime(inode);
4681
ceph_encode_timespec64(&rec.v1.mtime, &ts);
4682
ts = inode_get_atime(inode);
4683
ceph_encode_timespec64(&rec.v1.atime, &ts);
4684
rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4685
rec.v1.pathbase = cpu_to_le64(path_info.vino.ino);
4686
}
4687
4688
if (list_empty(&ci->i_cap_snaps)) {
4689
snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
4690
} else {
4691
struct ceph_cap_snap *capsnap =
4692
list_first_entry(&ci->i_cap_snaps,
4693
struct ceph_cap_snap, ci_item);
4694
snap_follows = capsnap->follows;
4695
}
4696
spin_unlock(&ci->i_ceph_lock);
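
	/*
	 * For v2+ records the file locks held on the inode are encoded
	 * after the cap record.  Note the encode_again retry below: if
	 * the lock count changes between ceph_count_locks() and
	 * ceph_encode_locks_to_buffer() (-ENOSPC), we simply count and
	 * encode again.
	 */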
	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = sizeof(u64);
		u8 struct_v = 0;

encode_again:
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
					       sizeof(struct ceph_filelock),
					       GFP_NOFS);
			if (!flocks) {
				err = -ENOMEM;
				goto out_err;
			}
			err = ceph_encode_locks_to_buffer(inode, flocks,
							  num_fcntl_locks,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				if (err == -ENOSPC)
					goto encode_again;
				goto out_err;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		struct_len = 2 * sizeof(u32) +
			     (num_fcntl_locks + num_flock_locks) *
			     sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2);

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;

		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			err = send_reconnect_partial(recon_state);
			if (err)
				goto out_freeflocks;
			pagelist = recon_state->pagelist;
		}

		err = ceph_pagelist_reserve(pagelist, total_len);
		if (err)
			goto out_freeflocks;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		}
		ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
		if (struct_v >= 2)
			ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
		kfree(flocks);
	} else {
		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    path_info.pathlen + sizeof(rec.v1));
		if (err)
			goto out_err;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
	}

out_err:
	ceph_mdsc_free_path_info(&path_info);
	if (!err)
		recon_state->nr_caps++;
	return err;
}

static int encode_snap_realms(struct ceph_mds_client *mdsc,
			      struct ceph_reconnect_state *recon_state)
{
	struct rb_node *p;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	struct ceph_client *cl = mdsc->fsc->client;
	int err = 0;

	if (recon_state->msg_version >= 4) {
		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
		if (err < 0)
			goto fail;
	}

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
			rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		if (recon_state->msg_version >= 4) {
			size_t need = sizeof(u8) * 2 + sizeof(u32) +
				      sizeof(sr_rec);

			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
				err = send_reconnect_partial(recon_state);
				if (err)
					goto fail;
				pagelist = recon_state->pagelist;
			}

			err = ceph_pagelist_reserve(pagelist, need);
			if (err)
				goto fail;

			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
		}

		doutc(cl, " adding snap realm %llx seq %lld parent %llx\n",
		      realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);

		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;

		recon_state->nr_realms++;
	}
fail:
	return err;
}
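
/*
 * Illustrative layout of each realm record encoded above (v4+):
 *	u8  struct_v = 1;
 *	u8  struct_compat = 1;
 *	u32 struct_len = sizeof(sr_rec);
 *	struct ceph_mds_snaprealm_reconnect sr_rec;	// ino, seq, parent
 */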

/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about.. that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *reply;
	int mds = session->s_mds;
	int err = -ENOMEM;
	struct ceph_reconnect_state recon_state = {
		.session = session,
	};
	LIST_HEAD(dispose);

	pr_info_client(cl, "mds%d reconnect start\n", mds);

	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!recon_state.pagelist)
		goto fail_nopagelist;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	xa_destroy(&session->s_delegated_inos);

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	doutc(cl, "session %p state %s\n", session,
	      ceph_session_state_name(session->s_state));

	atomic_inc(&session->s_cap_gen);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing the cap
	 * reconnect.  If a cap gets released before being added to the
	 * cap reconnect, __ceph_remove_cap() should skip queuing the
	 * cap release.
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	ceph_early_kick_flushing_caps(mdsc, session);

	down_read(&mdsc->snap_rwsem);

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
	if (err)
		goto fail;
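
	/*
	 * Pick the reconnect message version from the negotiated
	 * session features (summary of the cases below):
	 *	v2 - legacy cap records
	 *	v3 - MDSENC-capable or multi-reconnect-capable peers
	 *	v5 - set later if the reconnect has to be split across
	 *	     several messages (see send_reconnect_partial())
	 */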
	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
		recon_state.msg_version = 3;
		recon_state.allow_multi = true;
	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
		recon_state.msg_version = 3;
	} else {
		recon_state.msg_version = 2;
	}
	/* traverse this session's caps */
	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);

	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	if (err < 0)
		goto fail;

	/* check if all realms can be encoded into current message */
	if (mdsc->num_snap_realms) {
		size_t total_len =
			recon_state.pagelist->length +
			mdsc->num_snap_realms *
			sizeof(struct ceph_mds_snaprealm_reconnect);
		if (recon_state.msg_version >= 4) {
			/* number of realms */
			total_len += sizeof(u32);
			/* version, compat_version and struct_len */
			total_len += mdsc->num_snap_realms *
				     (2 * sizeof(u8) + sizeof(u32));
		}
		if (total_len > RECONNECT_MAX_SIZE) {
			if (!recon_state.allow_multi) {
				err = -ENOSPC;
				goto fail;
			}
			if (recon_state.nr_caps) {
				err = send_reconnect_partial(&recon_state);
				if (err)
					goto fail;
			}
			recon_state.msg_version = 5;
		}
	}

	err = encode_snap_realms(mdsc, &recon_state);
	if (err < 0)
		goto fail;

	if (recon_state.msg_version >= 5) {
		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
		if (err < 0)
			goto fail;
	}

	if (recon_state.nr_caps || recon_state.nr_realms) {
		struct page *page =
			list_first_entry(&recon_state.pagelist->head,
					 struct page, lru);
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		}
		kunmap_atomic(addr);
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	ceph_pagelist_release(recon_state.pagelist);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
	pr_err_client(cl, "error %d preparing reconnect for mds%d\n",
		      err, mds);
	return;
}


/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * called under mdsc->mutex.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i, j, err;
	int oldstate, newstate;
	struct ceph_mds_session *s;
	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch);

	if (newmap->m_info) {
		for (i = 0; i < newmap->possible_max_rank; i++) {
			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
				set_bit(newmap->m_info[i].export_targets[j], targets);
		}
	}

	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		if (!mdsc->sessions[i])
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n",
		      i, ceph_mds_state_name(oldstate),
		      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		      ceph_mds_state_name(newstate),
		      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		      ceph_session_state_name(s->s_state));

		if (i >= newmap->possible_max_rank) {
			/* force close session for stopped mds */
			ceph_get_mds_session(s);
			__unregister_session(mdsc, s);
			__wake_requests(mdsc, &s->s_waiting);
			mutex_unlock(&mdsc->mutex);

			mutex_lock(&s->s_mutex);
			cleanup_session_requests(mdsc, s);
			remove_session_caps(s);
			mutex_unlock(&s->s_mutex);

			ceph_put_mds_session(s);

			mutex_lock(&mdsc->mutex);
			kick_requests(mdsc, i);
			continue;
		}

		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			/* just close it */
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_con_close(&s->s_con);
			mutex_unlock(&s->s_mutex);
			s->s_state = CEPH_MDS_SESSION_RESTARTING;
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			mutex_unlock(&mdsc->mutex);
			clear_bit(i, targets);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick request on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info_client(cl, "mds%d recovery completed\n",
					       s->s_mds);
			kick_requests(mdsc, i);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_kick_flushing_caps(mdsc, s);
			mutex_unlock(&s->s_mutex);
			wake_up_session_caps(s, RECONNECT);
		}
	}

	/*
	 * Only open and reconnect sessions that don't exist yet.
	 */
	for (i = 0; i < newmap->possible_max_rank; i++) {
		/*
		 * The importing MDS may crash just after the EImportStart
		 * journal entry is flushed.  When a standby MDS takes over
		 * and replays the EImportStart journal, the new MDS daemon
		 * will wait for the client to reconnect, but the client may
		 * never have registered/opened the session.
		 *
		 * Try to reconnect to that MDS daemon if its rank number is
		 * in the export targets array and it is in the up:reconnect
		 * state.
		 */
		newstate = ceph_mdsmap_get_state(newmap, i);
		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
			continue;

		/*
		 * In rare cases the session may have been registered and
		 * opened by requests that chose random MDSes during the
		 * mdsc->mutex unlock/lock gap below.  The related MDS
		 * daemon will just queue those requests and keep waiting
		 * for the client's reconnection request in the
		 * up:reconnect state.
		 */
		s = __ceph_lookup_mds_session(mdsc, i);
		if (likely(!s)) {
			s = __open_export_target_session(mdsc, i);
			if (IS_ERR(s)) {
				err = PTR_ERR(s);
				pr_err_client(cl,
					      "failed to open export target session, err %d\n",
					      err);
				continue;
			}
		}
		doutc(cl, "send reconnect to export target mds.%d\n", i);
		mutex_unlock(&mdsc->mutex);
		send_mds_reconnect(mdsc, s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}

	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			doutc(cl, " connecting to export targets of laggy mds%d\n", i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}


/*
 * leases
 */

/*
 * caller must hold session s_mutex, dentry->d_lock
 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}
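
/*
 * Illustrative layout of the lease message decoded by handle_lease()
 * below:
 *	struct ceph_mds_lease h;	// action, ino, seq, duration_ms, ...
 *	u32 dname_len;
 *	char dname[dname_len];
 */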
static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	doutc(cl, "from mds%d\n", mds);

	if (!ceph_inc_mds_stopping_blocker(mdsc, session))
		return;

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	dname.len = get_unaligned_le32(h + 1);
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
		goto bad;
	dname.name = (void *)(h + 1) + sizeof(u32);

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action),
	      vino.ino, inode, dname.len, dname.name);

	mutex_lock(&session->s_mutex);
	if (!inode) {
		doutc(cl, "no inode %llx\n", vino.ino);
		goto release;
	}

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		doutc(cl, "no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(parent, dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));

			di->lease_seq = seq;
			di->time = di->lease_renew_from + duration;
			di->lease_renew_after = di->lease_renew_from +
						(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	mutex_unlock(&session->s_mutex);
	iput(inode);

	ceph_dec_mds_stopping_blocker(mdsc);
	return;

bad:
	ceph_dec_mds_stopping_blocker(mdsc);

	pr_err_client(cl, "corrupt lease message\n");
	ceph_msg_dump(msg);
}

void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct dentry *dentry, char action,
			      u32 seq)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	struct inode *dir;
	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;

	doutc(cl, "dentry %p %s to mds%d\n", dentry, ceph_lease_op_name(action),
	      session->s_mds);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	if (!msg)
		return;
	lease = msg->front.iov_base;
	lease->action = action;
	lease->seq = cpu_to_le32(seq);

	spin_lock(&dentry->d_lock);
	dir = d_inode(dentry->d_parent);
	lease->ino = cpu_to_le64(ceph_ino(dir));
	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));

	put_unaligned_le32(dentry->d_name.len, lease + 1);
	memcpy((void *)(lease + 1) + 4,
	       dentry->d_name.name, dentry->d_name.len);
	spin_unlock(&dentry->d_lock);

	ceph_con_send(&session->s_con, msg);
}

/*
 * Lock and then unlock the session mutex, to wait for any ongoing
 * session activity to finish.
 */
static void lock_unlock_session(struct ceph_mds_session *s)
{
	mutex_lock(&s->s_mutex);
	mutex_unlock(&s->s_mutex);
}

static void maybe_recover_session(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_fs_client *fsc = mdsc->fsc;

	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
		return;

	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
		return;

	if (!READ_ONCE(fsc->blocklisted))
		return;

	pr_info_client(cl, "auto reconnect after blocklisted\n");
	ceph_force_reconnect(fsc->sb);
}

bool check_session_state(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	switch (s->s_state) {
	case CEPH_MDS_SESSION_OPEN:
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			s->s_state = CEPH_MDS_SESSION_HUNG;
			pr_info_client(cl, "mds%d hung\n", s->s_mds);
		}
		break;
	case CEPH_MDS_SESSION_CLOSING:
	case CEPH_MDS_SESSION_NEW:
	case CEPH_MDS_SESSION_RESTARTING:
	case CEPH_MDS_SESSION_CLOSED:
	case CEPH_MDS_SESSION_REJECTED:
		return false;
	}

	return true;
}

/*
 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
 * then we need to retransmit that request.
 */
void inc_session_sequence(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	lockdep_assert_held(&s->s_mutex);

	s->s_seq++;

	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
		int ret;

		doutc(cl, "resending session close request for mds%d\n", s->s_mds);
		ret = request_close_session(s);
		if (ret < 0)
			pr_err_client(cl, "unable to close session to mds%d: %d\n",
				      s->s_mds, ret);
	}
}

/*
 * delayed work -- periodically trim expired leases, renew caps with mds.  If
 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
 * workqueue delay value of 5 secs will be used.
 */
static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
{
	unsigned long max_delay = HZ * 5;

	/* 5 secs default delay */
	if (!delay || (delay > max_delay))
		delay = max_delay;
	schedule_delayed_work(&mdsc->delayed_work,
			      round_jiffies_relative(delay));
}

static void delayed_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	unsigned long delay;
	int renew_interval;
	int renew_caps;
	int i;

	doutc(mdsc->fsc->client, "mdsc delayed_work\n");

	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
		return;

	mutex_lock(&mdsc->mutex);
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;

		if (!check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}
		mutex_unlock(&mdsc->mutex);

		ceph_flush_session_cap_releases(mdsc, s);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	delay = ceph_check_delayed_caps(mdsc);

	ceph_queue_cap_reclaim_work(mdsc);

	ceph_trim_snapid_map(mdsc);

	maybe_recover_session(mdsc);

	schedule_delayed(mdsc, delay);
}

int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc;
	int err;

	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
	if (!mdsc)
		return -ENOMEM;
	mdsc->fsc = fsc;
	mutex_init(&mdsc->mutex);
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (!mdsc->mdsmap) {
		err = -ENOMEM;
		goto err_mdsc;
	}

	init_completion(&mdsc->safe_umount_waiters);
	spin_lock_init(&mdsc->stopping_lock);
	atomic_set(&mdsc->stopping_blockers, 0);
	init_completion(&mdsc->stopping_waiter);
	atomic64_set(&mdsc->dirty_folios, 0);
	init_waitqueue_head(&mdsc->flush_end_wq);
	init_waitqueue_head(&mdsc->session_close_wq);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->quotarealms_inodes = RB_ROOT;
	mutex_init(&mdsc->quotarealms_inodes_mutex);
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
#ifdef CONFIG_DEBUG_FS
	INIT_LIST_HEAD(&mdsc->cap_wait_list);
#endif
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_flush_list);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
	INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work);
	err = ceph_metric_init(&mdsc->metric);
	if (err)
		goto err_mdsmap;

	spin_lock_init(&mdsc->dentry_list_lock);
	INIT_LIST_HEAD(&mdsc->dentry_leases);
	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);

	ceph_caps_init(mdsc);
	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);

	spin_lock_init(&mdsc->snapid_map_lock);
	mdsc->snapid_map_tree = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snapid_map_lru);

	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;

	strscpy(mdsc->nodename, utsname()->nodename,
		sizeof(mdsc->nodename));

	fsc->mdsc = mdsc;
	return 0;

err_mdsmap:
	kfree(mdsc->mdsmap);
err_mdsc:
	kfree(mdsc);
	return err;
}

/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		doutc(cl, "waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			doutc(cl, "timed out on tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	doutc(cl, "done\n");
}

void send_flush_mdlog(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	struct ceph_msg *msg;

	/*
	 * Pre-luminous MDS crashes when it sees an unknown session request
	 */
	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
		return;

	mutex_lock(&s->s_mutex);
	doutc(cl, "request mdlog flush to mds%d (%s) seq %lld\n",
	      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
				      s->s_seq);
	if (!msg) {
		pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n",
			      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	} else {
		ceph_con_send(&s->s_con, msg);
	}
	mutex_unlock(&s->s_mutex);
}
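
/*
 * The two helpers below enforce MDS auth caps on the client side.
 * They mirror MDS capability strings such as (illustrative, not taken
 * from this file):
 *	"allow rw path=/dir uid=1000 gids=1000,1001"
 * where the decoded match.{uid,gids,path,fs_name} fields plus
 * root_squash/readable/writeable are checked against the caller.
 */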

static int ceph_mds_auth_match(struct ceph_mds_client *mdsc,
			       struct ceph_mds_cap_auth *auth,
			       const struct cred *cred,
			       char *tpath)
{
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_client *cl = mdsc->fsc->client;
	const char *spath = mdsc->fsc->mount_options->server_path;
	bool gid_matched = false;
	u32 gid, tlen, len;
	int i, j;

	doutc(cl, "match.uid %lld\n", auth->match.uid);
	if (auth->match.uid != MDS_AUTH_UID_ANY) {
		if (auth->match.uid != caller_uid)
			return 0;
		if (auth->match.num_gids) {
			for (i = 0; i < auth->match.num_gids; i++) {
				if (caller_gid == auth->match.gids[i])
					gid_matched = true;
			}
			if (!gid_matched && cred->group_info->ngroups) {
				for (i = 0; i < cred->group_info->ngroups; i++) {
					gid = from_kgid(&init_user_ns,
							cred->group_info->gid[i]);
					for (j = 0; j < auth->match.num_gids; j++) {
						if (gid == auth->match.gids[j]) {
							gid_matched = true;
							break;
						}
					}
					if (gid_matched)
						break;
				}
			}
			if (!gid_matched)
				return 0;
		}
	}

	/* path match */
	if (auth->match.path) {
		if (!tpath)
			return 0;

		tlen = strlen(tpath);
		len = strlen(auth->match.path);
		if (len) {
			char *_tpath = tpath;
			bool free_tpath = false;
			int m, n;

			doutc(cl, "server path %s, tpath %s, match.path %s\n",
			      spath, tpath, auth->match.path);
			if (spath && (m = strlen(spath)) != 1) {
				/* mount path + '/' + tpath + trailing NUL */
				n = m + 1 + tlen + 1;
				_tpath = kmalloc(n, GFP_NOFS);
				if (!_tpath)
					return -ENOMEM;
				/* remove the leading '/' */
				snprintf(_tpath, n, "%s/%s", spath + 1, tpath);
				free_tpath = true;
				tlen = strlen(_tpath);
			}

			/*
			 * Note that the trailing '/' for match.path has
			 * already been removed when parsing.
			 *
			 * Remove the trailing '/' for the target path.
			 */
			while (tlen && _tpath[tlen - 1] == '/') {
				_tpath[tlen - 1] = '\0';
				tlen -= 1;
			}
			doutc(cl, "_tpath %s\n", _tpath);

			/*
			 * In case first == _tpath && tlen == len:
			 *  match.path=/foo  --> /foo _tpath=/foo    --> match
			 *  match.path=/foo/ --> /foo _tpath=/foo    --> match
			 *
			 * In case first == _tpath && tlen > len:
			 *  match.path=/foo/ --> /foo _tpath=/foo/   --> match
			 *  match.path=/foo  --> /foo _tpath=/foo/   --> match
			 *  match.path=/foo/ --> /foo _tpath=/foo/d  --> match
			 *  match.path=/foo  --> /foo _tpath=/food   --> mismatch
			 *
			 * All the other cases                       --> mismatch
			 */
			bool path_matched = true;
			char *first = strstr(_tpath, auth->match.path);
			if (first != _tpath ||
			    (tlen > len && _tpath[len] != '/')) {
				path_matched = false;
			}

			if (free_tpath)
				kfree(_tpath);

			if (!path_matched)
				return 0;
		}
	}

	doutc(cl, "matched\n");
	return 1;
}
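
/*
 * Editor's sketch, not part of the driver: once both sides have had
 * their trailing '/' stripped, the strstr()-based check above reduces
 * to "match.path is a whole-component prefix of the target path".  A
 * hypothetical helper making that rule explicit:
 */
static bool cap_path_is_prefix(const char *match, const char *path)
{
	size_t len = strlen(match);

	/* must match at the very start of the target path */
	if (strncmp(path, match, len))
		return false;
	/* and end on a component boundary: "/foo" vs "/food" */
	return path[len] == '\0' || path[len] == '/';
}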

int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
{
	const struct cred *cred = get_current_cred();
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_mds_cap_auth *rw_perms_s = NULL;
	struct ceph_client *cl = mdsc->fsc->client;
	bool root_squash_perms = true;
	int i, err;

	doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
	      tpath, mask, caller_uid, caller_gid);

	for (i = 0; i < mdsc->s_cap_auths_num; i++) {
		struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];

		err = ceph_mds_auth_match(mdsc, s, cred, tpath);
		if (err < 0) {
			put_cred(cred);
			return err;
		} else if (err > 0) {
			/* always follow the last matching auth caps' permission */
			root_squash_perms = true;
			rw_perms_s = NULL;
			if ((mask & MAY_WRITE) && s->writeable &&
			    s->match.root_squash && (!caller_uid || !caller_gid))
				root_squash_perms = false;

			if (((mask & MAY_WRITE) && !s->writeable) ||
			    ((mask & MAY_READ) && !s->readable))
				rw_perms_s = s;
		}
	}

	put_cred(cred);

	doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
	      rw_perms_s);
	if (root_squash_perms && rw_perms_s == NULL) {
		doutc(cl, "access allowed\n");
		return 0;
	}

	if (!root_squash_perms) {
		doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write\n",
		      caller_uid, caller_gid);
	}
	if (rw_perms_s) {
		doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d\n",
		      rw_perms_s->readable, rw_perms_s->writeable,
		      !!(mask & MAY_READ), !!(mask & MAY_WRITE));
	}
	doutc(cl, "access denied\n");
	return -EACCES;
}
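
/*
 * Editor's sketch, not part of the driver: a caller resolves the target
 * to a path relative to the MDS root and asks up front for the access
 * mode it is about to request.  The helper below is hypothetical; the
 * path argument is assumed to be already built.
 */
static int example_check_rw(struct ceph_mds_client *mdsc, char *path)
{
	/* ask for both read and write before an O_RDWR-style operation */
	int err = ceph_mds_check_access(mdsc, path, MAY_READ | MAY_WRITE);

	if (err < 0)
		return err;	/* -EACCES when the MDS auth caps forbid it */
	return 0;
}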

/*
 * Called before the mount goes read-only and before dentries are torn
 * down.  (Hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "begin\n");
	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;

	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
	doutc(mdsc->fsc->client, "done\n");
}
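
/*
 * Editor's sketch, not part of the driver: callbacks handed to
 * ceph_mdsc_iterate_sessions() receive one referenced session at a time
 * and are invoked without mdsc->mutex held, so they may take s_mutex
 * and sleep.  A hypothetical no-op callback showing the expected shape:
 */
static void example_session_cb(struct ceph_mds_session *s)
{
	mutex_lock(&s->s_mutex);
	/* per-session work goes here, e.g. nudging the MDS as above */
	mutex_unlock(&s->s_mutex);
}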

/*
 * flush the mdlog and wait for all write mds requests to flush.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						      u64 want_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	doutc(cl, "want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			doutc(cl, "wait on %llu (want %llu)\n",
			      req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break; /* there was no next request, so we're done */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq); /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	ceph_put_mds_session(last_session);
	doutc(cl, "done\n");
}
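
/*
 * Editor's sketch, not part of the driver: the walk above survives
 * dropping mdsc->mutex by pinning both the current and the next node
 * with request references, then re-validating the next node after
 * re-locking -- RB_EMPTY_NODE() becomes true once __unregister_request()
 * has erased it.  The same pattern in miniature (hypothetical helper):
 */
static void example_safe_walk(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_request *req, *nextreq;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
again:
	req = __get_oldest_req(mdsc);
	while (req) {
		n = rb_next(&req->r_node);
		nextreq = n ? rb_entry(n, struct ceph_mds_request, r_node) : NULL;

		ceph_mdsc_get_request(req);		/* pin current */
		if (nextreq)
			ceph_mdsc_get_request(nextreq);	/* pin next */
		mutex_unlock(&mdsc->mutex);

		/* ... blocking work on req would go here ... */

		mutex_lock(&mdsc->mutex);
		ceph_mdsc_put_request(req);
		if (nextreq && RB_EMPTY_NODE(&nextreq->r_node)) {
			ceph_mdsc_put_request(nextreq);	/* next was erased */
			goto again;
		}
		if (nextreq)
			ceph_mdsc_put_request(nextreq);	/* still in the tree */
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
}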

void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	doutc(cl, "sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	ceph_flush_cap_releases(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}
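
/*
 * Editor's sketch, not part of the driver: ceph_mdsc_sync() is the
 * metadata half of a filesystem-wide sync.  A super_block ->sync_fs
 * implementation would call it when asked to wait; the outline below is
 * hypothetical, not the in-tree ceph_sync_fs().
 */
static int example_sync_fs(struct ceph_fs_client *fsc, int wait)
{
	if (!wait)
		return 0;	/* nothing to wait on for a lazy sync */

	ceph_mdsc_sync(fsc->mdsc);	/* unsafe requests + cap flushes */
	return 0;
}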

/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after the sb is read-only, or when the metadata is corrupted.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	doutc(cl, "begin\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	doutc(cl, "waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_work_sync(&mdsc->cap_unlink_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	doutc(cl, "done\n");
}
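
/*
 * Editor's sketch, not part of the driver: both loops above follow the
 * same lock-ordering rule -- a session's s_mutex is never taken while
 * mdsc->mutex is held.  The session is pinned with a reference first so
 * it cannot vanish across the unlock/lock window (hypothetical helper):
 */
static void example_per_session(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_session *session;

	mutex_lock(&mdsc->mutex);
	session = __ceph_lookup_mds_session(mdsc, mds);	/* takes a ref */
	mutex_unlock(&mdsc->mutex);
	if (!session)
		return;

	mutex_lock(&session->s_mutex);
	/* per-session teardown work goes here */
	mutex_unlock(&session->s_mutex);
	ceph_put_mds_session(session);
}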

void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	doutc(mdsc->fsc->client, "force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "stop\n");
	/*
	 * Make sure the delayed work has stopped before releasing
	 * the resources.
	 *
	 * cancel_delayed_work_sync() only guarantees that the work
	 * finishes executing, but the delayed work will re-arm itself
	 * again after that.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);

	if (mdsc->s_cap_auths) {
		int i;

		for (i = 0; i < mdsc->s_cap_auths_num; i++) {
			kfree(mdsc->s_cap_auths[i].match.gids);
			kfree(mdsc->s_cap_auths[i].match.path);
			kfree(mdsc->s_cap_auths[i].match.fs_name);
		}
		kfree(mdsc->s_cap_auths);
	}

	ceph_pool_perm_destroy(mdsc);
}
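
/*
 * Editor's sketch, not part of the driver: the general shutdown recipe
 * for a self-rearming delayed work is to kill the re-arm condition
 * first, then flush so the final pass cannot schedule a successor.
 * All names below are hypothetical.
 */
struct example_ctx {
	struct delayed_work dwork;
	bool stopping;
};

static void example_shutdown(struct example_ctx *ctx)
{
	WRITE_ONCE(ctx->stopping, true);	/* handler re-arms only if !stopping */
	flush_delayed_work(&ctx->dwork);	/* wait out the final pass */
}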

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;
	doutc(fsc->client, "%p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	doutc(fsc->client, "%p done\n", mdsc);
}

void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	struct ceph_client *cl = fsc->client;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	doutc(cl, "epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		/* info_v, info_cv */
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
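
/*
 * Editor's sketch, not part of the driver: every decoder in this file
 * is built from the bounds-checked ceph_decode_* helpers -- reserve
 * bytes against 'end' (jumping to a 'bad' label on overrun), then
 * consume them.  A minimal hypothetical decoder:
 */
static int example_decode_u32_pair(void **p, void *end, u32 *a, u32 *b)
{
	ceph_decode_need(p, end, 2 * sizeof(u32), bad);	/* bounds check */
	*a = ceph_decode_32(p);				/* consumes 4 bytes */
	*b = ceph_decode_32(p);
	return 0;
bad:
	return -EIO;
}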

/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid) + 2 * sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	doutc(cl, "epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;	/* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
				       MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
}
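
/*
 * Editor's sketch, not part of the driver: map handlers are safe
 * against duplicated or reordered delivery because they drop anything
 * that is not strictly newer than the map already installed.  The same
 * epoch gate in miniature (hypothetical helper):
 */
static bool example_map_is_new(struct ceph_mdsmap *cur, u32 epoch)
{
	/* epochs increase monotonically; equal means already applied */
	return !cur || epoch > cur->m_epoch;
}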

static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (ceph_get_mds_session(s))
		return con;
	return NULL;
}

static void mds_put_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	ceph_put_mds_session(s);
}

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n",
		       s->s_mds);
	if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO &&
	    ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT)
		send_mds_reconnect(mdsc, s);
}

static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err_client(cl, "received unknown message type %d %s\n",
			      type, ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}

static int mds_add_authorizer_challenge(struct ceph_connection *con,
				    void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					    challenge_buf, challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}

static int mds_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static int mds_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}

static int mds_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
	.get = mds_get_con,
	.put = mds_put_con,
	.alloc_msg = mds_alloc_msg,
	.dispatch = mds_dispatch,
	.peer_reset = mds_peer_reset,
	.get_authorizer = mds_get_authorizer,
	.add_authorizer_challenge = mds_add_authorizer_challenge,
	.verify_authorizer_reply = mds_verify_authorizer_reply,
	.invalidate_authorizer = mds_invalidate_authorizer,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
	.get_auth_request = mds_get_auth_request,
	.handle_auth_reply_more = mds_handle_auth_reply_more,
	.handle_auth_done = mds_handle_auth_done,
	.handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */