Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
52868 views
1
/*
2
* Input async protocol.
3
* Copyright (c) 2015 Zhang Rui <[email protected]>
4
*
5
* This file is part of FFmpeg.
6
*
7
* FFmpeg is free software; you can redistribute it and/or
8
* modify it under the terms of the GNU Lesser General Public
9
* License as published by the Free Software Foundation; either
10
* version 2.1 of the License, or (at your option) any later version.
11
*
12
* FFmpeg is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
16
*
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with FFmpeg; if not, write to the Free Software
19
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
*
21
* Based on libavformat/cache.c by Michael Niedermayer
22
*/
23
24
/**
25
* @TODO
26
* support timeout
27
* support working with concatdec and hls
28
*/
29
30
#include "libavutil/avassert.h"
31
#include "libavutil/avstring.h"
32
#include "libavutil/error.h"
33
#include "libavutil/fifo.h"
34
#include "libavutil/log.h"
35
#include "libavutil/opt.h"
36
#include "libavutil/thread.h"
37
#include "url.h"
38
#include <stdint.h>
39
40
#if HAVE_UNISTD_H
41
#include <unistd.h>
42
#endif
43
44
#define BUFFER_CAPACITY (4 * 1024 * 1024)
45
#define READ_BACK_CAPACITY (4 * 1024 * 1024)
46
#define SHORT_SEEK_THRESHOLD (256 * 1024)
47
48
typedef struct RingBuffer
49
{
50
AVFifoBuffer *fifo;
51
int read_back_capacity;
52
53
int read_pos;
54
} RingBuffer;
55
56
typedef struct Context {
57
AVClass *class;
58
URLContext *inner;
59
60
int seek_request;
61
int64_t seek_pos;
62
int seek_whence;
63
int seek_completed;
64
int64_t seek_ret;
65
66
int inner_io_error;
67
int io_error;
68
int io_eof_reached;
69
70
int64_t logical_pos;
71
int64_t logical_size;
72
RingBuffer ring;
73
74
pthread_cond_t cond_wakeup_main;
75
pthread_cond_t cond_wakeup_background;
76
pthread_mutex_t mutex;
77
pthread_t async_buffer_thread;
78
79
int abort_request;
80
AVIOInterruptCB interrupt_callback;
81
} Context;
82
83
/**
 * Prepare a ring: zero the structure and allocate a FIFO large enough for
 * capacity bytes of unread data plus read_back_capacity bytes of history.
 *
 * @return 0 on success, AVERROR(ENOMEM) if the FIFO cannot be allocated
 */
static int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity)
{
    AVFifoBuffer *fifo;

    memset(ring, 0, sizeof(*ring));

    fifo = av_fifo_alloc(capacity + read_back_capacity);
    if (!fifo)
        return AVERROR(ENOMEM);

    ring->fifo               = fifo;
    ring->read_back_capacity = read_back_capacity;

    return 0;
}
93
94
/** Release the FIFO owned by the ring; av_fifo_freep() also clears the pointer. */
static void ring_destroy(RingBuffer *ring)
{
    AVFifoBuffer **slot = &ring->fifo;

    av_fifo_freep(slot);
}
98
99
/** Discard everything buffered (unread data and read-back history). */
static void ring_reset(RingBuffer *ring)
{
    ring->read_pos = 0;
    av_fifo_reset(ring->fifo);
}
104
105
/** Number of bytes buffered after the read position, i.e. not yet consumed. */
static int ring_size(RingBuffer *ring)
{
    int stored = av_fifo_size(ring->fifo);

    return stored - ring->read_pos;
}
109
110
/** Number of bytes that can still be written into the underlying FIFO. */
static int ring_space(RingBuffer *ring)
{
    int free_bytes = av_fifo_space(ring->fifo);

    return free_bytes;
}
114
115
/**
 * Consume buf_size bytes starting at the read position.
 *
 * The data is peeked (not removed) from the FIFO through func/dest, then the
 * read position advances. Only the part of the consumed data that exceeds
 * read_back_capacity is actually drained from the FIFO.
 *
 * @return the value returned by av_fifo_generic_peek_at()
 */
