Ansel 0.0
A darktable fork - bloat + design vision
Loading...
Searching...
No Matches
jobs.c
Go to the documentation of this file.
1/*
2 This file is part of darktable,
3 Copyright (C) 2009-2010 johannes hanika.
4 Copyright (C) 2010 Alexandre Prokoudine.
5 Copyright (C) 2010 Henrik Andersson.
6 Copyright (C) 2014-2017 Tobias Ellinghaus.
7 Copyright (C) 2015-2017 Roman Lebedev.
8 Copyright (C) 2020-2021 Hanno Schwalm.
9 Copyright (C) 2020 Pascal Obry.
10 Copyright (C) 2022, 2025-2026 Aurélien PIERRE.
11 Copyright (C) 2022 Martin Bařinka.
12
13 darktable is free software: you can redistribute it and/or modify
14 it under the terms of the GNU General Public License as published by
15 the Free Software Foundation, either version 3 of the License, or
16 (at your option) any later version.
17
18 darktable is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 GNU General Public License for more details.
22
23 You should have received a copy of the GNU General Public License
24 along with darktable. If not, see <http://www.gnu.org/licenses/>.
25*/
26
27#include "common/darktable.h"
28#include "control/jobs.h"
29#include "control/control.h"
30
31#define DT_CONTROL_FG_PRIORITY 4
32
33// The thumbtable allows at most 840 thumbs at once.
34// Once an order to recompute has been dispatched, thumbnails are not drawn
35// until it finishes and we get the final buffer.
36// If jobs are flushed from the queue before completion,
37// those thumbnails will never be redrawn.
38#define DT_CONTROL_MAX_JOBS 840
39
40/* The queue can hold scheduled jobs while all
41 the workers are sleeping, so this kicks the workers
42 at a timed interval.
43*/
49
71
78static inline int dt_control_job_equal(_dt_job_t *j1, _dt_job_t *j2)
79{
80 if(IS_NULL_PTR(j1) || IS_NULL_PTR(j2)) return 0;
81 if(j1->params_size != 0 && j1->params_size == j2->params_size)
82 return (j1->execute == j2->execute && j1->state_changed_cb == j2->state_changed_cb
83 && j1->queue == j2->queue && (memcmp(j1->params, j2->params, j1->params_size) == 0));
84 return (j1->execute == j2->execute && j1->state_changed_cb == j2->state_changed_cb && j1->queue == j2->queue
85 && (g_strcmp0(j1->description, j2->description) == 0));
86}
87
89{
90 if(IS_NULL_PTR(job)) return;
93 {
95 job->progress = NULL;
96 }
97 job->state = state;
98 /* pass state change to callback */
99 if(job->state_changed_cb) job->state_changed_cb(job, state);
101}
102
111
113{
115 job->params = params;
116 job->params_size = 0;
117 job->params_destroy = callback;
118}
119
122{
124 job->params = params;
126 job->params_destroy = callback;
127}
128
130{
131 if(IS_NULL_PTR(job)) return NULL;
132 return job->params;
133}
134
136{
137 _dt_job_t *job = (_dt_job_t *)calloc(1, sizeof(_dt_job_t));
138 if(IS_NULL_PTR(job)) return NULL;
139
140 va_list ap;
141 va_start(ap, msg);
142 vsnprintf(job->description, DT_CONTROL_DESCRIPTION_LEN, msg, ap);
143 va_end(ap);
144
145 job->execute = execute;
147
150 return job;
151}
152
164
166{
167 // once the job got added to the queue it may not be changed from the outside
169 return; // get_state returns DISPOSED when IS_NULL_PTR(job)
170 job->state_changed_cb = cb;
171}
172
173
175{
176 if(IS_NULL_PTR(job)) return;
177 dt_print(DT_DEBUG_CONTROL, "%s | queue: %d | priority: %d", job->description, job->queue, job->priority);
178}
179
184
186{
187 if(IS_NULL_PTR(job)) return;
189
190 // NOTE: could also use signals.
191
192 /* if job execution is not finished let's wait for it */
194 {
195 // once the job finishes, it unlocks the mutex
196 // so by locking the mutex here, we will only get the lock once the job
197 // has finished and unlocked it.
199 // yay, the job finished, we got the lock. nothing more to do.
201 }
202}
203
/**
 * Take the pending job, if any, out of reserved slot `res` and execute it.
 *
 * @param control  control structure holding the job_res[] / new_res[] slots
 * @param res      reserved slot index; must be below DT_CTL_WORKER_RESERVED
 * @return -1 when `res` is out of range or the slot held no new job,
 *         0 after a job was taken from the slot.
 *
 * NOTE(review): this listing skips several source lines (209, 216, 220-221,
 * 224-225, 228, 231, 233-234, 236-237), so the locking, the state changes and
 * whatever happens to the job afterwards are not visible here -- confirm
 * against the complete file.
 */
