Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
52868 views
1
/*
2
* Permission is hereby granted, free of charge, to any person obtaining a copy
3
* of this software and associated documentation files (the "Software"), to deal
4
* in the Software without restriction, including without limitation the rights
5
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6
* copies of the Software, and to permit persons to whom the Software is
7
* furnished to do so, subject to the following conditions:
8
*
9
* The above copyright notice and this permission notice shall be included in
10
* all copies or substantial portions of the Software.
11
*
12
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
18
* THE SOFTWARE.
19
*/
20
21
/**
22
* Thread message API test
23
*/
24
25
#include "libavutil/avassert.h"
26
#include "libavutil/avstring.h"
27
#include "libavutil/frame.h"
28
#include "libavutil/threadmessage.h"
29
#include "libavutil/thread.h" // not public
30
31
struct sender_data {
32
int id;
33
pthread_t tid;
34
int workload;
35
AVThreadMessageQueue *queue;
36
};
37
38
/* same as sender_data but shuffled for testing purpose */
39
struct receiver_data {
40
pthread_t tid;
41
int workload;
42
int id;
43
AVThreadMessageQueue *queue;
44
};
45
46
struct message {
47
AVFrame *frame;
48
// we add some junk in the message to make sure the message size is >
49
// sizeof(void*)
50
int magic;
51
};
52
53
#define MAGIC 0xdeadc0de
54
55
static void free_frame(void *arg)
56
{
57
struct message *msg = arg;
58
av_assert0(msg->magic == MAGIC);
59
av_frame_free(&msg->frame);
60
}
61
62
static void *sender_thread(void *arg)
63
{
64
int i, ret = 0;
65
struct sender_data *wd = arg;
66
67
av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
68
for (i = 0; i < wd->workload; i++) {
69
if (rand() % wd->workload < wd->workload / 10) {
70
av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
71
av_thread_message_flush(wd->queue);
72
} else {
73
char *val;
74
AVDictionary *meta = NULL;
75
struct message msg = {
76
.magic = MAGIC,
77
.frame = av_frame_alloc(),
78
};
79
80
if (!msg.frame) {
81
ret = AVERROR(ENOMEM);
82
break;
83
}
84
85
/* we add some metadata to identify the frames */
86
val = av_asprintf("frame %d/%d from sender %d",
87
i + 1, wd->workload, wd->id);
88
if (!val) {
89
av_frame_free(&msg.frame);
90
ret = AVERROR(ENOMEM);
91
break;
92
}
93
ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
94
if (ret < 0) {
95
av_frame_free(&msg.frame);
96
break;
97
}
98
av_frame_set_metadata(msg.frame, meta);
99
100
/* allocate a real frame in order to simulate "real" work */
101
msg.frame->format = AV_PIX_FMT_RGBA;
102
msg.frame->width = 320;
103
msg.frame->height = 240;
104
ret = av_frame_get_buffer(msg.frame, 32);
105
if (ret < 0) {
106
av_frame_free(&msg.frame);
107
break;
108
}
109
110
/* push the frame in the common queue */
111
av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
112
wd->id, i + 1, wd->workload, msg.frame);
113
ret = av_thread_message_queue_send(wd->queue, &msg, 0);
114
if (ret < 0) {
115
av_frame_free(&msg.frame);
116
break;
117
}
118
}
119
}
120
av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
121
wd->id, av_err2str(ret));
122
av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
123
return NULL;
124
}
125
126
static void *receiver_thread(void *arg)
127
{
128
int i, ret = 0;
129
struct receiver_data *rd = arg;
130
131
for (i = 0; i < rd->workload; i++) {
132
if (rand() % rd->workload < rd->workload / 10) {
133
av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue\n", rd->id);
134
av_thread_message_flush(rd->queue);
135
} else {
136
struct message msg;
137
AVDictionary *meta;
138
AVDictionaryEntry *e;
139
140
ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
141
if (ret < 0)
142
break;
143
av_assert0(msg.magic == MAGIC);
144
meta = av_frame_get_metadata(msg.frame);
145
e = av_dict_get(meta, "sig", NULL, 0);
146
av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
147
av_frame_free(&msg.frame);
148
}
149
}
150
151
av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
152
av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
153
154
return NULL;
155
}
156
157
/**
 * Pick a pseudo-random workload between two bounds.
 *
 * The value is drawn with rand() from the half-open range [low, high), where
 * low and high are the smaller and the larger of the two arguments. When both
 * bounds are equal, that value is returned without consuming a random number.
 *
 * @param minv lower bound (inclusive)
 * @param maxv upper bound (exclusive unless equal to minv)
 * @return a workload within the bounds
 */