static int ring_generic_read(RingBuffer *ring, void *dest, int buf_size, void (*func)(void*, void*, int))
{
    int peek_ret;
    int excess;

    av_assert2(buf_size <= ring_size(ring));

    peek_ret = av_fifo_generic_peek_at(ring->fifo, dest, ring->read_pos, buf_size, func);
    ring->read_pos += buf_size;

    /* keep at most read_back_capacity bytes of history behind the read position */
    excess = ring->read_pos - ring->read_back_capacity;
    if (excess > 0) {
        av_fifo_drain(ring->fifo, excess);
        ring->read_pos = ring->read_back_capacity;
    }

    return peek_ret;
}
130
131
/**
 * Append up to size bytes to the ring, obtaining them through func(src, ...).
 *
 * @return the value returned by av_fifo_generic_write()
 */
static int ring_generic_write(RingBuffer *ring, void *src, int size, int (*func)(void*, void*, int))
{
    int written;

    av_assert2(size <= ring_space(ring));

    written = av_fifo_generic_write(ring->fifo, src, size, func);
    return written;
}
136
137
/** Number of already-consumed bytes still held before the read position. */
static int ring_size_of_read_back(RingBuffer *ring)
{
    const int history = ring->read_pos;

    return history;
}
141
142
/**
 * Move the read position by offset bytes without copying any data.
 *
 * A negative offset steps back into the read-back history; a positive offset
 * skips unread data. The valid range is
 * [-ring_size_of_read_back(ring), ring_size(ring)].
 *
 * @return always 0
 */
static int ring_drain(RingBuffer *ring, int offset)
{
    av_assert2(offset >= -ring_size_of_read_back(ring));
    /* upper bound is the amount of unread data (the original compared against
     * its negation, which rejected valid backward moves) */
    av_assert2(offset <= ring_size(ring));
    ring->read_pos += offset;
    return 0;
}
149
150
static int async_check_interrupt(void *arg)
151
{
152
URLContext *h = arg;
153
Context *c = h->priv_data;
154
155
if (c->abort_request)
156
return 1;
157
158
if (ff_check_interrupt(&c->interrupt_callback))
159
c->abort_request = 1;
160
161
return c->abort_request;
162
}
163
164
static int wrapped_url_read(void *src, void *dst, int size)
165
{
166
URLContext *h = src;
167
Context *c = h->priv_data;
168
int ret;
169
170
ret = ffurl_read(c->inner, dst, size);
171
c->inner_io_error = ret < 0 ? ret : 0;
172
173
return ret;
174
}
175
176
static void *async_buffer_task(void *arg)
177
{
178
URLContext *h = arg;
179
Context *c = h->priv_data;
180
RingBuffer *ring = &c->ring;
181
int ret = 0;
182
int64_t seek_ret;
183
184
while (1) {
185
int fifo_space, to_copy;
186
187
pthread_mutex_lock(&c->mutex);
188
if (async_check_interrupt(h)) {
189
c->io_eof_reached = 1;
190
c->io_error = AVERROR_EXIT;
191
pthread_cond_signal(&c->cond_wakeup_main);
192
pthread_mutex_unlock(&c->mutex);
193
break;
194
}
195
196
if (c->seek_request) {
197
seek_ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
198
if (seek_ret >= 0) {
199
c->io_eof_reached = 0;
200
c->io_error = 0;
201
ring_reset(ring);
202
}
203
204
c->seek_completed = 1;
205
c->seek_ret = seek_ret;
206
c->seek_request = 0;
207
208
209
pthread_cond_signal(&c->cond_wakeup_main);
210
pthread_mutex_unlock(&c->mutex);
211
continue;
212
}
213
214
fifo_space = ring_space(ring);
215
if (c->io_eof_reached || fifo_space <= 0) {
216
pthread_cond_signal(&c->cond_wakeup_main);
217
pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
218
pthread_mutex_unlock(&c->mutex);
219
continue;
220
}
221
pthread_mutex_unlock(&c->mutex);
222
223
to_copy = FFMIN(4096, fifo_space);
224
ret = ring_generic_write(ring, (void *)h, to_copy, wrapped_url_read);
225
226
pthread_mutex_lock(&c->mutex);
227
if (ret <= 0) {
228
c->io_eof_reached = 1;
229
if (c->inner_io_error < 0)
230
c->io_error = c->inner_io_error;
231
}
232
233
pthread_cond_signal(&c->cond_wakeup_main);
234
pthread_mutex_unlock(&c->mutex);
235
}
236
237
return NULL;
238
}
239
240
/**
 * Open the async protocol on top of another URL.
 *
 * Strips an optional "async:" prefix from arg, allocates the ring buffer,
 * opens the inner URL with a wrapped interrupt callback, then creates the
 * mutex, both condition variables and the background fill thread.
 *
 * pthread_* functions report failure as a positive errno value; those are
 * converted with AVERROR() so that both the log message and the returned
 * code follow the negative-AVERROR convention.
 *
 * @return 0 on success, a negative AVERROR code on failure (everything that
 *         was set up before the failing step is torn down again)
 */