204static int32_t dt_control_run_job_res(dt_control_t *control, int32_t res)
205{
 // the unsigned cast makes negative indices fail the range check as well
206 if(((unsigned int)res) >= DT_CTL_WORKER_RESERVED) return -1;
207
208 _dt_job_t *job = NULL;
 // only pick the slot's job up when it was flagged as new
210 if(control->new_res[res])
211 {
212 job = control->job_res[res];
213 control->job_res[res] = NULL; // this job belongs to us now, the queue may not touch it any longer
214 }
 // the flag is cleared whether or not a job was picked up
215 control->new_res[res] = 0;
217 if(IS_NULL_PTR(job)) return -1;
218
219 /* change state to running */
 // NOTE(review): the statement that opens this braced block (lines 220-221)
 // is missing from the listing.
222 {
223 dt_print(DT_DEBUG_CONTROL, "[run_job+] %02d %f ", res, dt_get_wtime());
226
227 /* execute job */
 // the callback's return value is kept in the job itself
229 job->result = job->execute(job);
230
232 dt_print(DT_DEBUG_CONTROL, "[run_job-] %02d %f ", res, dt_get_wtime());
235 }
238 return 0;
239}
240
242{
243 /*
244 * job scheduling works like this:
245 * - when there is a single job in the queue head with a maximal priority -> pick it
246 * - otherwise pick among the ones with the maximal priority in the following order:
247 * * user foreground
248 * * system foreground
249 * * user background
250 * * system background
251 * - the jobs that didn't get picked this round get their priority incremented
252 */
253
255
256 // find the job
257 _dt_job_t *job = NULL;
258 int winner_queue = DT_JOB_QUEUE_MAX;
259 int max_priority = -1;
260 for(int i = 0; i < DT_JOB_QUEUE_MAX; i++)
261 {
262 if(control->queues[i] == NULL) continue;
263 if(control->export_scheduled && i == DT_JOB_QUEUE_USER_EXPORT) continue;
264 _dt_job_t *_job = (_dt_job_t *)control->queues[i]->data;
265 if(_job->priority > max_priority)
266 {
267 max_priority = _job->priority;
268 job = _job;
269 winner_queue = i;
270 }
271 }
272
273 if(IS_NULL_PTR(job))
274 {
276 return NULL;
277 }
278
279 // the order of the queues in control->queues matches our priority, and we only update job when the priority
280 // is strictly bigger
281 // invariant -> job is the one we are looking for
282
283 // remove the to be scheduled job from its queue
284 GList **queue = &control->queues[winner_queue];
285 *queue = g_list_delete_link(*queue, *queue);
286 control->queue_length[winner_queue]--;
287 if(winner_queue == DT_JOB_QUEUE_USER_EXPORT) control->export_scheduled = TRUE;
288
289 // and place it in scheduled job array (for job deduping)
290 control->job[dt_control_get_threadid()] = job;
291
292 // increment the priorities of the others
293 for(int i = 0; i < DT_JOB_QUEUE_MAX; i++)
294 {
295 if(i == winner_queue || control->queues[i] == NULL) continue;
296 ((_dt_job_t *)control->queues[i]->data)->priority++;
297 }
298
300
301 return job;
302}
303
305{
307 dt_get_wtime());
310
312
313 /* execute job */
314 job->result = job->execute(job);
315
317
319 dt_get_wtime());
322}
323
/**
 * Ask dt_control_schedule_job() for the next job and process it on the
 * calling worker thread.
 *
 * @param control  control structure owning the queues and the job[] array
 * @return -1 when the scheduler handed out no job, 0 otherwise.
 *
 * NOTE(review): most of the body (lines 331-333, 335, 338, 340-341, 344) is
 * missing from this listing; the remaining comments suggest a state change,
 * the execution and the disposal of the job happen there -- confirm against
 * the complete file.
 */
