Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/ceph/dir.c
29265 views
1
// SPDX-License-Identifier: GPL-2.0
2
#include <linux/ceph/ceph_debug.h>
3
4
#include <linux/spinlock.h>
5
#include <linux/namei.h>
6
#include <linux/slab.h>
7
#include <linux/sched.h>
8
#include <linux/xattr.h>
9
10
#include "super.h"
11
#include "mds_client.h"
12
#include "crypto.h"
13
14
/*
15
* Directory operations: readdir, lookup, create, link, unlink,
16
* rename, etc.
17
*/
18
19
/*
20
* Ceph MDS operations are specified in terms of a base ino and
21
* relative path. Thus, the client can specify an operation on a
22
* specific inode (e.g., a getattr due to fstat(2)), or as a path
23
* relative to, say, the root directory.
24
*
25
* Normally, we limit ourselves to strict inode ops (no path component)
26
* or dentry operations (a single path component relative to an ino). The
27
* exception to this is open_root_dentry(), which will open the mount
28
* point by name.
29
*/
30
31
const struct dentry_operations ceph_dentry_ops;
32
33
static bool __dentry_lease_is_valid(struct ceph_dentry_info *di);
34
static int __dir_lease_try_check(const struct dentry *dentry);
35
36
/*
37
* Initialize ceph dentry state.
38
*/
39
static int ceph_d_init(struct dentry *dentry)
40
{
41
struct ceph_dentry_info *di;
42
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dentry->d_sb);
43
44
di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
45
if (!di)
46
return -ENOMEM; /* oh well */
47
48
di->dentry = dentry;
49
di->lease_session = NULL;
50
di->time = jiffies;
51
dentry->d_fsdata = di;
52
INIT_LIST_HEAD(&di->lease_list);
53
54
atomic64_inc(&mdsc->metric.total_dentries);
55
56
return 0;
57
}
58
59
/*
60
* for f_pos for readdir:
61
* - hash order:
62
* (0xff << 52) | ((24 bits hash) << 28) |
63
* (the nth entry has hash collision);
64
* - frag+name order;
65
* ((frag value) << 28) | (the nth entry in frag);
66
*/
67
#define OFFSET_BITS 28
68
#define OFFSET_MASK ((1 << OFFSET_BITS) - 1)
69
#define HASH_ORDER (0xffull << (OFFSET_BITS + 24))
70
loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
71
{
72
loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
73
if (hash_order)
74
fpos |= HASH_ORDER;
75
return fpos;
76
}
77
78
static bool is_hash_order(loff_t p)
79
{
80
return (p & HASH_ORDER) == HASH_ORDER;
81
}
82
83
static unsigned fpos_frag(loff_t p)
84
{
85
return p >> OFFSET_BITS;
86
}
87
88
static unsigned fpos_hash(loff_t p)
89
{
90
return ceph_frag_value(fpos_frag(p));
91
}
92
93
static unsigned fpos_off(loff_t p)
94
{
95
return p & OFFSET_MASK;
96
}
97
98
static int fpos_cmp(loff_t l, loff_t r)
99
{
100
int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r));
101
if (v)
102
return v;
103
return (int)(fpos_off(l) - fpos_off(r));
104
}
105
106
/*
107
* make note of the last dentry we read, so we can
108
* continue at the same lexicographical point,
109
* regardless of what dir changes take place on the
110
* server.
111
*/
112
static int note_last_dentry(struct ceph_fs_client *fsc,
113
struct ceph_dir_file_info *dfi,
114
const char *name,
115
int len, unsigned next_offset)
116
{
117
char *buf = kmalloc(len+1, GFP_KERNEL);
118
if (!buf)
119
return -ENOMEM;
120
kfree(dfi->last_name);
121
dfi->last_name = buf;
122
memcpy(dfi->last_name, name, len);
123
dfi->last_name[len] = 0;
124
dfi->next_offset = next_offset;
125
doutc(fsc->client, "'%s'\n", dfi->last_name);
126
return 0;
127
}
128
129
130
static struct dentry *
131
__dcache_find_get_entry(struct dentry *parent, u64 idx,
132
struct ceph_readdir_cache_control *cache_ctl)
133
{
134
struct inode *dir = d_inode(parent);
135
struct ceph_client *cl = ceph_inode_to_client(dir);
136
struct dentry *dentry;
137
unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
138
loff_t ptr_pos = idx * sizeof(struct dentry *);
139
pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
140
141
if (ptr_pos >= i_size_read(dir))
142
return NULL;
143
144
if (!cache_ctl->folio || ptr_pgoff != cache_ctl->folio->index) {
145
ceph_readdir_cache_release(cache_ctl);
146
cache_ctl->folio = filemap_lock_folio(&dir->i_data, ptr_pgoff);
147
if (IS_ERR(cache_ctl->folio)) {
148
cache_ctl->folio = NULL;
149
doutc(cl, " folio %lu not found\n", ptr_pgoff);
150
return ERR_PTR(-EAGAIN);
151
}
152
/* reading/filling the cache are serialized by
153
i_rwsem, no need to use folio lock */
154
folio_unlock(cache_ctl->folio);
155
cache_ctl->dentries = kmap_local_folio(cache_ctl->folio, 0);
156
}
157
158
cache_ctl->index = idx & idx_mask;
159
160
rcu_read_lock();
161
spin_lock(&parent->d_lock);
162
/* check i_size again here, because empty directory can be
163
* marked as complete while not holding the i_rwsem. */
164
if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
165
dentry = cache_ctl->dentries[cache_ctl->index];
166
else
167
dentry = NULL;
168
spin_unlock(&parent->d_lock);
169
if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
170
dentry = NULL;
171
rcu_read_unlock();
172
return dentry ? : ERR_PTR(-EAGAIN);
173
}
174
175
/*
176
* When possible, we try to satisfy a readdir by peeking at the
177
* dcache. We make this work by carefully ordering dentries on
178
* d_children when we initially get results back from the MDS, and
179
* falling back to a "normal" sync readdir if any dentries in the dir
180
* are dropped.
181
*
182
* Complete dir indicates that we have all dentries in the dir. It is
183
* defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
184
* the MDS if/when the directory is modified).
185
*/
186
static int __dcache_readdir(struct file *file, struct dir_context *ctx,
187
int shared_gen)
188
{
189
struct ceph_dir_file_info *dfi = file->private_data;
190
struct dentry *parent = file->f_path.dentry;
191
struct inode *dir = d_inode(parent);
192
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(dir);
193
struct ceph_client *cl = ceph_inode_to_client(dir);
194
struct dentry *dentry, *last = NULL;
195
struct ceph_dentry_info *di;
196
struct ceph_readdir_cache_control cache_ctl = {};
197
u64 idx = 0;
198
int err = 0;
199
200
doutc(cl, "%p %llx.%llx v%u at %llx\n", dir, ceph_vinop(dir),
201
(unsigned)shared_gen, ctx->pos);
202
203
/* search start position */
204
if (ctx->pos > 2) {
205
u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
206
while (count > 0) {
207
u64 step = count >> 1;
208
dentry = __dcache_find_get_entry(parent, idx + step,
209
&cache_ctl);
210
if (!dentry) {
211
/* use linear search */
212
idx = 0;
213
break;
214
}
215
if (IS_ERR(dentry)) {
216
err = PTR_ERR(dentry);
217
goto out;
218
}
219
di = ceph_dentry(dentry);
220
spin_lock(&dentry->d_lock);
221
if (fpos_cmp(di->offset, ctx->pos) < 0) {
222
idx += step + 1;
223
count -= step + 1;
224
} else {
225
count = step;
226
}
227
spin_unlock(&dentry->d_lock);
228
dput(dentry);
229
}
230
231
doutc(cl, "%p %llx.%llx cache idx %llu\n", dir,
232
ceph_vinop(dir), idx);
233
}
234
235
236
for (;;) {
237
bool emit_dentry = false;
238
dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
239
if (!dentry) {
240
dfi->file_info.flags |= CEPH_F_ATEND;
241
err = 0;
242
break;
243
}
244
if (IS_ERR(dentry)) {
245
err = PTR_ERR(dentry);
246
goto out;
247
}
248
249
spin_lock(&dentry->d_lock);
250
di = ceph_dentry(dentry);
251
if (d_unhashed(dentry) ||
252
d_really_is_negative(dentry) ||
253
di->lease_shared_gen != shared_gen ||
254
((dentry->d_flags & DCACHE_NOKEY_NAME) &&
255
fscrypt_has_encryption_key(dir))) {
256
spin_unlock(&dentry->d_lock);
257
dput(dentry);
258
err = -EAGAIN;
259
goto out;
260
}
261
if (fpos_cmp(ctx->pos, di->offset) <= 0) {
262
__ceph_dentry_dir_lease_touch(di);
263
emit_dentry = true;
264
}
265
spin_unlock(&dentry->d_lock);
266
267
if (emit_dentry) {
268
doutc(cl, " %llx dentry %p %pd %p\n", di->offset,
269
dentry, dentry, d_inode(dentry));
270
ctx->pos = di->offset;
271
if (!dir_emit(ctx, dentry->d_name.name,
272
dentry->d_name.len, ceph_present_inode(d_inode(dentry)),
273
d_inode(dentry)->i_mode >> 12)) {
274
dput(dentry);
275
err = 0;
276
break;
277
}
278
ctx->pos++;
279
280
if (last)
281
dput(last);
282
last = dentry;
283
} else {
284
dput(dentry);
285
}
286
}
287
out:
288
ceph_readdir_cache_release(&cache_ctl);
289
if (last) {
290
int ret;
291
di = ceph_dentry(last);
292
ret = note_last_dentry(fsc, dfi, last->d_name.name,
293
last->d_name.len,
294
fpos_off(di->offset) + 1);
295
if (ret < 0)
296
err = ret;
297
dput(last);
298
/* last_name no longer match cache index */
299
if (dfi->readdir_cache_idx >= 0) {
300
dfi->readdir_cache_idx = -1;
301
dfi->dir_release_count = 0;
302
}
303
}
304
return err;
305
}
306
307
static bool need_send_readdir(struct ceph_dir_file_info *dfi, loff_t pos)
308
{
309
if (!dfi->last_readdir)
310
return true;
311
if (is_hash_order(pos))
312
return !ceph_frag_contains_value(dfi->frag, fpos_hash(pos));
313
else
314
return dfi->frag != fpos_frag(pos);
315
}
316
317
static int ceph_readdir(struct file *file, struct dir_context *ctx)
318
{
319
struct ceph_dir_file_info *dfi = file->private_data;
320
struct inode *inode = file_inode(file);
321
struct ceph_inode_info *ci = ceph_inode(inode);
322
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
323
struct ceph_mds_client *mdsc = fsc->mdsc;
324
struct ceph_client *cl = fsc->client;
325
int i;
326
int err;
327
unsigned frag = -1;
328
struct ceph_mds_reply_info_parsed *rinfo;
329
330
doutc(cl, "%p %llx.%llx file %p pos %llx\n", inode,
331
ceph_vinop(inode), file, ctx->pos);
332
if (dfi->file_info.flags & CEPH_F_ATEND)
333
return 0;
334
335
/* always start with . and .. */
336
if (ctx->pos == 0) {
337
doutc(cl, "%p %llx.%llx off 0 -> '.'\n", inode,
338
ceph_vinop(inode));
339
if (!dir_emit(ctx, ".", 1, ceph_present_inode(inode),
340
inode->i_mode >> 12))
341
return 0;
342
ctx->pos = 1;
343
}
344
if (ctx->pos == 1) {
345
u64 ino;
346
struct dentry *dentry = file->f_path.dentry;
347
348
spin_lock(&dentry->d_lock);
349
ino = ceph_present_inode(dentry->d_parent->d_inode);
350
spin_unlock(&dentry->d_lock);
351
352
doutc(cl, "%p %llx.%llx off 1 -> '..'\n", inode,
353
ceph_vinop(inode));
354
if (!dir_emit(ctx, "..", 2, ino, inode->i_mode >> 12))
355
return 0;
356
ctx->pos = 2;
357
}
358
359
err = ceph_fscrypt_prepare_readdir(inode);
360
if (err < 0)
361
return err;
362
363
spin_lock(&ci->i_ceph_lock);
364
/* request Fx cap. if have Fx, we don't need to release Fs cap
365
* for later create/unlink. */
366
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_WR);
367
/* can we use the dcache? */
368
if (ceph_test_mount_opt(fsc, DCACHE) &&
369
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
370
ceph_snap(inode) != CEPH_SNAPDIR &&
371
__ceph_dir_is_complete_ordered(ci) &&
372
__ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
373
int shared_gen = atomic_read(&ci->i_shared_gen);
374
375
spin_unlock(&ci->i_ceph_lock);
376
err = __dcache_readdir(file, ctx, shared_gen);
377
if (err != -EAGAIN)
378
return err;
379
} else {
380
spin_unlock(&ci->i_ceph_lock);
381
}
382
383
/* proceed with a normal readdir */
384
more:
385
/* do we have the correct frag content buffered? */
386
if (need_send_readdir(dfi, ctx->pos)) {
387
struct ceph_mds_request *req;
388
int op = ceph_snap(inode) == CEPH_SNAPDIR ?
389
CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
390
391
/* discard old result, if any */
392
if (dfi->last_readdir) {
393
ceph_mdsc_put_request(dfi->last_readdir);
394
dfi->last_readdir = NULL;
395
}
396
397
if (is_hash_order(ctx->pos)) {
398
/* fragtree isn't always accurate. choose frag
399
* based on previous reply when possible. */
400
if (frag == (unsigned)-1)
401
frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
402
NULL, NULL);
403
} else {
404
frag = fpos_frag(ctx->pos);
405
}
406
407
doutc(cl, "fetching %p %llx.%llx frag %x offset '%s'\n",
408
inode, ceph_vinop(inode), frag, dfi->last_name);
409
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
410
if (IS_ERR(req))
411
return PTR_ERR(req);
412
413
err = ceph_alloc_readdir_reply_buffer(req, inode);
414
if (err) {
415
ceph_mdsc_put_request(req);
416
return err;
417
}
418
/* hints to request -> mds selection code */
419
req->r_direct_mode = USE_AUTH_MDS;
420
if (op == CEPH_MDS_OP_READDIR) {
421
req->r_direct_hash = ceph_frag_value(frag);
422
__set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
423
req->r_inode_drop = CEPH_CAP_FILE_EXCL;
424
}
425
if (dfi->last_name) {
426
int len = strlen(dfi->last_name);
427
428
req->r_path2 = kzalloc(NAME_MAX + 1, GFP_KERNEL);
429
if (!req->r_path2) {
430
ceph_mdsc_put_request(req);
431
return -ENOMEM;
432
}
433
memcpy(req->r_path2, dfi->last_name, len);
434
435
err = ceph_encode_encrypted_dname(inode, req->r_path2, len);
436
if (err < 0) {
437
ceph_mdsc_put_request(req);
438
return err;
439
}
440
} else if (is_hash_order(ctx->pos)) {
441
req->r_args.readdir.offset_hash =
442
cpu_to_le32(fpos_hash(ctx->pos));
443
}
444
445
req->r_dir_release_cnt = dfi->dir_release_count;
446
req->r_dir_ordered_cnt = dfi->dir_ordered_count;
447
req->r_readdir_cache_idx = dfi->readdir_cache_idx;
448
req->r_readdir_offset = dfi->next_offset;
449
req->r_args.readdir.frag = cpu_to_le32(frag);
450
req->r_args.readdir.flags =
451
cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
452
453
req->r_inode = inode;
454
ihold(inode);
455
req->r_dentry = dget(file->f_path.dentry);
456
err = ceph_mdsc_do_request(mdsc, NULL, req);
457
if (err < 0) {
458
ceph_mdsc_put_request(req);
459
return err;
460
}
461
doutc(cl, "%p %llx.%llx got and parsed readdir result=%d"
462
"on frag %x, end=%d, complete=%d, hash_order=%d\n",
463
inode, ceph_vinop(inode), err, frag,
464
(int)req->r_reply_info.dir_end,
465
(int)req->r_reply_info.dir_complete,
466
(int)req->r_reply_info.hash_order);
467
468
rinfo = &req->r_reply_info;
469
if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
470
frag = le32_to_cpu(rinfo->dir_dir->frag);
471
if (!rinfo->hash_order) {
472
dfi->next_offset = req->r_readdir_offset;
473
/* adjust ctx->pos to beginning of frag */
474
ctx->pos = ceph_make_fpos(frag,
475
dfi->next_offset,
476
false);
477
}
478
}
479
480
dfi->frag = frag;
481
dfi->last_readdir = req;
482
483
if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) {
484
dfi->readdir_cache_idx = req->r_readdir_cache_idx;
485
if (dfi->readdir_cache_idx < 0) {
486
/* preclude from marking dir ordered */
487
dfi->dir_ordered_count = 0;
488
} else if (ceph_frag_is_leftmost(frag) &&
489
dfi->next_offset == 2) {
490
/* note dir version at start of readdir so
491
* we can tell if any dentries get dropped */
492
dfi->dir_release_count = req->r_dir_release_cnt;
493
dfi->dir_ordered_count = req->r_dir_ordered_cnt;
494
}
495
} else {
496
doutc(cl, "%p %llx.%llx !did_prepopulate\n", inode,
497
ceph_vinop(inode));
498
/* disable readdir cache */
499
dfi->readdir_cache_idx = -1;
500
/* preclude from marking dir complete */
501
dfi->dir_release_count = 0;
502
}
503
504
/* note next offset and last dentry name */
505
if (rinfo->dir_nr > 0) {
506
struct ceph_mds_reply_dir_entry *rde =
507
rinfo->dir_entries + (rinfo->dir_nr-1);
508
unsigned next_offset = req->r_reply_info.dir_end ?
509
2 : (fpos_off(rde->offset) + 1);
510
err = note_last_dentry(fsc, dfi, rde->name,
511
rde->name_len, next_offset);
512
if (err) {
513
ceph_mdsc_put_request(dfi->last_readdir);
514
dfi->last_readdir = NULL;
515
return err;
516
}
517
} else if (req->r_reply_info.dir_end) {
518
dfi->next_offset = 2;
519
/* keep last name */
520
}
521
}
522
523
rinfo = &dfi->last_readdir->r_reply_info;
524
doutc(cl, "%p %llx.%llx frag %x num %d pos %llx chunk first %llx\n",
525
inode, ceph_vinop(inode), dfi->frag, rinfo->dir_nr, ctx->pos,
526
rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
527
528
i = 0;
529
/* search start position */
530
if (rinfo->dir_nr > 0) {
531
int step, nr = rinfo->dir_nr;
532
while (nr > 0) {
533
step = nr >> 1;
534
if (rinfo->dir_entries[i + step].offset < ctx->pos) {
535
i += step + 1;
536
nr -= step + 1;
537
} else {
538
nr = step;
539
}
540
}
541
}
542
for (; i < rinfo->dir_nr; i++) {
543
struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
544
545
if (rde->offset < ctx->pos) {
546
pr_warn_client(cl,
547
"%p %llx.%llx rde->offset 0x%llx ctx->pos 0x%llx\n",
548
inode, ceph_vinop(inode), rde->offset, ctx->pos);
549
return -EIO;
550
}
551
552
if (WARN_ON_ONCE(!rde->inode.in))
553
return -EIO;
554
555
ctx->pos = rde->offset;
556
doutc(cl, "%p %llx.%llx (%d/%d) -> %llx '%.*s' %p\n", inode,
557
ceph_vinop(inode), i, rinfo->dir_nr, ctx->pos,
558
rde->name_len, rde->name, &rde->inode.in);
559
560
if (!dir_emit(ctx, rde->name, rde->name_len,
561
ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)),
562
le32_to_cpu(rde->inode.in->mode) >> 12)) {
563
/*
564
* NOTE: Here no need to put the 'dfi->last_readdir',
565
* because when dir_emit stops us it's most likely
566
* doesn't have enough memory, etc. So for next readdir
567
* it will continue.
568
*/
569
doutc(cl, "filldir stopping us...\n");
570
return 0;
571
}
572
573
/* Reset the lengths to their original allocated vals */
574
ctx->pos++;
575
}
576
577
ceph_mdsc_put_request(dfi->last_readdir);
578
dfi->last_readdir = NULL;
579
580
if (dfi->next_offset > 2) {
581
frag = dfi->frag;
582
goto more;
583
}
584
585
/* more frags? */
586
if (!ceph_frag_is_rightmost(dfi->frag)) {
587
frag = ceph_frag_next(dfi->frag);
588
if (is_hash_order(ctx->pos)) {
589
loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
590
dfi->next_offset, true);
591
if (new_pos > ctx->pos)
592
ctx->pos = new_pos;
593
/* keep last_name */
594
} else {
595
ctx->pos = ceph_make_fpos(frag, dfi->next_offset,
596
false);
597
kfree(dfi->last_name);
598
dfi->last_name = NULL;
599
}
600
doutc(cl, "%p %llx.%llx next frag is %x\n", inode,
601
ceph_vinop(inode), frag);
602
goto more;
603
}
604
dfi->file_info.flags |= CEPH_F_ATEND;
605
606
/*
607
* if dir_release_count still matches the dir, no dentries
608
* were released during the whole readdir, and we should have
609
* the complete dir contents in our cache.
610
*/
611
if (atomic64_read(&ci->i_release_count) ==
612
dfi->dir_release_count) {
613
spin_lock(&ci->i_ceph_lock);
614
if (dfi->dir_ordered_count ==
615
atomic64_read(&ci->i_ordered_count)) {
616
doutc(cl, " marking %p %llx.%llx complete and ordered\n",
617
inode, ceph_vinop(inode));
618
/* use i_size to track number of entries in
619
* readdir cache */
620
BUG_ON(dfi->readdir_cache_idx < 0);
621
i_size_write(inode, dfi->readdir_cache_idx *
622
sizeof(struct dentry*));
623
} else {
624
doutc(cl, " marking %llx.%llx complete\n",
625
ceph_vinop(inode));
626
}
627
__ceph_dir_set_complete(ci, dfi->dir_release_count,
628
dfi->dir_ordered_count);
629
spin_unlock(&ci->i_ceph_lock);
630
}
631
doutc(cl, "%p %llx.%llx file %p done.\n", inode, ceph_vinop(inode),
632
file);
633
return 0;
634
}
635
636
static void reset_readdir(struct ceph_dir_file_info *dfi)
637
{
638
if (dfi->last_readdir) {
639
ceph_mdsc_put_request(dfi->last_readdir);
640
dfi->last_readdir = NULL;
641
}
642
kfree(dfi->last_name);
643
dfi->last_name = NULL;
644
dfi->dir_release_count = 0;
645
dfi->readdir_cache_idx = -1;
646
dfi->next_offset = 2; /* compensate for . and .. */
647
dfi->file_info.flags &= ~CEPH_F_ATEND;
648
}
649
650
/*
651
* discard buffered readdir content on seekdir(0), or seek to new frag,
652
* or seek prior to current chunk
653
*/
654
static bool need_reset_readdir(struct ceph_dir_file_info *dfi, loff_t new_pos)
655
{
656
struct ceph_mds_reply_info_parsed *rinfo;
657
loff_t chunk_offset;
658
if (new_pos == 0)
659
return true;
660
if (is_hash_order(new_pos)) {
661
/* no need to reset last_name for a forward seek when
662
* dentries are sorted in hash order */
663
} else if (dfi->frag != fpos_frag(new_pos)) {
664
return true;
665
}
666
rinfo = dfi->last_readdir ? &dfi->last_readdir->r_reply_info : NULL;
667
if (!rinfo || !rinfo->dir_nr)
668
return true;
669
chunk_offset = rinfo->dir_entries[0].offset;
670
return new_pos < chunk_offset ||
671
is_hash_order(new_pos) != is_hash_order(chunk_offset);
672
}
673
674
static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
675
{
676
struct ceph_dir_file_info *dfi = file->private_data;
677
struct inode *inode = file->f_mapping->host;
678
struct ceph_client *cl = ceph_inode_to_client(inode);
679
loff_t retval;
680
681
inode_lock(inode);
682
retval = -EINVAL;
683
switch (whence) {
684
case SEEK_CUR:
685
offset += file->f_pos;
686
break;
687
case SEEK_SET:
688
break;
689
case SEEK_END:
690
retval = -EOPNOTSUPP;
691
goto out;
692
default:
693
goto out;
694
}
695
696
if (offset >= 0) {
697
if (need_reset_readdir(dfi, offset)) {
698
doutc(cl, "%p %llx.%llx dropping %p content\n",
699
inode, ceph_vinop(inode), file);
700
reset_readdir(dfi);
701
} else if (is_hash_order(offset) && offset > file->f_pos) {
702
/* for hash offset, we don't know if a forward seek
703
* is within same frag */
704
dfi->dir_release_count = 0;
705
dfi->readdir_cache_idx = -1;
706
}
707
708
if (offset != file->f_pos) {
709
file->f_pos = offset;
710
dfi->file_info.flags &= ~CEPH_F_ATEND;
711
}
712
retval = offset;
713
}
714
out:
715
inode_unlock(inode);
716
return retval;
717
}
718
719
/*
720
* Handle lookups for the hidden .snap directory.
721
*/
722
struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req,
723
struct dentry *dentry)
724
{
725
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
726
struct inode *parent = d_inode(dentry->d_parent); /* we hold i_rwsem */
727
struct ceph_client *cl = ceph_inode_to_client(parent);
728
729
/* .snap dir? */
730
if (ceph_snap(parent) == CEPH_NOSNAP &&
731
strcmp(dentry->d_name.name, fsc->mount_options->snapdir_name) == 0) {
732
struct dentry *res;
733
struct inode *inode = ceph_get_snapdir(parent);
734
735
res = d_splice_alias(inode, dentry);
736
doutc(cl, "ENOENT on snapdir %p '%pd', linking to "
737
"snapdir %p %llx.%llx. Spliced dentry %p\n",
738
dentry, dentry, inode, ceph_vinop(inode), res);
739
if (res)
740
dentry = res;
741
}
742
return dentry;
743
}
744
745
/*
746
* Figure out final result of a lookup/open request.
747
*
748
* Mainly, make sure we return the final req->r_dentry (if it already
749
* existed) in place of the original VFS-provided dentry when they
750
* differ.
751
*
752
* Gracefully handle the case where the MDS replies with -ENOENT and
753
* no trace (which it may do, at its discretion, e.g., if it doesn't
754
* care to issue a lease on the negative dentry).
755
*/
756
struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
757
struct dentry *dentry, int err)
758
{
759
struct ceph_client *cl = req->r_mdsc->fsc->client;
760
761
if (err == -ENOENT) {
762
/* no trace? */
763
err = 0;
764
if (!req->r_reply_info.head->is_dentry) {
765
doutc(cl,
766
"ENOENT and no trace, dentry %p inode %llx.%llx\n",
767
dentry, ceph_vinop(d_inode(dentry)));
768
if (d_really_is_positive(dentry)) {
769
d_drop(dentry);
770
err = -ENOENT;
771
} else {
772
d_add(dentry, NULL);
773
}
774
}
775
}
776
if (err)
777
dentry = ERR_PTR(err);
778
else if (dentry != req->r_dentry)
779
dentry = dget(req->r_dentry); /* we got spliced */
780
else
781
dentry = NULL;
782
return dentry;
783
}
784
785
static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
786
{
787
return ceph_ino(inode) == CEPH_INO_ROOT &&
788
strncmp(dentry->d_name.name, ".ceph", 5) == 0;
789
}
790
791
/*
792
* Look up a single dir entry. If there is a lookup intent, inform
793
* the MDS so that it gets our 'caps wanted' value in a single op.
794
*/
795
static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
796
unsigned int flags)
797
{
798
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dir->i_sb);
799
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
800
struct ceph_client *cl = fsc->client;
801
struct ceph_mds_request *req;
802
int op;
803
int mask;
804
int err;
805
806
doutc(cl, "%p %llx.%llx/'%pd' dentry %p\n", dir, ceph_vinop(dir),
807
dentry, dentry);
808
809
if (dentry->d_name.len > NAME_MAX)
810
return ERR_PTR(-ENAMETOOLONG);
811
812
if (IS_ENCRYPTED(dir)) {
813
bool had_key = fscrypt_has_encryption_key(dir);
814
815
err = fscrypt_prepare_lookup_partial(dir, dentry);
816
if (err < 0)
817
return ERR_PTR(err);
818
819
/* mark directory as incomplete if it has been unlocked */
820
if (!had_key && fscrypt_has_encryption_key(dir))
821
ceph_dir_clear_complete(dir);
822
}
823
824
/* can we conclude ENOENT locally? */
825
if (d_really_is_negative(dentry)) {
826
struct ceph_inode_info *ci = ceph_inode(dir);
827
struct ceph_dentry_info *di = ceph_dentry(dentry);
828
829
spin_lock(&ci->i_ceph_lock);
830
doutc(cl, " dir %llx.%llx flags are 0x%lx\n",
831
ceph_vinop(dir), ci->i_ceph_flags);
832
if (strncmp(dentry->d_name.name,
833
fsc->mount_options->snapdir_name,
834
dentry->d_name.len) &&
835
!is_root_ceph_dentry(dir, dentry) &&
836
ceph_test_mount_opt(fsc, DCACHE) &&
837
__ceph_dir_is_complete(ci) &&
838
__ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
839
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
840
spin_unlock(&ci->i_ceph_lock);
841
doutc(cl, " dir %llx.%llx complete, -ENOENT\n",
842
ceph_vinop(dir));
843
d_add(dentry, NULL);
844
di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
845
return NULL;
846
}
847
spin_unlock(&ci->i_ceph_lock);
848
}
849
850
op = ceph_snap(dir) == CEPH_SNAPDIR ?
851
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
852
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
853
if (IS_ERR(req))
854
return ERR_CAST(req);
855
req->r_dentry = dget(dentry);
856
req->r_num_caps = 2;
857
858
mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
859
if (ceph_security_xattr_wanted(dir))
860
mask |= CEPH_CAP_XATTR_SHARED;
861
req->r_args.getattr.mask = cpu_to_le32(mask);
862
863
ihold(dir);
864
req->r_parent = dir;
865
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
866
err = ceph_mdsc_do_request(mdsc, NULL, req);
867
if (err == -ENOENT) {
868
struct dentry *res;
869
870
res = ceph_handle_snapdir(req, dentry);
871
if (IS_ERR(res)) {
872
err = PTR_ERR(res);
873
} else {
874
dentry = res;
875
err = 0;
876
}
877
}
878
dentry = ceph_finish_lookup(req, dentry, err);
879
ceph_mdsc_put_request(req); /* will dput(dentry) */
880
doutc(cl, "result=%p\n", dentry);
881
return dentry;
882
}
883
884
/*
885
* If we do a create but get no trace back from the MDS, follow up with
886
* a lookup (the VFS expects us to link up the provided dentry).
887
*/
888
int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
889
{
890
struct dentry *result = ceph_lookup(dir, dentry, 0);
891
892
if (result && !IS_ERR(result)) {
893
/*
894
* We created the item, then did a lookup, and found
895
* it was already linked to another inode we already
896
* had in our cache (and thus got spliced). To not
897
* confuse VFS (especially when inode is a directory),
898
* we don't link our dentry to that inode, return an
899
* error instead.
900
*
901
* This event should be rare and it happens only when
902
* we talk to old MDS. Recent MDS does not send traceless
903
* reply for request that creates new inode.
904
*/
905
d_drop(result);
906
return -ESTALE;
907
}
908
return PTR_ERR(result);
909
}
910
911
static int ceph_mknod(struct mnt_idmap *idmap, struct inode *dir,
912
struct dentry *dentry, umode_t mode, dev_t rdev)
913
{
914
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
915
struct ceph_client *cl = mdsc->fsc->client;
916
struct ceph_mds_request *req;
917
struct ceph_acl_sec_ctx as_ctx = {};
918
int err;
919
920
if (ceph_snap(dir) != CEPH_NOSNAP)
921
return -EROFS;
922
923
err = ceph_wait_on_conflict_unlink(dentry);
924
if (err)
925
return err;
926
927
if (ceph_quota_is_max_files_exceeded(dir)) {
928
err = -EDQUOT;
929
goto out;
930
}
931
932
doutc(cl, "%p %llx.%llx/'%pd' dentry %p mode 0%ho rdev %d\n",
933
dir, ceph_vinop(dir), dentry, dentry, mode, rdev);
934
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
935
if (IS_ERR(req)) {
936
err = PTR_ERR(req);
937
goto out;
938
}
939
940
req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
941
if (IS_ERR(req->r_new_inode)) {
942
err = PTR_ERR(req->r_new_inode);
943
req->r_new_inode = NULL;
944
goto out_req;
945
}
946
947
if (S_ISREG(mode) && IS_ENCRYPTED(dir))
948
set_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags);
949
950
req->r_dentry = dget(dentry);
951
req->r_num_caps = 2;
952
req->r_parent = dir;
953
ihold(dir);
954
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
955
req->r_mnt_idmap = mnt_idmap_get(idmap);
956
req->r_args.mknod.mode = cpu_to_le32(mode);
957
req->r_args.mknod.rdev = cpu_to_le32(rdev);
958
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
959
CEPH_CAP_XATTR_EXCL;
960
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
961
962
ceph_as_ctx_to_req(req, &as_ctx);
963
964
err = ceph_mdsc_do_request(mdsc, dir, req);
965
if (!err && !req->r_reply_info.head->is_dentry)
966
err = ceph_handle_notrace_create(dir, dentry);
967
out_req:
968
ceph_mdsc_put_request(req);
969
out:
970
if (!err)
971
ceph_init_inode_acls(d_inode(dentry), &as_ctx);
972
else
973
d_drop(dentry);
974
ceph_release_acl_sec_ctx(&as_ctx);
975
return err;
976
}
977
978
static int ceph_create(struct mnt_idmap *idmap, struct inode *dir,
979
struct dentry *dentry, umode_t mode, bool excl)
980
{
981
return ceph_mknod(idmap, dir, dentry, mode, 0);
982
}
983
984
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
985
static int prep_encrypted_symlink_target(struct ceph_mds_request *req,
986
const char *dest)
987
{
988
int err;
989
int len = strlen(dest);
990
struct fscrypt_str osd_link = FSTR_INIT(NULL, 0);
991
992
err = fscrypt_prepare_symlink(req->r_parent, dest, len, PATH_MAX,
993
&osd_link);
994
if (err)
995
goto out;
996
997
err = fscrypt_encrypt_symlink(req->r_new_inode, dest, len, &osd_link);
998
if (err)
999
goto out;
1000
1001
req->r_path2 = kmalloc(CEPH_BASE64_CHARS(osd_link.len) + 1, GFP_KERNEL);
1002
if (!req->r_path2) {
1003
err = -ENOMEM;
1004
goto out;
1005
}
1006
1007
len = ceph_base64_encode(osd_link.name, osd_link.len, req->r_path2);
1008
req->r_path2[len] = '\0';
1009
out:
1010
fscrypt_fname_free_buffer(&osd_link);
1011
return err;
1012
}
1013
#else
1014
static int prep_encrypted_symlink_target(struct ceph_mds_request *req,
1015
const char *dest)
1016
{
1017
return -EOPNOTSUPP;
1018
}
1019
#endif
1020
1021
static int ceph_symlink(struct mnt_idmap *idmap, struct inode *dir,
1022
struct dentry *dentry, const char *dest)
1023
{
1024
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
1025
struct ceph_client *cl = mdsc->fsc->client;
1026
struct ceph_mds_request *req;
1027
struct ceph_acl_sec_ctx as_ctx = {};
1028
umode_t mode = S_IFLNK | 0777;
1029
int err;
1030
1031
if (ceph_snap(dir) != CEPH_NOSNAP)
1032
return -EROFS;
1033
1034
err = ceph_wait_on_conflict_unlink(dentry);
1035
if (err)
1036
return err;
1037
1038
if (ceph_quota_is_max_files_exceeded(dir)) {
1039
err = -EDQUOT;
1040
goto out;
1041
}
1042
1043
doutc(cl, "%p %llx.%llx/'%pd' to '%s'\n", dir, ceph_vinop(dir), dentry,
1044
dest);
1045
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
1046
if (IS_ERR(req)) {
1047
err = PTR_ERR(req);
1048
goto out;
1049
}
1050
1051
req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
1052
if (IS_ERR(req->r_new_inode)) {
1053
err = PTR_ERR(req->r_new_inode);
1054
req->r_new_inode = NULL;
1055
goto out_req;
1056
}
1057
1058
req->r_parent = dir;
1059
ihold(dir);
1060
1061
if (IS_ENCRYPTED(req->r_new_inode)) {
1062
err = prep_encrypted_symlink_target(req, dest);
1063
if (err)
1064
goto out_req;
1065
} else {
1066
req->r_path2 = kstrdup(dest, GFP_KERNEL);
1067
if (!req->r_path2) {
1068
err = -ENOMEM;
1069
goto out_req;
1070
}
1071
}
1072
1073
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1074
req->r_mnt_idmap = mnt_idmap_get(idmap);
1075
req->r_dentry = dget(dentry);
1076
req->r_num_caps = 2;
1077
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
1078
CEPH_CAP_XATTR_EXCL;
1079
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1080
1081
ceph_as_ctx_to_req(req, &as_ctx);
1082
1083
err = ceph_mdsc_do_request(mdsc, dir, req);
1084
if (!err && !req->r_reply_info.head->is_dentry)
1085
err = ceph_handle_notrace_create(dir, dentry);
1086
out_req:
1087
ceph_mdsc_put_request(req);
1088
out:
1089
if (err)
1090
d_drop(dentry);
1091
ceph_release_acl_sec_ctx(&as_ctx);
1092
return err;
1093
}
1094
1095
static struct dentry *ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir,
1096
struct dentry *dentry, umode_t mode)
1097
{
1098
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
1099
struct ceph_client *cl = mdsc->fsc->client;
1100
struct ceph_mds_request *req;
1101
struct ceph_acl_sec_ctx as_ctx = {};
1102
struct dentry *ret;
1103
int err;
1104
int op;
1105
1106
err = ceph_wait_on_conflict_unlink(dentry);
1107
if (err)
1108
return ERR_PTR(err);
1109
1110
if (ceph_snap(dir) == CEPH_SNAPDIR) {
1111
/* mkdir .snap/foo is a MKSNAP */
1112
op = CEPH_MDS_OP_MKSNAP;
1113
doutc(cl, "mksnap %llx.%llx/'%pd' dentry %p\n",
1114
ceph_vinop(dir), dentry, dentry);
1115
} else if (ceph_snap(dir) == CEPH_NOSNAP) {
1116
doutc(cl, "mkdir %llx.%llx/'%pd' dentry %p mode 0%ho\n",
1117
ceph_vinop(dir), dentry, dentry, mode);
1118
op = CEPH_MDS_OP_MKDIR;
1119
} else {
1120
ret = ERR_PTR(-EROFS);
1121
goto out;
1122
}
1123
1124
if (op == CEPH_MDS_OP_MKDIR &&
1125
ceph_quota_is_max_files_exceeded(dir)) {
1126
ret = ERR_PTR(-EDQUOT);
1127
goto out;
1128
}
1129
if ((op == CEPH_MDS_OP_MKSNAP) && IS_ENCRYPTED(dir) &&
1130
!fscrypt_has_encryption_key(dir)) {
1131
ret = ERR_PTR(-ENOKEY);
1132
goto out;
1133
}
1134
1135
1136
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
1137
if (IS_ERR(req)) {
1138
ret = ERR_CAST(req);
1139
goto out;
1140
}
1141
1142
mode |= S_IFDIR;
1143
req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
1144
if (IS_ERR(req->r_new_inode)) {
1145
ret = ERR_CAST(req->r_new_inode);
1146
req->r_new_inode = NULL;
1147
goto out_req;
1148
}
1149
1150
req->r_dentry = dget(dentry);
1151
req->r_num_caps = 2;
1152
req->r_parent = dir;
1153
ihold(dir);
1154
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1155
if (op == CEPH_MDS_OP_MKDIR)
1156
req->r_mnt_idmap = mnt_idmap_get(idmap);
1157
req->r_args.mkdir.mode = cpu_to_le32(mode);
1158
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
1159
CEPH_CAP_XATTR_EXCL;
1160
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1161
1162
ceph_as_ctx_to_req(req, &as_ctx);
1163
1164
err = ceph_mdsc_do_request(mdsc, dir, req);
1165
if (!err &&
1166
!req->r_reply_info.head->is_target &&
1167
!req->r_reply_info.head->is_dentry)
1168
err = ceph_handle_notrace_create(dir, dentry);
1169
ret = ERR_PTR(err);
1170
out_req:
1171
if (!IS_ERR(ret) && req->r_dentry != dentry)
1172
/* Some other dentry was spliced in */
1173
ret = dget(req->r_dentry);
1174
ceph_mdsc_put_request(req);
1175
out:
1176
if (!IS_ERR(ret)) {
1177
if (ret)
1178
dentry = ret;
1179
ceph_init_inode_acls(d_inode(dentry), &as_ctx);
1180
} else {
1181
d_drop(dentry);
1182
}
1183
ceph_release_acl_sec_ctx(&as_ctx);
1184
return ret;
1185
}
1186
1187
static int ceph_link(struct dentry *old_dentry, struct inode *dir,
1188
struct dentry *dentry)
1189
{
1190
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
1191
struct ceph_client *cl = mdsc->fsc->client;
1192
struct ceph_mds_request *req;
1193
int err;
1194
1195
if (dentry->d_flags & DCACHE_DISCONNECTED)
1196
return -EINVAL;
1197
1198
err = ceph_wait_on_conflict_unlink(dentry);
1199
if (err)
1200
return err;
1201
1202
if (ceph_snap(dir) != CEPH_NOSNAP)
1203
return -EROFS;
1204
1205
err = fscrypt_prepare_link(old_dentry, dir, dentry);
1206
if (err)
1207
return err;
1208
1209
doutc(cl, "%p %llx.%llx/'%pd' to '%pd'\n", dir, ceph_vinop(dir),
1210
old_dentry, dentry);
1211
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
1212
if (IS_ERR(req)) {
1213
d_drop(dentry);
1214
return PTR_ERR(req);
1215
}
1216
req->r_dentry = dget(dentry);
1217
req->r_num_caps = 2;
1218
req->r_old_dentry = dget(old_dentry);
1219
/*
1220
* The old_dentry maybe a DCACHE_DISCONNECTED dentry, then we
1221
* will just pass the ino# to MDSs.
1222
*/
1223
if (old_dentry->d_flags & DCACHE_DISCONNECTED)
1224
req->r_ino2 = ceph_vino(d_inode(old_dentry));
1225
req->r_parent = dir;
1226
ihold(dir);
1227
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1228
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_XATTR_EXCL;
1229
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1230
/* release LINK_SHARED on source inode (mds will lock it) */
1231
req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
1232
err = ceph_mdsc_do_request(mdsc, dir, req);
1233
if (err) {
1234
d_drop(dentry);
1235
} else if (!req->r_reply_info.head->is_dentry) {
1236
ihold(d_inode(old_dentry));
1237
d_instantiate(dentry, d_inode(old_dentry));
1238
}
1239
ceph_mdsc_put_request(req);
1240
return err;
1241
}
1242
1243
static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
1244
struct ceph_mds_request *req)
1245
{
1246
struct dentry *dentry = req->r_dentry;
1247
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
1248
struct ceph_client *cl = fsc->client;
1249
struct ceph_dentry_info *di = ceph_dentry(dentry);
1250
int result = req->r_err ? req->r_err :
1251
le32_to_cpu(req->r_reply_info.head->result);
1252
1253
if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
1254
pr_warn_client(cl,
1255
"dentry %p:%pd async unlink bit is not set\n",
1256
dentry, dentry);
1257
1258
spin_lock(&fsc->async_unlink_conflict_lock);
1259
hash_del_rcu(&di->hnode);
1260
spin_unlock(&fsc->async_unlink_conflict_lock);
1261
1262
spin_lock(&dentry->d_lock);
1263
di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
1264
wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT);
1265
spin_unlock(&dentry->d_lock);
1266
1267
synchronize_rcu();
1268
1269
if (result == -EJUKEBOX)
1270
goto out;
1271
1272
/* If op failed, mark everyone involved for errors */
1273
if (result) {
1274
struct ceph_path_info path_info = {0};
1275
char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info, 0);
1276
1277
/* mark error on parent + clear complete */
1278
mapping_set_error(req->r_parent->i_mapping, result);
1279
ceph_dir_clear_complete(req->r_parent);
1280
1281
/* drop the dentry -- we don't know its status */
1282
if (!d_unhashed(dentry))
1283
d_drop(dentry);
1284
1285
/* mark inode itself for an error (since metadata is bogus) */
1286
mapping_set_error(req->r_old_inode->i_mapping, result);
1287
1288
pr_warn_client(cl, "failure path=(%llx)%s result=%d!\n",
1289
path_info.vino.ino, IS_ERR(path) ? "<<bad>>" : path, result);
1290
ceph_mdsc_free_path_info(&path_info);
1291
}
1292
out:
1293
iput(req->r_old_inode);
1294
ceph_mdsc_release_dir_caps(req);
1295
}
1296
1297
static int get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry)
1298
{
1299
struct ceph_inode_info *ci = ceph_inode(dir);
1300
struct ceph_dentry_info *di;
1301
int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK;
1302
1303
spin_lock(&ci->i_ceph_lock);
1304
if ((__ceph_caps_issued(ci, NULL) & want) == want) {
1305
ceph_take_cap_refs(ci, want, false);
1306
got = want;
1307
}
1308
spin_unlock(&ci->i_ceph_lock);
1309
1310
/* If we didn't get anything, return 0 */
1311
if (!got)
1312
return 0;
1313
1314
spin_lock(&dentry->d_lock);
1315
di = ceph_dentry(dentry);
1316
/*
1317
* - We are holding Fx, which implies Fs caps.
1318
* - Only support async unlink for primary linkage
1319
*/
1320
if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen ||
1321
!(di->flags & CEPH_DENTRY_PRIMARY_LINK))
1322
want = 0;
1323
spin_unlock(&dentry->d_lock);
1324
1325
/* Do we still want what we've got? */
1326
if (want == got)
1327
return got;
1328
1329
ceph_put_cap_refs(ci, got);
1330
return 0;
1331
}
1332
1333
/*
1334
* rmdir and unlink are differ only by the metadata op code
1335
*/
1336
static int ceph_unlink(struct inode *dir, struct dentry *dentry)
1337
{
1338
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dir->i_sb);
1339
struct ceph_client *cl = fsc->client;
1340
struct ceph_mds_client *mdsc = fsc->mdsc;
1341
struct inode *inode = d_inode(dentry);
1342
struct ceph_mds_request *req;
1343
bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
1344
struct dentry *dn;
1345
int err = -EROFS;
1346
int op;
1347
char *path;
1348
1349
if (ceph_snap(dir) == CEPH_SNAPDIR) {
1350
/* rmdir .snap/foo is RMSNAP */
1351
doutc(cl, "rmsnap %llx.%llx/'%pd' dn\n", ceph_vinop(dir),
1352
dentry);
1353
op = CEPH_MDS_OP_RMSNAP;
1354
} else if (ceph_snap(dir) == CEPH_NOSNAP) {
1355
doutc(cl, "unlink/rmdir %llx.%llx/'%pd' inode %llx.%llx\n",
1356
ceph_vinop(dir), dentry, ceph_vinop(inode));
1357
op = d_is_dir(dentry) ?
1358
CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
1359
} else
1360
goto out;
1361
1362
dn = d_find_alias(dir);
1363
if (!dn) {
1364
try_async = false;
1365
} else {
1366
struct ceph_path_info path_info;
1367
path = ceph_mdsc_build_path(mdsc, dn, &path_info, 0);
1368
if (IS_ERR(path)) {
1369
try_async = false;
1370
err = 0;
1371
} else {
1372
err = ceph_mds_check_access(mdsc, path, MAY_WRITE);
1373
}
1374
ceph_mdsc_free_path_info(&path_info);
1375
dput(dn);
1376
1377
/* For none EACCES cases will let the MDS do the mds auth check */
1378
if (err == -EACCES) {
1379
return err;
1380
} else if (err < 0) {
1381
try_async = false;
1382
err = 0;
1383
}
1384
}
1385
1386
retry:
1387
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
1388
if (IS_ERR(req)) {
1389
err = PTR_ERR(req);
1390
goto out;
1391
}
1392
req->r_dentry = dget(dentry);
1393
req->r_num_caps = 2;
1394
req->r_parent = dir;
1395
ihold(dir);
1396
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_XATTR_EXCL;
1397
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1398
req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
1399
1400
if (try_async && op == CEPH_MDS_OP_UNLINK &&
1401
(req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
1402
struct ceph_dentry_info *di = ceph_dentry(dentry);
1403
1404
doutc(cl, "async unlink on %llx.%llx/'%pd' caps=%s",
1405
ceph_vinop(dir), dentry,
1406
ceph_cap_string(req->r_dir_caps));
1407
set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
1408
req->r_callback = ceph_async_unlink_cb;
1409
req->r_old_inode = d_inode(dentry);
1410
ihold(req->r_old_inode);
1411
1412
spin_lock(&dentry->d_lock);
1413
di->flags |= CEPH_DENTRY_ASYNC_UNLINK;
1414
spin_unlock(&dentry->d_lock);
1415
1416
spin_lock(&fsc->async_unlink_conflict_lock);
1417
hash_add_rcu(fsc->async_unlink_conflict, &di->hnode,
1418
dentry->d_name.hash);
1419
spin_unlock(&fsc->async_unlink_conflict_lock);
1420
1421
err = ceph_mdsc_submit_request(mdsc, dir, req);
1422
if (!err) {
1423
/*
1424
* We have enough caps, so we assume that the unlink
1425
* will succeed. Fix up the target inode and dcache.
1426
*/
1427
drop_nlink(inode);
1428
d_delete(dentry);
1429
} else {
1430
spin_lock(&fsc->async_unlink_conflict_lock);
1431
hash_del_rcu(&di->hnode);
1432
spin_unlock(&fsc->async_unlink_conflict_lock);
1433
1434
spin_lock(&dentry->d_lock);
1435
di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
1436
spin_unlock(&dentry->d_lock);
1437
1438
if (err == -EJUKEBOX) {
1439
try_async = false;
1440
ceph_mdsc_put_request(req);
1441
goto retry;
1442
}
1443
}
1444
} else {
1445
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1446
err = ceph_mdsc_do_request(mdsc, dir, req);
1447
if (!err && !req->r_reply_info.head->is_dentry)
1448
d_delete(dentry);
1449
}
1450
1451
ceph_mdsc_put_request(req);
1452
out:
1453
return err;
1454
}
1455
1456
static int ceph_rename(struct mnt_idmap *idmap, struct inode *old_dir,
1457
struct dentry *old_dentry, struct inode *new_dir,
1458
struct dentry *new_dentry, unsigned int flags)
1459
{
1460
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(old_dir->i_sb);
1461
struct ceph_client *cl = mdsc->fsc->client;
1462
struct ceph_mds_request *req;
1463
int op = CEPH_MDS_OP_RENAME;
1464
int err;
1465
1466
if (flags)
1467
return -EINVAL;
1468
1469
if (ceph_snap(old_dir) != ceph_snap(new_dir))
1470
return -EXDEV;
1471
if (ceph_snap(old_dir) != CEPH_NOSNAP) {
1472
if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR)
1473
op = CEPH_MDS_OP_RENAMESNAP;
1474
else
1475
return -EROFS;
1476
}
1477
/* don't allow cross-quota renames */
1478
if ((old_dir != new_dir) &&
1479
(!ceph_quota_is_same_realm(old_dir, new_dir)))
1480
return -EXDEV;
1481
1482
err = ceph_wait_on_conflict_unlink(new_dentry);
1483
if (err)
1484
return err;
1485
1486
err = fscrypt_prepare_rename(old_dir, old_dentry, new_dir, new_dentry,
1487
flags);
1488
if (err)
1489
return err;
1490
1491
doutc(cl, "%llx.%llx/'%pd' to %llx.%llx/'%pd'\n",
1492
ceph_vinop(old_dir), old_dentry, ceph_vinop(new_dir),
1493
new_dentry);
1494
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
1495
if (IS_ERR(req))
1496
return PTR_ERR(req);
1497
ihold(old_dir);
1498
req->r_dentry = dget(new_dentry);
1499
req->r_num_caps = 2;
1500
req->r_old_dentry = dget(old_dentry);
1501
req->r_old_dentry_dir = old_dir;
1502
req->r_parent = new_dir;
1503
ihold(new_dir);
1504
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1505
req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_XATTR_EXCL;
1506
req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
1507
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_XATTR_EXCL;
1508
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1509
/* release LINK_RDCACHE on source inode (mds will lock it) */
1510
req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
1511
if (d_really_is_positive(new_dentry)) {
1512
req->r_inode_drop =
1513
ceph_drop_caps_for_unlink(d_inode(new_dentry));
1514
}
1515
err = ceph_mdsc_do_request(mdsc, old_dir, req);
1516
if (!err && !req->r_reply_info.head->is_dentry) {
1517
/*
1518
* Normally d_move() is done by fill_trace (called by
1519
* do_request, above). If there is no trace, we need
1520
* to do it here.
1521
*/
1522
d_move(old_dentry, new_dentry);
1523
}
1524
ceph_mdsc_put_request(req);
1525
return err;
1526
}
1527
1528
/*
1529
* Move dentry to tail of mdsc->dentry_leases list when lease is updated.
1530
* Leases at front of the list will expire first. (Assume all leases have
1531
* similar duration)
1532
*
1533
* Called under dentry->d_lock.
1534
*/
1535
void __ceph_dentry_lease_touch(struct ceph_dentry_info *di)
1536
{
1537
struct dentry *dn = di->dentry;
1538
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(dn->d_sb)->mdsc;
1539
struct ceph_client *cl = mdsc->fsc->client;
1540
1541
doutc(cl, "%p %p '%pd'\n", di, dn, dn);
1542
1543
di->flags |= CEPH_DENTRY_LEASE_LIST;
1544
if (di->flags & CEPH_DENTRY_SHRINK_LIST) {
1545
di->flags |= CEPH_DENTRY_REFERENCED;
1546
return;
1547
}
1548
1549
spin_lock(&mdsc->dentry_list_lock);
1550
list_move_tail(&di->lease_list, &mdsc->dentry_leases);
1551
spin_unlock(&mdsc->dentry_list_lock);
1552
}
1553
1554
static void __dentry_dir_lease_touch(struct ceph_mds_client* mdsc,
1555
struct ceph_dentry_info *di)
1556
{
1557
di->flags &= ~(CEPH_DENTRY_LEASE_LIST | CEPH_DENTRY_REFERENCED);
1558
di->lease_gen = 0;
1559
di->time = jiffies;
1560
list_move_tail(&di->lease_list, &mdsc->dentry_dir_leases);
1561
}
1562
1563
/*
1564
* When dir lease is used, add dentry to tail of mdsc->dentry_dir_leases
1565
* list if it's not in the list, otherwise set 'referenced' flag.
1566
*
1567
* Called under dentry->d_lock.
1568
*/
1569
void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di)
1570
{
1571
struct dentry *dn = di->dentry;
1572
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(dn->d_sb)->mdsc;
1573
struct ceph_client *cl = mdsc->fsc->client;
1574
1575
doutc(cl, "%p %p '%pd' (offset 0x%llx)\n", di, dn, dn, di->offset);
1576
1577
if (!list_empty(&di->lease_list)) {
1578
if (di->flags & CEPH_DENTRY_LEASE_LIST) {
1579
/* don't remove dentry from dentry lease list
1580
* if its lease is valid */
1581
if (__dentry_lease_is_valid(di))
1582
return;
1583
} else {
1584
di->flags |= CEPH_DENTRY_REFERENCED;
1585
return;
1586
}
1587
}
1588
1589
if (di->flags & CEPH_DENTRY_SHRINK_LIST) {
1590
di->flags |= CEPH_DENTRY_REFERENCED;
1591
di->flags &= ~CEPH_DENTRY_LEASE_LIST;
1592
return;
1593
}
1594
1595
spin_lock(&mdsc->dentry_list_lock);
1596
__dentry_dir_lease_touch(mdsc, di);
1597
spin_unlock(&mdsc->dentry_list_lock);
1598
}
1599
1600
static void __dentry_lease_unlist(struct ceph_dentry_info *di)
1601
{
1602
struct ceph_mds_client *mdsc;
1603
if (di->flags & CEPH_DENTRY_SHRINK_LIST)
1604
return;
1605
if (list_empty(&di->lease_list))
1606
return;
1607
1608
mdsc = ceph_sb_to_fs_client(di->dentry->d_sb)->mdsc;
1609
spin_lock(&mdsc->dentry_list_lock);
1610
list_del_init(&di->lease_list);
1611
spin_unlock(&mdsc->dentry_list_lock);
1612
}
1613
1614
enum {
1615
KEEP = 0,
1616
DELETE = 1,
1617
TOUCH = 2,
1618
STOP = 4,
1619
};
1620
1621
struct ceph_lease_walk_control {
1622
bool dir_lease;
1623
bool expire_dir_lease;
1624
unsigned long nr_to_scan;
1625
unsigned long dir_lease_ttl;
1626
};
1627
1628
static int __dir_lease_check(const struct dentry *, struct ceph_lease_walk_control *);
1629
static int __dentry_lease_check(const struct dentry *);
1630
1631
static unsigned long
1632
__dentry_leases_walk(struct ceph_mds_client *mdsc,
1633
struct ceph_lease_walk_control *lwc)
1634
{
1635
struct ceph_dentry_info *di, *tmp;
1636
struct dentry *dentry, *last = NULL;
1637
struct list_head* list;
1638
LIST_HEAD(dispose);
1639
unsigned long freed = 0;
1640
int ret = 0;
1641
1642
list = lwc->dir_lease ? &mdsc->dentry_dir_leases : &mdsc->dentry_leases;
1643
spin_lock(&mdsc->dentry_list_lock);
1644
list_for_each_entry_safe(di, tmp, list, lease_list) {
1645
if (!lwc->nr_to_scan)
1646
break;
1647
--lwc->nr_to_scan;
1648
1649
dentry = di->dentry;
1650
if (last == dentry)
1651
break;
1652
1653
if (!spin_trylock(&dentry->d_lock))
1654
continue;
1655
1656
if (__lockref_is_dead(&dentry->d_lockref)) {
1657
list_del_init(&di->lease_list);
1658
goto next;
1659
}
1660
1661
if (lwc->dir_lease)
1662
ret = __dir_lease_check(dentry, lwc);
1663
else
1664
ret = __dentry_lease_check(dentry);
1665
if (ret & TOUCH) {
1666
/* move it into tail of dir lease list */
1667
__dentry_dir_lease_touch(mdsc, di);
1668
if (!last)
1669
last = dentry;
1670
}
1671
if (ret & DELETE) {
1672
/* stale lease */
1673
di->flags &= ~CEPH_DENTRY_REFERENCED;
1674
if (dentry->d_lockref.count > 0) {
1675
/* update_dentry_lease() will re-add
1676
* it to lease list, or
1677
* ceph_d_delete() will return 1 when
1678
* last reference is dropped */
1679
list_del_init(&di->lease_list);
1680
} else {
1681
di->flags |= CEPH_DENTRY_SHRINK_LIST;
1682
list_move_tail(&di->lease_list, &dispose);
1683
dget_dlock(dentry);
1684
}
1685
}
1686
next:
1687
spin_unlock(&dentry->d_lock);
1688
if (ret & STOP)
1689
break;
1690
}
1691
spin_unlock(&mdsc->dentry_list_lock);
1692
1693
while (!list_empty(&dispose)) {
1694
di = list_first_entry(&dispose, struct ceph_dentry_info,
1695
lease_list);
1696
dentry = di->dentry;
1697
spin_lock(&dentry->d_lock);
1698
1699
list_del_init(&di->lease_list);
1700
di->flags &= ~CEPH_DENTRY_SHRINK_LIST;
1701
if (di->flags & CEPH_DENTRY_REFERENCED) {
1702
spin_lock(&mdsc->dentry_list_lock);
1703
if (di->flags & CEPH_DENTRY_LEASE_LIST) {
1704
list_add_tail(&di->lease_list,
1705
&mdsc->dentry_leases);
1706
} else {
1707
__dentry_dir_lease_touch(mdsc, di);
1708
}
1709
spin_unlock(&mdsc->dentry_list_lock);
1710
} else {
1711
freed++;
1712
}
1713
1714
spin_unlock(&dentry->d_lock);
1715
/* ceph_d_delete() does the trick */
1716
dput(dentry);
1717
}
1718
return freed;
1719
}
1720
1721
static int __dentry_lease_check(const struct dentry *dentry)
1722
{
1723
struct ceph_dentry_info *di = ceph_dentry(dentry);
1724
int ret;
1725
1726
if (__dentry_lease_is_valid(di))
1727
return STOP;
1728
ret = __dir_lease_try_check(dentry);
1729
if (ret == -EBUSY)
1730
return KEEP;
1731
if (ret > 0)
1732
return TOUCH;
1733
return DELETE;
1734
}
1735
1736
static int __dir_lease_check(const struct dentry *dentry,
1737
struct ceph_lease_walk_control *lwc)
1738
{
1739
struct ceph_dentry_info *di = ceph_dentry(dentry);
1740
1741
int ret = __dir_lease_try_check(dentry);
1742
if (ret == -EBUSY)
1743
return KEEP;
1744
if (ret > 0) {
1745
if (time_before(jiffies, di->time + lwc->dir_lease_ttl))
1746
return STOP;
1747
/* Move dentry to tail of dir lease list if we don't want
1748
* to delete it. So dentries in the list are checked in a
1749
* round robin manner */
1750
if (!lwc->expire_dir_lease)
1751
return TOUCH;
1752
if (dentry->d_lockref.count > 0 ||
1753
(di->flags & CEPH_DENTRY_REFERENCED))
1754
return TOUCH;
1755
/* invalidate dir lease */
1756
di->lease_shared_gen = 0;
1757
}
1758
return DELETE;
1759
}
1760
1761
int ceph_trim_dentries(struct ceph_mds_client *mdsc)
1762
{
1763
struct ceph_lease_walk_control lwc;
1764
unsigned long count;
1765
unsigned long freed;
1766
1767
spin_lock(&mdsc->caps_list_lock);
1768
if (mdsc->caps_use_max > 0 &&
1769
mdsc->caps_use_count > mdsc->caps_use_max)
1770
count = mdsc->caps_use_count - mdsc->caps_use_max;
1771
else
1772
count = 0;
1773
spin_unlock(&mdsc->caps_list_lock);
1774
1775
lwc.dir_lease = false;
1776
lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE * 2;
1777
freed = __dentry_leases_walk(mdsc, &lwc);
1778
if (!lwc.nr_to_scan) /* more invalid leases */
1779
return -EAGAIN;
1780
1781
if (lwc.nr_to_scan < CEPH_CAPS_PER_RELEASE)
1782
lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE;
1783
1784
lwc.dir_lease = true;
1785
lwc.expire_dir_lease = freed < count;
1786
lwc.dir_lease_ttl = mdsc->fsc->mount_options->caps_wanted_delay_max * HZ;
1787
freed +=__dentry_leases_walk(mdsc, &lwc);
1788
if (!lwc.nr_to_scan) /* more to check */
1789
return -EAGAIN;
1790
1791
return freed > 0 ? 1 : 0;
1792
}
1793
1794
/*
1795
* Ensure a dentry lease will no longer revalidate.
1796
*/
1797
void ceph_invalidate_dentry_lease(struct dentry *dentry)
1798
{
1799
struct ceph_dentry_info *di = ceph_dentry(dentry);
1800
spin_lock(&dentry->d_lock);
1801
di->time = jiffies;
1802
di->lease_shared_gen = 0;
1803
di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
1804
__dentry_lease_unlist(di);
1805
spin_unlock(&dentry->d_lock);
1806
}
1807
1808
/*
1809
* Check if dentry lease is valid. If not, delete the lease. Try to
1810
* renew if the least is more than half up.
1811
*/
1812
static bool __dentry_lease_is_valid(struct ceph_dentry_info *di)
1813
{
1814
struct ceph_mds_session *session;
1815
1816
if (!di->lease_gen)
1817
return false;
1818
1819
session = di->lease_session;
1820
if (session) {
1821
u32 gen;
1822
unsigned long ttl;
1823
1824
gen = atomic_read(&session->s_cap_gen);
1825
ttl = session->s_cap_ttl;
1826
1827
if (di->lease_gen == gen &&
1828
time_before(jiffies, ttl) &&
1829
time_before(jiffies, di->time))
1830
return true;
1831
}
1832
di->lease_gen = 0;
1833
return false;
1834
}
1835
1836
static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags)
1837
{
1838
struct ceph_dentry_info *di;
1839
struct ceph_mds_session *session = NULL;
1840
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(dentry->d_sb)->mdsc;
1841
struct ceph_client *cl = mdsc->fsc->client;
1842
u32 seq = 0;
1843
int valid = 0;
1844
1845
spin_lock(&dentry->d_lock);
1846
di = ceph_dentry(dentry);
1847
if (di && __dentry_lease_is_valid(di)) {
1848
valid = 1;
1849
1850
if (di->lease_renew_after &&
1851
time_after(jiffies, di->lease_renew_after)) {
1852
/*
1853
* We should renew. If we're in RCU walk mode
1854
* though, we can't do that so just return
1855
* -ECHILD.
1856
*/
1857
if (flags & LOOKUP_RCU) {
1858
valid = -ECHILD;
1859
} else {
1860
session = ceph_get_mds_session(di->lease_session);
1861
seq = di->lease_seq;
1862
di->lease_renew_after = 0;
1863
di->lease_renew_from = jiffies;
1864
}
1865
}
1866
}
1867
spin_unlock(&dentry->d_lock);
1868
1869
if (session) {
1870
ceph_mdsc_lease_send_msg(session, dentry,
1871
CEPH_MDS_LEASE_RENEW, seq);
1872
ceph_put_mds_session(session);
1873
}
1874
doutc(cl, "dentry %p = %d\n", dentry, valid);
1875
return valid;
1876
}
1877
1878
/*
1879
* Called under dentry->d_lock.
1880
*/
1881
static int __dir_lease_try_check(const struct dentry *dentry)
1882
{
1883
struct ceph_dentry_info *di = ceph_dentry(dentry);
1884
struct inode *dir;
1885
struct ceph_inode_info *ci;
1886
int valid = 0;
1887
1888
if (!di->lease_shared_gen)
1889
return 0;
1890
if (IS_ROOT(dentry))
1891
return 0;
1892
1893
dir = d_inode(dentry->d_parent);
1894
ci = ceph_inode(dir);
1895
1896
if (spin_trylock(&ci->i_ceph_lock)) {
1897
if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen &&
1898
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 0))
1899
valid = 1;
1900
spin_unlock(&ci->i_ceph_lock);
1901
} else {
1902
valid = -EBUSY;
1903
}
1904
1905
if (!valid)
1906
di->lease_shared_gen = 0;
1907
return valid;
1908
}
1909
1910
/*
1911
* Check if directory-wide content lease/cap is valid.
1912
*/
1913
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry,
1914
struct ceph_mds_client *mdsc)
1915
{
1916
struct ceph_inode_info *ci = ceph_inode(dir);
1917
struct ceph_client *cl = mdsc->fsc->client;
1918
int valid;
1919
int shared_gen;
1920
1921
spin_lock(&ci->i_ceph_lock);
1922
valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
1923
if (valid) {
1924
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
1925
shared_gen = atomic_read(&ci->i_shared_gen);
1926
}
1927
spin_unlock(&ci->i_ceph_lock);
1928
if (valid) {
1929
struct ceph_dentry_info *di;
1930
spin_lock(&dentry->d_lock);
1931
di = ceph_dentry(dentry);
1932
if (dir == d_inode(dentry->d_parent) &&
1933
di && di->lease_shared_gen == shared_gen)
1934
__ceph_dentry_dir_lease_touch(di);
1935
else
1936
valid = 0;
1937
spin_unlock(&dentry->d_lock);
1938
}
1939
doutc(cl, "dir %p %llx.%llx v%u dentry %p '%pd' = %d\n", dir,
1940
ceph_vinop(dir), (unsigned)atomic_read(&ci->i_shared_gen),
1941
dentry, dentry, valid);
1942
return valid;
1943
}
1944
1945
/*
1946
* Check if cached dentry can be trusted.
1947
*/
1948
static int ceph_d_revalidate(struct inode *dir, const struct qstr *name,
1949
struct dentry *dentry, unsigned int flags)
1950
{
1951
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(dentry->d_sb)->mdsc;
1952
struct ceph_client *cl = mdsc->fsc->client;
1953
int valid = 0;
1954
struct inode *inode;
1955
1956
valid = fscrypt_d_revalidate(dir, name, dentry, flags);
1957
if (valid <= 0)
1958
return valid;
1959
1960
inode = d_inode_rcu(dentry);
1961
1962
doutc(cl, "%p '%pd' inode %p offset 0x%llx nokey %d\n",
1963
dentry, dentry, inode, ceph_dentry(dentry)->offset,
1964
!!(dentry->d_flags & DCACHE_NOKEY_NAME));
1965
1966
mdsc = ceph_sb_to_fs_client(dir->i_sb)->mdsc;
1967
1968
/* always trust cached snapped dentries, snapdir dentry */
1969
if (ceph_snap(dir) != CEPH_NOSNAP) {
1970
doutc(cl, "%p '%pd' inode %p is SNAPPED\n", dentry,
1971
dentry, inode);
1972
valid = 1;
1973
} else if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1974
valid = 1;
1975
} else {
1976
valid = dentry_lease_is_valid(dentry, flags);
1977
if (valid == -ECHILD)
1978
return valid;
1979
if (valid || dir_lease_is_valid(dir, dentry, mdsc)) {
1980
if (inode)
1981
valid = ceph_is_any_caps(inode);
1982
else
1983
valid = 1;
1984
}
1985
}
1986
1987
if (!valid) {
1988
struct ceph_mds_request *req;
1989
int op, err;
1990
u32 mask;
1991
1992
if (flags & LOOKUP_RCU)
1993
return -ECHILD;
1994
1995
percpu_counter_inc(&mdsc->metric.d_lease_mis);
1996
1997
op = ceph_snap(dir) == CEPH_SNAPDIR ?
1998
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
1999
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
2000
if (!IS_ERR(req)) {
2001
req->r_dentry = dget(dentry);
2002
req->r_num_caps = 2;
2003
req->r_parent = dir;
2004
ihold(dir);
2005
2006
req->r_dname = name;
2007
2008
mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
2009
if (ceph_security_xattr_wanted(dir))
2010
mask |= CEPH_CAP_XATTR_SHARED;
2011
req->r_args.getattr.mask = cpu_to_le32(mask);
2012
2013
err = ceph_mdsc_do_request(mdsc, NULL, req);
2014
switch (err) {
2015
case 0:
2016
if (d_really_is_positive(dentry) &&
2017
d_inode(dentry) == req->r_target_inode)
2018
valid = 1;
2019
break;
2020
case -ENOENT:
2021
if (d_really_is_negative(dentry))
2022
valid = 1;
2023
fallthrough;
2024
default:
2025
break;
2026
}
2027
ceph_mdsc_put_request(req);
2028
doutc(cl, "%p '%pd', lookup result=%d\n", dentry,
2029
dentry, err);
2030
}
2031
} else {
2032
percpu_counter_inc(&mdsc->metric.d_lease_hit);
2033
}
2034
2035
doutc(cl, "%p '%pd' %s\n", dentry, dentry, valid ? "valid" : "invalid");
2036
if (!valid)
2037
ceph_dir_clear_complete(dir);
2038
return valid;
2039
}
2040
2041
/*
2042
* Delete unused dentry that doesn't have valid lease
2043
*
2044
* Called under dentry->d_lock.
2045
*/
2046
static int ceph_d_delete(const struct dentry *dentry)
2047
{
2048
struct ceph_dentry_info *di;
2049
2050
/* won't release caps */
2051
if (d_really_is_negative(dentry))
2052
return 0;
2053
if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
2054
return 0;
2055
/* valid lease? */
2056
di = ceph_dentry(dentry);
2057
if (di) {
2058
if (__dentry_lease_is_valid(di))
2059
return 0;
2060
if (__dir_lease_try_check(dentry))
2061
return 0;
2062
}
2063
return 1;
2064
}
2065
2066
/*
2067
* Release our ceph_dentry_info.
2068
*/
2069
static void ceph_d_release(struct dentry *dentry)
2070
{
2071
struct ceph_dentry_info *di = ceph_dentry(dentry);
2072
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
2073
2074
doutc(fsc->client, "dentry %p '%pd'\n", dentry, dentry);
2075
2076
atomic64_dec(&fsc->mdsc->metric.total_dentries);
2077
2078
spin_lock(&dentry->d_lock);
2079
__dentry_lease_unlist(di);
2080
dentry->d_fsdata = NULL;
2081
spin_unlock(&dentry->d_lock);
2082
2083
ceph_put_mds_session(di->lease_session);
2084
kmem_cache_free(ceph_dentry_cachep, di);
2085
}
2086
2087
/*
2088
* When the VFS prunes a dentry from the cache, we need to clear the
2089
* complete flag on the parent directory.
2090
*
2091
* Called under dentry->d_lock.
2092
*/
2093
static void ceph_d_prune(struct dentry *dentry)
2094
{
2095
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dentry->d_sb);
2096
struct ceph_client *cl = mdsc->fsc->client;
2097
struct ceph_inode_info *dir_ci;
2098
struct ceph_dentry_info *di;
2099
2100
doutc(cl, "dentry %p '%pd'\n", dentry, dentry);
2101
2102
/* do we have a valid parent? */
2103
if (IS_ROOT(dentry))
2104
return;
2105
2106
/* we hold d_lock, so d_parent is stable */
2107
dir_ci = ceph_inode(d_inode(dentry->d_parent));
2108
if (dir_ci->i_vino.snap == CEPH_SNAPDIR)
2109
return;
2110
2111
/* who calls d_delete() should also disable dcache readdir */
2112
if (d_really_is_negative(dentry))
2113
return;
2114
2115
/* d_fsdata does not get cleared until d_release */
2116
if (!d_unhashed(dentry)) {
2117
__ceph_dir_clear_complete(dir_ci);
2118
return;
2119
}
2120
2121
/* Disable dcache readdir just in case that someone called d_drop()
2122
* or d_invalidate(), but MDS didn't revoke CEPH_CAP_FILE_SHARED
2123
* properly (dcache readdir is still enabled) */
2124
di = ceph_dentry(dentry);
2125
if (di->offset > 0 &&
2126
di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen))
2127
__ceph_dir_clear_ordered(dir_ci);
2128
}
2129
2130
/*
2131
* read() on a dir. This weird interface hack only works if mounted
2132
* with '-o dirstat'.
2133
*/
2134
static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
2135
loff_t *ppos)
2136
{
2137
struct ceph_dir_file_info *dfi = file->private_data;
2138
struct inode *inode = file_inode(file);
2139
struct ceph_inode_info *ci = ceph_inode(inode);
2140
int left;
2141
const int bufsize = 1024;
2142
2143
if (!ceph_test_mount_opt(ceph_sb_to_fs_client(inode->i_sb), DIRSTAT))
2144
return -EISDIR;
2145
2146
if (!dfi->dir_info) {
2147
dfi->dir_info = kmalloc(bufsize, GFP_KERNEL);
2148
if (!dfi->dir_info)
2149
return -ENOMEM;
2150
dfi->dir_info_len =
2151
snprintf(dfi->dir_info, bufsize,
2152
"entries: %20lld\n"
2153
" files: %20lld\n"
2154
" subdirs: %20lld\n"
2155
"rentries: %20lld\n"
2156
" rfiles: %20lld\n"
2157
" rsubdirs: %20lld\n"
2158
"rbytes: %20lld\n"
2159
"rctime: %10lld.%09ld\n",
2160
ci->i_files + ci->i_subdirs,
2161
ci->i_files,
2162
ci->i_subdirs,
2163
ci->i_rfiles + ci->i_rsubdirs,
2164
ci->i_rfiles,
2165
ci->i_rsubdirs,
2166
ci->i_rbytes,
2167
ci->i_rctime.tv_sec,
2168
ci->i_rctime.tv_nsec);
2169
}
2170
2171
if (*ppos >= dfi->dir_info_len)
2172
return 0;
2173
size = min_t(unsigned, size, dfi->dir_info_len-*ppos);
2174
left = copy_to_user(buf, dfi->dir_info + *ppos, size);
2175
if (left == size)
2176
return -EFAULT;
2177
*ppos += (size - left);
2178
return size - left;
2179
}
2180
2181
2182
2183
/*
2184
* Return name hash for a given dentry. This is dependent on
2185
* the parent directory's hash function.
2186
*/
2187
unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
2188
{
2189
struct ceph_inode_info *dci = ceph_inode(dir);
2190
unsigned hash;
2191
2192
switch (dci->i_dir_layout.dl_dir_hash) {
2193
case 0: /* for backward compat */
2194
case CEPH_STR_HASH_LINUX:
2195
return dn->d_name.hash;
2196
2197
default:
2198
spin_lock(&dn->d_lock);
2199
hash = ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
2200
dn->d_name.name, dn->d_name.len);
2201
spin_unlock(&dn->d_lock);
2202
return hash;
2203
}
2204
}
2205
2206
WRAP_DIR_ITER(ceph_readdir) // FIXME!
2207
const struct file_operations ceph_dir_fops = {
2208
.read = ceph_read_dir,
2209
.iterate_shared = shared_ceph_readdir,
2210
.llseek = ceph_dir_llseek,
2211
.open = ceph_open,
2212
.release = ceph_release,
2213
.unlocked_ioctl = ceph_ioctl,
2214
.compat_ioctl = compat_ptr_ioctl,
2215
.fsync = ceph_fsync,
2216
.lock = ceph_lock,
2217
.flock = ceph_flock,
2218
};
2219
2220
const struct file_operations ceph_snapdir_fops = {
2221
.iterate_shared = shared_ceph_readdir,
2222
.llseek = ceph_dir_llseek,
2223
.open = ceph_open,
2224
.release = ceph_release,
2225
};
2226
2227
const struct inode_operations ceph_dir_iops = {
2228
.lookup = ceph_lookup,
2229
.permission = ceph_permission,
2230
.getattr = ceph_getattr,
2231
.setattr = ceph_setattr,
2232
.listxattr = ceph_listxattr,
2233
.get_inode_acl = ceph_get_acl,
2234
.set_acl = ceph_set_acl,
2235
.mknod = ceph_mknod,
2236
.symlink = ceph_symlink,
2237
.mkdir = ceph_mkdir,
2238
.link = ceph_link,
2239
.unlink = ceph_unlink,
2240
.rmdir = ceph_unlink,
2241
.rename = ceph_rename,
2242
.create = ceph_create,
2243
.atomic_open = ceph_atomic_open,
2244
};
2245
2246
const struct inode_operations ceph_snapdir_iops = {
2247
.lookup = ceph_lookup,
2248
.permission = ceph_permission,
2249
.getattr = ceph_getattr,
2250
.mkdir = ceph_mkdir,
2251
.rmdir = ceph_unlink,
2252
.rename = ceph_rename,
2253
};
2254
2255
const struct dentry_operations ceph_dentry_ops = {
2256
.d_revalidate = ceph_d_revalidate,
2257
.d_delete = ceph_d_delete,
2258
.d_release = ceph_d_release,
2259
.d_prune = ceph_d_prune,
2260
.d_init = ceph_d_init,
2261
};
2262
2263