static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
{
    Context *c = h->priv_data;
    int ret;
    AVIOInterruptCB interrupt_callback = {.callback = async_check_interrupt, .opaque = h};

    av_strstart(arg, "async:", &arg);

    ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY);
    if (ret < 0)
        goto fifo_fail;

    /* wrap interrupt callback */
    c->interrupt_callback = h->interrupt_callback;
    ret = ffurl_open_whitelist(&c->inner, arg, flags, &interrupt_callback, options, h->protocol_whitelist);
    if (ret != 0) {
        av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", av_err2str(ret), arg);
        goto url_fail;
    }

    c->logical_size = ffurl_size(c->inner);
    h->is_streamed  = c->inner->is_streamed;

    ret = pthread_mutex_init(&c->mutex, NULL);
    if (ret != 0) {
        ret = AVERROR(ret);
        av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
        goto mutex_fail;
    }

    ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
    if (ret != 0) {
        ret = AVERROR(ret);
        av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
        goto cond_wakeup_main_fail;
    }

    ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
    if (ret != 0) {
        ret = AVERROR(ret);
        av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
        goto cond_wakeup_background_fail;
    }

    ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
    if (ret) {
        ret = AVERROR(ret);
        av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", av_err2str(ret));
        goto thread_fail;
    }

    return 0;

    /* unwind in reverse order of acquisition */
thread_fail:
    pthread_cond_destroy(&c->cond_wakeup_background);
cond_wakeup_background_fail:
    pthread_cond_destroy(&c->cond_wakeup_main);
cond_wakeup_main_fail:
    pthread_mutex_destroy(&c->mutex);
mutex_fail:
    ffurl_close(c->inner);
url_fail:
    ring_destroy(&c->ring);
fifo_fail:
    return ret;
}
302
303
/**
 * Shut the protocol down: request an abort, wake the background thread, wait
 * for it to exit, then destroy the synchronisation objects, close the inner
 * protocol and free the ring buffer.
 *
 * @return always 0 (a pthread_join() failure is only logged)
 */
static int async_close(URLContext *h)
{
    Context *c = h->priv_data;
    int ret;

    pthread_mutex_lock(&c->mutex);
    c->abort_request = 1;
    pthread_cond_signal(&c->cond_wakeup_background);
    pthread_mutex_unlock(&c->mutex);

    ret = pthread_join(c->async_buffer_thread, NULL);
    if (ret != 0) {
        /* pthread_join() returns a positive errno; av_err2str() wants AVERROR codes */
        av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", av_err2str(AVERROR(ret)));
    }

    pthread_cond_destroy(&c->cond_wakeup_background);
    pthread_cond_destroy(&c->cond_wakeup_main);
    pthread_mutex_destroy(&c->mutex);
    ffurl_close(c->inner);
    ring_destroy(&c->ring);

    return 0;
}
325
326
/**
 * Take up to size bytes out of the ring buffer, waiting for the background
 * thread when the ring is empty.
 *
 * @param dest          destination passed to the FIFO read; advanced between
 *                      chunks only when func is NULL
 * @param read_complete when zero, return as soon as any data was delivered;
 *                      when non-zero, keep going until size bytes were
 *                      consumed or EOF/error/abort stops the loop
 * @param func          optional FIFO copy callback (NULL selects the FIFO's
 *                      default behaviour)
 * @return number of bytes consumed; or, when nothing was consumed at EOF,
 *         the stored io_error or AVERROR_EOF; AVERROR_EXIT when interrupted
 */