324static int32_t dt_control_run_job(dt_control_t *control)
325{
326 _dt_job_t *job = dt_control_schedule_job(control);
327
328 if(IS_NULL_PTR(job)) return -1;
329
330 /* change state to running */
334
336
337 // remove the job from scheduled job array (for job deduping)
 // the entry is indexed by the calling thread's id
339 control->job[dt_control_get_threadid()] = NULL;
342
343 // and free it
345
346 return 0;
347}
348
/**
 * Hand a job to reserved slot `res`.
 *
 * A job still sitting in that slot is disposed of first; the new job is then
 * stored, the slot is flagged as new and control->cond is broadcast.
 *
 * @param control  control structure holding the job_res[] / new_res[] slots
 * @param job      job to store in the slot; NULL is rejected
 * @param res      reserved slot index; must be below DT_CTL_WORKER_RESERVED
 * @return 1 when `res` is out of range or `job` is NULL, 0 otherwise.
 *
 * NOTE(review): lines 353, 358, 363, 368-369, 371, 375, 377 and 379 are
 * missing from this listing; locking and any handling of the rejected job
 * are presumably there -- confirm against the complete file.
 */
349int32_t dt_control_add_job_res(dt_control_t *control, _dt_job_t *job, int32_t res)
350{
 // the unsigned cast makes negative indices fail the range check as well
351 if(((unsigned int)res) >= DT_CTL_WORKER_RESERVED || IS_NULL_PTR(job))
352 {
354 return 1;
355 }
356
357 // TODO: pthread cancel and restart in tough cases?
359
360 // if there is a job in the queue we have to discard that first
361 if(control->job_res[res])
362 {
364 dt_control_job_dispose(control->job_res[res]);
365 }
366
367 dt_print(DT_DEBUG_CONTROL, "[add_job_res] %d | ", res);
370
 // publish the job and flag the slot so dt_control_run_job_res() picks it up
372 control->job_res[res] = job;
373 control->new_res[res] = 1;
374
376
 // wake every thread waiting on the condition variable
378 pthread_cond_broadcast(&control->cond);
380
381 return 0;
382}
383
385{
387
388 int count = 0;
389
390 for(int k = 0; k < control->num_threads; k++)
391 {
392 _dt_job_t *job = (_dt_job_t *)control->job[k];
393 if(!IS_NULL_PTR(job))
394 {
396 count++;
397 }
398 }
399
400 dt_print(DT_DEBUG_CONTROL, "[jobs] flushed %i pending jobs from queue %i\n", count, queue_id);
401
403}
404
406{
407 if(((unsigned int)queue_id) >= DT_JOB_QUEUE_MAX || IS_NULL_PTR(job))
408 {
410 return 1;
411 }
412
413 if(!control->running)
414 {
415 // whatever we are adding here won't be scheduled as the system isn't running. execute it synchronous instead.
416 dt_pthread_mutex_lock(&job->wait_mutex); // is that even needed?
419
421 return 0;
422 }
423
424 job->queue = queue_id;
425
426 _dt_job_t *job_for_disposal = NULL;
427
429
430 GList **queue = &control->queues[queue_id];
431 size_t length = control->queue_length[queue_id];
432
433 dt_print(DT_DEBUG_CONTROL, "[add_job] %" G_GSIZE_FORMAT " | ", length);
436
437 if(queue_id == DT_JOB_QUEUE_SYSTEM_FG)
438 {
439 // this is a stack with limited size and bubble up and all that stuff
441
442 // check if we have already scheduled the job
443 for(int k = 0; k < control->num_threads; k++)
444 {
445 _dt_job_t *other_job = (_dt_job_t *)control->job[k];
446 if(dt_control_job_equal(job, other_job))
447 {
448 dt_print(DT_DEBUG_CONTROL, "[add_job] found job already in scheduled: ");
449 dt_control_job_print(other_job);
451
453
456
457 return 0; // there can't be any further copy
458 }
459 }
460
461 // if the job is already in the queue -> move it to the top
462 for(GList *iter = *queue; iter; iter = g_list_next(iter))
463 {
464 _dt_job_t *other_job = (_dt_job_t *)iter->data;
465 if(dt_control_job_equal(job, other_job))
466 {
467 dt_print(DT_DEBUG_CONTROL, "[add_job] found job already in queue: ");
468 dt_control_job_print(other_job);
470
471 *queue = g_list_delete_link(*queue, iter);
472 length--;
473
474 job_for_disposal = job;
475
476 job = other_job;
477 break; // there can't be any further copy in the list
478 }
479 }
480
481 // now we can add the new job to the list
482 *queue = g_list_prepend(*queue, job);
483 length++;
484
485 // and take care of the maximal queue size
486 if(length > DT_CONTROL_MAX_JOBS)
487 {
488 GList *last = g_list_last(*queue);
490 dt_control_job_dispose((_dt_job_t *)last->data);
491 *queue = g_list_delete_link(*queue, last);
492 length--;
493 }
494
495 control->queue_length[queue_id] = length;
496 }
497 else
498 {
499 // the rest are FIFOs
500 if(queue_id == DT_JOB_QUEUE_USER_BG ||
501 queue_id == DT_JOB_QUEUE_USER_EXPORT ||
502 queue_id == DT_JOB_QUEUE_SYSTEM_BG)
503 job->priority = 0;
504 else
506 *queue = g_list_append(*queue, job);
507 control->queue_length[queue_id]++;
508 }
511
512 // notify workers
514 pthread_cond_broadcast(&control->cond);
516
517 // dispose of dropped job, if any
519 dt_control_job_dispose(job_for_disposal);
520
521 return 0;
522}
523
// Per-thread worker index. It stays at -1 on threads that never assign it;
// dt_control_work() and dt_control_work_res() store the threadid of their
// start parameters here.
524static __thread int threadid = -1;
525
527{
528 if(threadid > -1) return threadid;
530}
531
533{
534 if(threadid > -1) return threadid;
536}
537
/**
 * Thread entry point of a reserved worker.
 *
 * Copies the control pointer and the thread id out of its start parameters,
 * releases them with dt_free(), then keeps calling dt_control_run_job_res()
 * for its slot for as long as dt_control_running() holds.
 *
 * @param ptr  start parameters
 * @return always NULL.
 *
 * NOTE(review): lines 543, 548 and 559-561 are missing from this listing --
 * the declaration of `params`, the use of `name` and the actual wait between
 * the two pthread_setcancelstate() calls are presumably there; confirm
 * against the complete file.
 */