static int get_workload(int minv, int maxv)
{
    /* Bounds given in reverse order would make the modulus negative and push
     * the result above both of them; put them back in order first. */
    if (maxv < minv) {
        const int tmp = minv;

        minv = maxv;
        maxv = tmp;
    }

    return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
}
161
162
int main(int ac, char **av)
163
{
164
int i, ret = 0;
165
int max_queue_size;
166
int nb_senders, sender_min_load, sender_max_load;
167
int nb_receivers, receiver_min_load, receiver_max_load;
168
struct sender_data *senders;
169
struct receiver_data *receivers;
170
AVThreadMessageQueue *queue = NULL;
171
172
if (ac != 8) {
173
av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
174
"<nb_senders> <sender_min_send> <sender_max_send> "
175
"<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
176
return 1;
177
}
178
179
max_queue_size = atoi(av[1]);
180
nb_senders = atoi(av[2]);
181
sender_min_load = atoi(av[3]);
182
sender_max_load = atoi(av[4]);
183
nb_receivers = atoi(av[5]);
184
receiver_min_load = atoi(av[6]);
185
receiver_max_load = atoi(av[7]);
186
187
if (max_queue_size <= 0 ||
188
nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
189
nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
190
av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
191
return 1;
192
}
193
194
av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
195
"%d receivers receiving [%d-%d]\n", max_queue_size,
196
nb_senders, sender_min_load, sender_max_load,
197
nb_receivers, receiver_min_load, receiver_max_load);
198
199
senders = av_mallocz_array(nb_senders, sizeof(*senders));
200
receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));
201
if (!senders || !receivers) {
202
ret = AVERROR(ENOMEM);
203
goto end;
204
}
205
206
ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
207
if (ret < 0)
208
goto end;
209
210
av_thread_message_queue_set_free_func(queue, free_frame);
211
212
#define SPAWN_THREADS(type) do { \
213
for (i = 0; i < nb_##type##s; i++) { \
214
struct type##_data *td = &type##s[i]; \
215
\
216
td->id = i; \
217
td->queue = queue; \
218
td->workload = get_workload(type##_min_load, type##_max_load); \
219
\
220
ret = pthread_create(&td->tid, NULL, type##_thread, td); \
221
if (ret) { \
222
const int err = AVERROR(ret); \
223
av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
224
" thread: %s\n", av_err2str(err)); \
225
goto end; \
226
} \
227
} \
228
} while (0)
229
230
#define WAIT_THREADS(type) do { \
231
for (i = 0; i < nb_##type##s; i++) { \
232
struct type##_data *td = &type##s[i]; \
233
\
234
ret = pthread_join(td->tid, NULL); \
235
if (ret) { \
236
const int err = AVERROR(ret); \
237
av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
238
" thread: %s\n", av_err2str(err)); \
239
goto end; \
240
} \
241
} \
242
} while (0)
243
244
SPAWN_THREADS(receiver);
245
SPAWN_THREADS(sender);
246
247
WAIT_THREADS(sender);
248
WAIT_THREADS(receiver);
249
250
end:
251
av_thread_message_queue_free(&queue);
252
av_freep(&senders);
253
av_freep(&receivers);
254
255
if (ret < 0 && ret != AVERROR_EOF) {
256
av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
257
return 1;
258
}
259
return 0;
260
}
261
262