static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
                               void (*func)(void*, void*, int))
{
    Context *c       = h->priv_data;
    RingBuffer *ring = &c->ring;
    int remaining    = size;
    int ret          = 0;

    pthread_mutex_lock(&c->mutex);

    while (remaining > 0) {
        int available, chunk;

        if (async_check_interrupt(h)) {
            ret = AVERROR_EXIT;
            break;
        }

        available = ring_size(ring);
        chunk     = FFMIN(remaining, available);

        if (chunk > 0) {
            ring_generic_read(ring, dest, chunk, func);
            if (!func)
                dest = (uint8_t *)dest + chunk;

            c->logical_pos += chunk;
            remaining      -= chunk;
            ret             = size - remaining;

            if (remaining <= 0 || !read_complete)
                break;
        } else if (c->io_eof_reached) {
            /* nothing buffered and nothing more will come */
            if (ret <= 0)
                ret = c->io_error ? c->io_error : AVERROR_EOF;
            break;
        }

        /* ask the worker for more data and sleep until it reports back */
        pthread_cond_signal(&c->cond_wakeup_background);
        pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
    }

    /* space may have been freed: let the worker continue filling */
    pthread_cond_signal(&c->cond_wakeup_background);
    pthread_mutex_unlock(&c->mutex);

    return ret;
}
372
373
/**
 * url_read callback: deliver buffered data into buf, returning as soon as
 * some data is available (partial reads are allowed).
 */
static int async_read(URLContext *h, unsigned char *buf, int size)
{
    const int read_complete = 0;    /* do not insist on filling the whole buffer */

    return async_read_internal(h, buf, size, read_complete, NULL);
}
377
378
/**
 * FIFO read callback that intentionally copies nothing. async_seek() passes
 * it to async_read_internal() to skip forward over buffered data.
 */
static void fifo_do_not_copy_func(void* dest, void* src, int size) {
    /* do not copy; the parameters are deliberately unused */
    (void)dest;
    (void)src;
    (void)size;
}
381
382
/**
 * url_seek callback.
 *
 * AVSEEK_SIZE returns the size recorded at open time. SEEK_SET/SEEK_CUR are
 * resolved to an absolute target; any other whence is rejected. A target
 * inside the read-back area, or less than SHORT_SEEK_THRESHOLD past the
 * buffered data, is served from the ring ("fast seek"). Everything else is
 * handed to the background thread as a seek request on the inner protocol.
 *
 * @return the resulting logical position (or the size for AVSEEK_SIZE),
 *         AVERROR(EINVAL) for unsupported/invalid requests, AVERROR_EXIT when
 *         interrupted, or the negative result of the inner seek
 */