538static void *dt_control_work_res(void *ptr)
539{
540#ifdef _OPENMP // need to do this in every thread
541 omp_set_num_threads(darktable.num_openmp_threads);
542#endif
544 dt_control_t *s = params->self;
 // remember this worker's index in the thread-local variable
545 threadid = params->threadid;
546 char name[16] = {0};
547 snprintf(name, sizeof(name), "worker res %d", threadid);
 // everything needed was copied out of the start parameters above
549 dt_free(params);
550 int32_t threadid_res = dt_control_get_threadid_res();
551 while(dt_control_running())
552 {
553 // dt_print(DT_DEBUG_CONTROL, "[control_work] %d\n", threadid_res);
 // a negative result means there was nothing to run for this slot
554 if(dt_control_run_job_res(s, threadid_res) < 0)
555 {
556 // wait for a new job.
557 int old;
 // cancellation is switched off here and the previous state restored below
558 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
562 int tmp;
563 pthread_setcancelstate(old, &tmp);
564 }
565 }
566 return NULL;
567}
568
/**
 * Thread entry point of the "kicker": for as long as dt_control_running()
 * holds, it calls sleep(2) and then broadcasts control->cond so that waiting
 * workers get woken up.
 *
 * @param ptr  the dt_control_t to operate on
 * @return always NULL.
 *
 * NOTE(review): lines 576 and 578 around the broadcast are missing from this
 * listing (presumably a lock/unlock pair) -- confirm against the complete file.
 */