static int64_t async_seek(URLContext *h, int64_t pos, int whence)
{
    Context *c       = h->priv_data;
    RingBuffer *ring = &c->ring;
    int64_t result;
    int64_t target;
    int ahead;      /* unread bytes currently buffered */
    int behind;     /* consumed bytes still available for stepping back */

    switch (whence) {
    case AVSEEK_SIZE:
        av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
        return c->logical_size;
    case SEEK_CUR:
        av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
        target = pos + c->logical_pos;
        break;
    case SEEK_SET:
        av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
        target = pos;
        break;
    default:
        return AVERROR(EINVAL);
    }

    if (target < 0)
        return AVERROR(EINVAL);

    ahead  = ring_size(ring);
    behind = ring_size_of_read_back(ring);

    /* current position */
    if (target == c->logical_pos)
        return c->logical_pos;

    if (target >= c->logical_pos - behind &&
        target <  c->logical_pos + ahead + SHORT_SEEK_THRESHOLD) {
        int pos_delta = (int)(target - c->logical_pos);

        /* fast seek */
        av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
               target, (int)c->logical_pos,
               (int)(target - c->logical_pos), ahead);

        if (pos_delta > 0) {
            /* forwards: consume (and discard) data until the target is reached */
            async_read_internal(h, NULL, pos_delta, 1, fifo_do_not_copy_func);
        } else {
            /* backwards: step the read position back into the history */
            ring_drain(ring, pos_delta);
            c->logical_pos = target;
        }

        return c->logical_pos;
    }

    /* can not seek (size unknown), or target beyond the end */
    if (c->logical_size <= 0 || target > c->logical_size)
        return AVERROR(EINVAL);

    pthread_mutex_lock(&c->mutex);

    /* post the request for the background thread */
    c->seek_request   = 1;
    c->seek_pos       = target;
    c->seek_whence    = SEEK_SET;
    c->seek_completed = 0;
    c->seek_ret       = 0;

    for (;;) {
        if (async_check_interrupt(h)) {
            result = AVERROR_EXIT;
            break;
        }
        if (c->seek_completed) {
            if (c->seek_ret >= 0)
                c->logical_pos = c->seek_ret;
            result = c->seek_ret;
            break;
        }
        pthread_cond_signal(&c->cond_wakeup_background);
        pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
    }

    pthread_mutex_unlock(&c->mutex);

    return result;
}
464
465
#define OFFSET(x) offsetof(Context, x)
466
#define D AV_OPT_FLAG_DECODING_PARAM
467
468
static const AVOption options[] = {
469
{NULL},
470
};
471
472
#undef D
473
#undef OFFSET
474
475
static const AVClass async_context_class = {
476
.class_name = "Async",
477
.item_name = av_default_item_name,
478
.option = options,
479
.version = LIBAVUTIL_VERSION_INT,
480
};
481
482
URLProtocol ff_async_protocol = {
483
.name = "async",
484
.url_open2 = async_open,
485
.url_read = async_read,
486
.url_seek = async_seek,
487
.url_close = async_close,
488
.priv_data_size = sizeof(Context),
489
.priv_data_class = &async_context_class,
490
};
491
492
#ifdef TEST
493
494
/* Seek target used by the self-test, and the size of its synthetic stream. */
#define TEST_SEEK_POS    (1536)
#define TEST_STREAM_SIZE (2048)
496
497
typedef struct TestContext {
498
AVClass *class;
499
int64_t logical_pos;
500
int64_t logical_size;
501
502
/* options */
503
int opt_read_error;
504
} TestContext;
505
506
/* Open the synthetic stream: start at offset 0 with a fixed size. */
static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
{
    TestContext *c = h->priv_data;

    c->logical_size = TEST_STREAM_SIZE;
    c->logical_pos  = 0;

    return 0;
}
513
514
/* Nothing to release for the synthetic stream; always succeeds. */
static int async_test_close(URLContext *h)
{
    return 0;
}
518
519
/*
 * Produce synthetic data: the byte at stream offset p is (p & 0xFF).
 *
 * Returns the configured read error if one was set through the options,
 * AVERROR_EOF once the position reached the stream size, otherwise the number
 * of bytes written to buf (stopping early at the end of the stream).
 */
static int async_test_read(URLContext *h, unsigned char *buf, int size)
{
    TestContext *c = h->priv_data;
    int produced   = 0;

    if (c->opt_read_error)
        return c->opt_read_error;

    if (c->logical_pos >= c->logical_size)
        return AVERROR_EOF;

    while (produced < size) {
        buf[produced] = c->logical_pos & 0xFF;
        produced++;
        c->logical_pos++;

        if (c->logical_pos >= c->logical_size)
            break;
    }

    return produced;
}
543
544
/*
 * Seek inside the synthetic stream.
 *
 * AVSEEK_SIZE reports the stream size; SEEK_CUR and SEEK_SET move the
 * position. Other whence values and negative targets yield AVERROR(EINVAL).
 */
static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
{
    TestContext *c = h->priv_data;
    int64_t target;

    switch (whence) {
    case AVSEEK_SIZE:
        return c->logical_size;
    case SEEK_CUR:
        target = pos + c->logical_pos;
        break;
    case SEEK_SET:
        target = pos;
        break;
    default:
        return AVERROR(EINVAL);
    }

    if (target < 0)
        return AVERROR(EINVAL);

    c->logical_pos = target;
    return target;
}
564
565
#define OFFSET(x) offsetof(TestContext, x)
566
#define D AV_OPT_FLAG_DECODING_PARAM
567
568
static const AVOption async_test_options[] = {
569
{ "async-test-read-error", "cause read fail",
570
OFFSET(opt_read_error), AV_OPT_TYPE_INT, { .i64 = 0 }, INT_MIN, INT_MAX, .flags = D },
571
{NULL},
572
};
573
574
#undef D
575
#undef OFFSET
576
577
static const AVClass async_test_context_class = {
578
.class_name = "Async-Test",
579
.item_name = av_default_item_name,
580
.option = async_test_options,
581
.version = LIBAVUTIL_VERSION_INT,
582
};
583
584
URLProtocol ff_async_test_protocol = {
585
.name = "async-test",
586
.url_open2 = async_test_open,
587
.url_read = async_test_read,
588
.url_seek = async_test_seek,
589
.url_close = async_test_close,
590
.priv_data_size = sizeof(TestContext),
591
.priv_data_class = &async_test_context_class,
592
};
593
594
int main(void)
595
{
596
URLContext *h = NULL;
597
int i;
598
int ret;
599
int64_t size;
600
int64_t pos;
601
int64_t read_len;
602
unsigned char buf[4096];
603
AVDictionary *opts = NULL;
604
605
ffurl_register_protocol(&ff_async_protocol);
606
ffurl_register_protocol(&ff_async_test_protocol);
607
608
/*
609
* test normal read
610
*/
611
ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
612
printf("open: %d\n", ret);
613
614
size = ffurl_size(h);
615
printf("size: %"PRId64"\n", size);
616
617
pos = ffurl_seek(h, 0, SEEK_CUR);
618
read_len = 0;
619
while (1) {
620
ret = ffurl_read(h, buf, sizeof(buf));
621
if (ret == AVERROR_EOF) {
622
printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
623
break;
624
}
625
else if (ret == 0)
626
break;
627
else if (ret < 0) {
628
printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
629
goto fail;
630
} else {
631
for (i = 0; i < ret; ++i) {
632
if (buf[i] != (pos & 0xFF)) {
633
printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
634
(int)buf[i], (int)(pos & 0xFF), pos);
635
break;
636
}
637
pos++;
638
}
639
}
640
641
read_len += ret;
642
}
643
printf("read: %"PRId64"\n", read_len);
644
645
/*
646
* test normal seek
647
*/
648
ret = ffurl_read(h, buf, 1);
649
printf("read: %d\n", ret);
650
651
pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
652
printf("seek: %"PRId64"\n", pos);
653
654
read_len = 0;
655
while (1) {
656
ret = ffurl_read(h, buf, sizeof(buf));
657
if (ret == AVERROR_EOF)
658
break;
659
else if (ret == 0)
660
break;
661
else if (ret < 0) {
662
printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
663
goto fail;
664
} else {
665
for (i = 0; i < ret; ++i) {
666
if (buf[i] != (pos & 0xFF)) {
667
printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
668
(int)buf[i], (int)(pos & 0xFF), pos);
669
break;
670
}
671
pos++;
672
}
673
}
674
675
read_len += ret;
676
}
677
printf("read: %"PRId64"\n", read_len);
678
679
ret = ffurl_read(h, buf, 1);
680
printf("read: %d\n", ret);
681
682
/*
683
* test read error
684
*/
685
ffurl_close(h);
686
av_dict_set_int(&opts, "async-test-read-error", -10000, 0);
687
ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, &opts);
688
printf("open: %d\n", ret);
689
690
ret = ffurl_read(h, buf, 1);
691
printf("read: %d\n", ret);
692
693
fail:
694
av_dict_free(&opts);
695
ffurl_close(h);
696
return 0;
697
}
698
699
#endif
700
701