569static void *dt_control_worker_kicker(void *ptr)
570{
571 dt_control_t *control = (dt_control_t *)ptr;
572 dt_pthread_setname("kicker");
573 while(dt_control_running())
574 {
575 sleep(2);
577 pthread_cond_broadcast(&control->cond);
579 }
580 return NULL;
581}
582
/**
 * Thread entry point of a regular worker.
 *
 * Copies the control pointer and the thread id out of its start parameters,
 * releases them with dt_free(), then keeps calling dt_control_run_job() for
 * as long as dt_control_running() holds. Whenever no job could be run, it
 * waits on control->cond (paired with control->cond_mutex).
 *
 * @param ptr  start parameters
 * @return always NULL.
 *
 * NOTE(review): lines 588, 593, 602 and 604 are missing from this listing --
 * the declaration of `params`, the use of `name` and presumably the
 * lock/unlock of cond_mutex around the wait; confirm against the complete file.
 */
583static void *dt_control_work(void *ptr)
584{
585#ifdef _OPENMP // need to do this in every thread
586 omp_set_num_threads(darktable.num_openmp_threads);
587#endif
589 dt_control_t *control = params->self;
 // remember this worker's index in the thread-local variable
590 threadid = params->threadid;
591 char name[16] = {0};
592 snprintf(name, sizeof(name), "worker %d", threadid);
 // everything needed was copied out of the start parameters above
594 dt_free(params);
595 // int32_t threadid = dt_control_get_threadid();
596 while(dt_control_running())
597 {
598 // dt_print(DT_DEBUG_CONTROL, "[control_work] %d\n", threadid);
 // a negative result means no job was handed out this round
599 if(dt_control_run_job(control) < 0)
600 {
601 // wait for a new job.
603 dt_pthread_cond_wait(&control->cond, &control->cond_mutex);
605 }
606 }
607 return NULL;
608}
609
610// convenience functions to have a progress bar for the job.
611// this allows to show the gui indicator of the job even before it got scheduled
/**
 * Attach a progress indicator to a job.
 *
 * @param job          job to decorate; a NULL job is ignored
 * @param message      text for the indicator
 * @param cancellable  selects the extra step guarded by `if(cancellable)`
 *
 * NOTE(review): lines 615 and 617 are missing from this listing, so how
 * `message` is used and what the cancellable branch does cannot be seen
 * here -- confirm against the complete file.
 */
612void dt_control_job_add_progress(dt_job_t *job, const char *message, gboolean cancellable)
613{
614 if(IS_NULL_PTR(job)) return;
616 if(cancellable)
618}
619
/**
 * Update the message of a job's progress indicator.
 *
 * Does nothing when `job` is NULL or has no progress object attached.
 *
 * @param job      job whose progress message should change
 * @param message  new text
 *
 * NOTE(review): line 623, which presumably forwards `message` to the
 * progress object, is missing from this listing -- confirm against the
 * complete file.
 */
620void dt_control_job_set_progress_message(dt_job_t *job, const char *message)
621{
622 if(IS_NULL_PTR(job) || !job->progress) return;
624}
625
631
633{
634 if(IS_NULL_PTR(job) || !job->progress) return -1.0;
636}
637
638
639// moved out of control.c to be able to make some helper functions static
641{
642 // start threads
643 control->num_threads = dt_worker_threads();
644 control->thread = (pthread_t *)calloc(control->num_threads, sizeof(pthread_t));
645 control->job = (dt_job_t **)calloc(control->num_threads, sizeof(dt_job_t *));
647 control->running = 1;
649 for(int k = 0; k < control->num_threads; k++)
650 {
653 params->self = control;
654 params->threadid = k;
655 dt_pthread_create(&control->thread[k], dt_control_work, params, FALSE);
656 }
657
658 /* create queue kicker thread */
660
661 for(int k = 0; k < DT_CTL_WORKER_RESERVED; k++)
662 {
663 control->job_res[k] = NULL;
664 control->new_res[k] = 0;
667 params->self = control;
668 params->threadid = k;
670 }
671}
672
674{
675 // Cancel all non-user-export jobs remaining
676 for(int i = 0; i < DT_JOB_QUEUE_MAX; i++)
677 {
678 if(control->queues[i] == NULL) continue;
679 if(control->export_scheduled && i == DT_JOB_QUEUE_USER_EXPORT) continue;
680 _dt_job_t *job = (_dt_job_t *)control->queues[i]->data;
682 }
683
684 dt_free(control->job);
685 dt_free(control->thread);
686}
687
688// clang-format off
689// modelines: These editor modelines have been set for all relevant files by tools/update_modelines.py
690// vim: shiftwidth=2 expandtab tabstop=2 cindent
691// kate: tab-indents: off; indent-width 2; replace-tabs on; indent-mode cstyle; remove-trailing-spaces modified;
692// clang-format on
#define TRUE
Definition ashift_lsd.c:162
#define FALSE
Definition ashift_lsd.c:158
size_t params_size(dt_imageio_module_format_t *self)
Definition avif.c:565
char * name
int dt_control_running()
Definition control.c:423
int dt_worker_threads()
Definition darktable.c:1677
darktable_t darktable
Definition darktable.c:181
void dt_print(dt_debug_thread_t thread, const char *msg,...)
Definition darktable.c:1542
@ DT_DEBUG_CONTROL
Definition darktable.h:716
#define dt_free(ptr)
Definition darktable.h:456
static const dt_aligned_pixel_simd_t value
Definition darktable.h:577
static double dt_get_wtime(void)
Definition darktable.h:914
#define IS_NULL_PTR(p)
C is way too permissive with !=, == and if(var) checks, which can mean too many things depending on w...
Definition darktable.h:281
int dt_pthread_create(pthread_t *thread, void *(*start_routine)(void *), void *arg, const gboolean realtime)
Definition dtpthread.c:45
void dt_pthread_setname(const char *name)
Definition dtpthread.c:118
static int dt_pthread_mutex_unlock(dt_pthread_mutex_t *mutex) RELEASE(mutex) NO_THREAD_SAFETY_ANALYSIS
Definition dtpthread.h:374
static int dt_pthread_mutex_init(dt_pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
Definition dtpthread.h:359
static int dt_pthread_mutex_destroy(dt_pthread_mutex_t *mutex)
Definition dtpthread.h:379
static int dt_pthread_cond_wait(pthread_cond_t *cond, dt_pthread_mutex_t *mutex)
Definition dtpthread.h:384
static int dt_pthread_mutex_lock(dt_pthread_mutex_t *mutex) ACQUIRE(mutex) NO_THREAD_SAFETY_ANALYSIS
Definition dtpthread.h:364
static void dt_control_job_execute(_dt_job_t *job)
Definition jobs.c:304
void dt_control_flush_jobs_queue(dt_control_t *control, dt_job_queue_t queue_id)
Definition jobs.c:384
static _dt_job_t * dt_control_schedule_job(dt_control_t *control)
Definition jobs.c:241
static int32_t dt_control_run_job(dt_control_t *control)
Definition jobs.c:324
static int32_t dt_control_run_job_res(dt_control_t *control, int32_t res)
Definition jobs.c:204
static void dt_control_job_set_state(_dt_job_t *job, dt_job_state_t state)
Definition jobs.c:88
dt_job_state_t dt_control_job_get_state(_dt_job_t *job)
Definition jobs.c:103
void dt_control_job_cancel(_dt_job_t *job)
Definition jobs.c:180
static void * dt_control_work(void *ptr)
Definition jobs.c:583
dt_job_t * dt_control_job_create(dt_job_execute_callback execute, const char *msg,...)
Definition jobs.c:135
int dt_control_add_job(dt_control_t *control, dt_job_queue_t queue_id, _dt_job_t *job)
Definition jobs.c:405
int32_t dt_control_add_job_res(dt_control_t *control, _dt_job_t *job, int32_t res)
Definition jobs.c:349
void * dt_control_job_get_params(const _dt_job_t *job)
Definition jobs.c:129
double dt_control_job_get_progress(dt_job_t *job)
Definition jobs.c:632
static void dt_control_job_print(_dt_job_t *job)
Definition jobs.c:174
void dt_control_job_set_progress(dt_job_t *job, double value)
Definition jobs.c:626
void dt_control_jobs_cleanup(dt_control_t *control)
Definition jobs.c:673
void dt_control_job_add_progress(dt_job_t *job, const char *message, gboolean cancellable)
Definition jobs.c:612
void dt_control_job_set_progress_message(dt_job_t *job, const char *message)
Definition jobs.c:620
#define DT_CONTROL_FG_PRIORITY
Definition jobs.c:31
void dt_control_job_set_params_with_size(dt_job_t *job, void *params, size_t params_size, dt_job_destroy_callback callback)
Definition jobs.c:120
void dt_control_job_wait(_dt_job_t *job)
Definition jobs.c:185
void dt_control_job_set_state_callback(_dt_job_t *job, dt_job_state_change_callback cb)
Definition jobs.c:165
int32_t dt_control_get_threadid()
Definition jobs.c:526
static void * dt_control_work_res(void *ptr)
Definition jobs.c:538
void dt_control_jobs_init(dt_control_t *control)
Definition jobs.c:640
void dt_control_job_set_params(_dt_job_t *job, void *params, dt_job_destroy_callback callback)
Definition jobs.c:112
static void * dt_control_worker_kicker(void *ptr)
Definition jobs.c:569
static int dt_control_job_equal(_dt_job_t *j1, _dt_job_t *j2)
Definition jobs.c:78
#define DT_CONTROL_MAX_JOBS
Definition jobs.c:38
static __thread int threadid
Definition jobs.c:524
void dt_control_job_dispose(_dt_job_t *job)
Definition jobs.c:153
static int32_t dt_control_get_threadid_res()
Definition jobs.c:532
void(* dt_job_state_change_callback)(dt_job_t *, dt_job_state_t state)
Definition jobs.h:64
#define DT_CONTROL_DESCRIPTION_LEN
Definition jobs.h:35
dt_job_queue_t
Definition jobs.h:52
@ DT_JOB_QUEUE_MAX
Definition jobs.h:58
@ DT_JOB_QUEUE_USER_BG
Definition jobs.h:55
@ DT_JOB_QUEUE_USER_EXPORT
Definition jobs.h:56
@ DT_JOB_QUEUE_SYSTEM_BG
Definition jobs.h:57
@ DT_JOB_QUEUE_SYSTEM_FG
Definition jobs.h:54
int32_t(* dt_job_execute_callback)(dt_job_t *)
Definition jobs.h:63
void(* dt_job_destroy_callback)(void *data)
Definition jobs.h:65
#define DT_CTL_WORKER_RESERVED
Definition jobs.h:37
dt_job_state_t
Definition jobs.h:41
@ DT_JOB_STATE_DISCARDED
Definition jobs.h:47
@ DT_JOB_STATE_INITIALIZED
Definition jobs.h:42
@ DT_JOB_STATE_DISPOSED
Definition jobs.h:48
@ DT_JOB_STATE_RUNNING
Definition jobs.h:44
@ DT_JOB_STATE_CANCELLED
Definition jobs.h:46
@ DT_JOB_STATE_FINISHED
Definition jobs.h:45
@ DT_JOB_STATE_QUEUED
Definition jobs.h:43
float *const restrict const size_t k
void dt_control_progress_set_progress(dt_control_t *control, dt_progress_t *progress, double value)
Definition progress.c:355
void dt_control_progress_set_message(dt_control_t *control, dt_progress_t *progress, const char *message)
Definition progress.c:389
double dt_control_progress_get_progress(dt_progress_t *progress)
Definition progress.c:373
dt_progress_t * dt_control_progress_create(dt_control_t *control, gboolean has_progress_bar, const gchar *message)
Definition progress.c:262
void dt_control_progress_destroy(dt_control_t *control, dt_progress_t *progress)
Definition progress.c:290
void dt_control_progress_attach_job(dt_control_t *control, dt_progress_t *progress, dt_job_t *job)
Definition progress.c:333
const float uint32_t state[4]
dt_job_state_t state
Definition jobs.c:61
dt_job_queue_t queue
Definition jobs.c:63
dt_progress_t * progress
Definition jobs.c:67
dt_pthread_mutex_t state_mutex
Definition jobs.c:58
size_t params_size
Definition jobs.c:54
dt_job_destroy_callback params_destroy
Definition jobs.c:55
unsigned char priority
Definition jobs.c:62
dt_job_state_change_callback state_changed_cb
Definition jobs.c:65
int32_t result
Definition jobs.c:56
void * params
Definition jobs.c:53
dt_pthread_mutex_t wait_mutex
Definition jobs.c:59
char description[DT_CONTROL_DESCRIPTION_LEN]
Definition jobs.c:69
dt_job_execute_callback execute
Definition jobs.c:52
int32_t num_openmp_threads
Definition darktable.h:758
struct dt_control_t * control
Definition darktable.h:773
pthread_t * thread
Definition control.h:259
dt_pthread_mutex_t run_mutex
Definition control.h:256
int32_t num_threads
Definition control.h:258
int32_t running
Definition control.h:254
GList * queues[DT_JOB_QUEUE_MAX]
Definition control.h:262
gboolean export_scheduled
Definition control.h:255
pthread_t thread_res[DT_CTL_WORKER_RESERVED]
Definition control.h:268
pthread_cond_t cond
Definition control.h:257
dt_pthread_mutex_t res_mutex
Definition control.h:265
pthread_t kick_on_workers_thread
Definition control.h:259
dt_pthread_mutex_t queue_mutex
Definition control.h:256
uint8_t new_res[DT_CTL_WORKER_RESERVED]
Definition control.h:267
dt_job_t * job_res[DT_CTL_WORKER_RESERVED]
Definition control.h:266
dt_pthread_mutex_t cond_mutex
Definition control.h:256
size_t queue_length[DT_JOB_QUEUE_MAX]
Definition control.h:263
dt_job_t ** job
Definition control.h:260
dt_control_t * self
Definition jobs.c:46
#define sleep(n)
Definition win.h:37