FreeSWITCH API Documentation  1.7.0
switch_scheduler.c
Go to the documentation of this file.
1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Anthony Minessale II <anthm@freeswitch.org>
27  *
28  *
29  * switch_scheduler.c -- Switch Scheduler
30  *
31  */
32 
33 #include <switch.h>
34 
37  int64_t executed;
38  int in_thread;
39  int destroyed;
40  int running;
43  uint32_t flags;
44  char *desc;
46 };
48 
49 static struct {
52  uint32_t task_id;
56 } globals;
57 
59 {
60  switch_event_t *event;
61  //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Executing task %u %s (%s)\n", tp->task.task_id, tp->desc, switch_str_nil(tp->task.group));
62 
63  tp->func(&tp->task);
64 
65  if (tp->task.repeat) {
66  tp->task.runtime = switch_epoch_time_now(NULL) + tp->task.repeat;
67  }
68 
69  if (tp->task.runtime > tp->executed) {
70  tp->executed = 0;
72  switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tp->task.task_id);
76  switch_queue_push(globals.event_queue, event);
77  event = NULL;
78  }
79  } else {
80  tp->destroyed = 1;
81  }
82 }
83 
85 {
88 
89  pool = tp->pool;
90  tp->pool = NULL;
91 
94  tp->in_thread = 0;
95 
96  return NULL;
97 }
98 
99 static int task_thread_loop(int done)
100 {
101  switch_scheduler_task_container_t *tofree, *tp, *last = NULL;
102 
103 
104  switch_mutex_lock(globals.task_mutex);
105 
106  for (tp = globals.task_list; tp; tp = tp->next) {
107  if (done) {
108  tp->destroyed = 1;
109  } else if (!tp->destroyed) {
110  int64_t now = switch_epoch_time_now(NULL);
111  if (now >= tp->task.runtime && !tp->in_thread) {
112  int32_t diff = (int32_t) (now - tp->task.runtime);
113  if (diff > 1) {
114  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Task was executed late by %d seconds %u %s (%s)\n",
115  diff, tp->task.task_id, tp->desc, switch_str_nil(tp->task.group));
116  }
117  tp->executed = now;
120  switch_threadattr_t *thd_attr;
122  switch_threadattr_create(&thd_attr, tp->pool);
123  switch_threadattr_detach_set(thd_attr, 1);
124  tp->in_thread = 1;
125  switch_thread_create(&thread, thd_attr, task_own_thread, tp, tp->pool);
126  } else {
127  tp->running = 1;
128  switch_mutex_unlock(globals.task_mutex);
130  switch_mutex_lock(globals.task_mutex);
131  tp->running = 0;
132  }
133  }
134  }
135  }
136  switch_mutex_unlock(globals.task_mutex);
137  switch_mutex_lock(globals.task_mutex);
138  for (tp = globals.task_list; tp;) {
139  if (tp->destroyed && !tp->in_thread) {
140  switch_event_t *event;
141 
142  tofree = tp;
143  tp = tp->next;
144  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting task %u %s (%s)\n",
145  tofree->task.task_id, tofree->desc, switch_str_nil(tofree->task.group));
146 
147 
149  switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tofree->task.task_id);
150  switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tofree->desc);
152  switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-Runtime", "%" SWITCH_INT64_T_FMT, tofree->task.runtime);
153  switch_queue_push(globals.event_queue, event);
154  event = NULL;
155  }
156 
157  if (last) {
158  last->next = tofree->next;
159  } else {
160  globals.task_list = tofree->next;
161  }
162  switch_safe_free(tofree->task.group);
163  if (tofree->task.cmd_arg && switch_test_flag(tofree, SSHF_FREE_ARG)) {
164  free(tofree->task.cmd_arg);
165  }
166  switch_safe_free(tofree->desc);
167  free(tofree);
168  } else {
169  last = tp;
170  tp = tp->next;
171  }
172  }
173  switch_mutex_unlock(globals.task_mutex);
174 
175  return done;
176 }
177 
179 {
180  void *pop;
181  globals.task_thread_running = 1;
182 
183  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Starting task thread\n");
184  while (globals.task_thread_running == 1) {
185  if (task_thread_loop(0)) {
186  break;
187  }
188  if (switch_queue_pop_timeout(globals.event_queue, &pop, 500000) == SWITCH_STATUS_SUCCESS) {
189  switch_event_t *event = (switch_event_t *) pop;
190  switch_event_fire(&event);
191  }
192  }
193 
194  task_thread_loop(1);
195 
196  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Task thread ending\n");
197 
198  while(switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
199  switch_event_t *event = (switch_event_t *) pop;
200  switch_event_destroy(&event);
201  }
202 
203  globals.task_thread_running = 0;
204 
205  return NULL;
206 }
207 
208 SWITCH_DECLARE(uint32_t) switch_scheduler_add_task(time_t task_runtime,
210  const char *desc, const char *group, uint32_t cmd_id, void *cmd_arg, switch_scheduler_flag_t flags)
211 {
212  switch_scheduler_task_container_t *container, *tp;
213  switch_event_t *event;
215  switch_ssize_t hlen = -1;
216 
217  switch_mutex_lock(globals.task_mutex);
218  switch_zmalloc(container, sizeof(*container));
219  switch_assert(func);
220 
221  if (task_runtime < now) {
222  container->task.repeat = (uint32_t)task_runtime;
223  task_runtime += now;
224  }
225 
226  container->func = func;
227  container->task.created = now;
228  container->task.runtime = task_runtime;
229  container->task.group = strdup(group ? group : "none");
230  container->task.cmd_id = cmd_id;
231  container->task.cmd_arg = cmd_arg;
232  container->flags = flags;
233  container->desc = strdup(desc ? desc : "none");
234  container->task.hash = switch_ci_hashfunc_default(container->task.group, &hlen);
235 
236  for (tp = globals.task_list; tp && tp->next; tp = tp->next);
237 
238  if (tp) {
239  tp->next = container;
240  } else {
241  globals.task_list = container;
242  }
243 
244  for (container->task.task_id = 0; !container->task.task_id; container->task.task_id = ++globals.task_id);
245 
246  switch_mutex_unlock(globals.task_mutex);
247 
248  tp = container;
249  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Added task %u %s (%s) to run at %" SWITCH_INT64_T_FMT "\n",
250  tp->task.task_id, tp->desc, switch_str_nil(tp->task.group), tp->task.runtime);
251 
253  switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tp->task.task_id);
254  switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tp->desc);
257  switch_queue_push(globals.event_queue, event);
258  event = NULL;
259  }
260  return container->task.task_id;
261 }
262 
264 {
266  uint32_t delcnt = 0;
267 
268  switch_mutex_lock(globals.task_mutex);
269  for (tp = globals.task_list; tp; tp = tp->next) {
270  if (tp->task.task_id == task_id) {
271  if (switch_test_flag(tp, SSHF_NO_DEL)) {
272  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Attempt made to delete undeletable task #%u (group %s)\n",
273  tp->task.task_id, tp->task.group);
274  break;
275  }
276 
277  if (tp->running) {
278  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Attempt made to delete running task #%u (group %s)\n",
279  tp->task.task_id, tp->task.group);
280  break;
281  }
282 
283  tp->destroyed++;
284  delcnt++;
285  break;
286  }
287  }
288  switch_mutex_unlock(globals.task_mutex);
289 
290  return delcnt;
291 }
292 
294 {
296  uint32_t delcnt = 0;
297  switch_ssize_t hlen = -1;
298  unsigned long hash = 0;
299 
300  if (zstr(group)) {
301  return 0;
302  }
303 
304  hash = switch_ci_hashfunc_default(group, &hlen);
305 
306  switch_mutex_lock(globals.task_mutex);
307  for (tp = globals.task_list; tp; tp = tp->next) {
308  if (tp->destroyed) {
309  continue;
310  }
311  if (hash == tp->task.hash && !strcmp(tp->task.group, group)) {
312  if (switch_test_flag(tp, SSHF_NO_DEL)) {
313  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Attempt made to delete undeletable task #%u (group %s)\n",
314  tp->task.task_id, group);
315  continue;
316  }
317  tp->destroyed++;
318  delcnt++;
319  }
320  }
321  switch_mutex_unlock(globals.task_mutex);
322 
323  return delcnt;
324 }
325 
327 
329 {
330 
331  switch_threadattr_t *thd_attr;
332 
333  switch_core_new_memory_pool(&globals.memory_pool);
334  switch_threadattr_create(&thd_attr, globals.memory_pool);
335  switch_mutex_init(&globals.task_mutex, SWITCH_MUTEX_NESTED, globals.memory_pool);
336  switch_queue_create(&globals.event_queue, 250000, globals.memory_pool);
337 
339 }
340 
342 {
343  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping Task Thread\n");
344  if (globals.task_thread_running == 1) {
345  int sanity = 0;
346  switch_status_t st;
347 
348  globals.task_thread_running = -1;
349 
351 
352  while (globals.task_thread_running) {
353  switch_yield(100000);
354  if (++sanity > 10) {
355  break;
356  }
357  }
358  }
359 
361 
362 }
363 
364 /* For Emacs:
365  * Local Variables:
366  * mode:c
367  * indent-tabs-mode:t
368  * tab-width:4
369  * c-basic-offset:4
370  * End:
371  * For VIM:
372  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
373  */
struct apr_queue_t switch_queue_t
Definition: switch_apr.h:590
#define switch_event_fire(event)
Fire an event, filling in most of the arguments with obvious values.
Definition: switch_event.h:412
#define switch_core_new_memory_pool(p)
Create a new sub memory pool from the core's master pool.
Definition: switch_core.h:631
uint32_t switch_scheduler_del_task_group(const char *group)
Delete a scheduled task based on the group name.
#define SWITCH_THREAD_FUNC
#define SWITCH_CHANNEL_LOG
switch_scheduler_func_t func
static void *SWITCH_THREAD_FUNC task_own_thread(switch_thread_t *thread, void *obj)
static void switch_scheduler_execute(switch_scheduler_task_container_t *tp)
switch_status_t switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout)
Definition: switch_apr.c:1124
void switch_scheduler_task_thread_start(void)
Start the scheduler system.
#define switch_core_destroy_memory_pool(p)
Returns a subpool back to the main pool.
Definition: switch_core.h:640
switch_memory_pool_t * pool
switch_status_t switch_event_add_header(switch_event_t *event, switch_stack_t stack, const char *header_name, const char *fmt,...) PRINTF_FUNCTION(4
Add a header to an event.
Representation of an event.
Definition: switch_event.h:80
switch_status_t switch_queue_trypop(switch_queue_t *queue, void **data)
Definition: switch_apr.c:1140
unsigned int switch_ci_hashfunc_default(const char *char_key, switch_ssize_t *klen)
Definition: switch_apr.c:98
void switch_scheduler_task_thread_stop(void)
Stop the scheduler system.
uint32_t switch_scheduler_flag_t
Definition: switch_types.h:461
static switch_thread_t * thread
Definition: switch_log.c:279
switch_hash_t * hash
Definition: switch_event.c:74
uint32_t switch_scheduler_add_task(time_t task_runtime, switch_scheduler_func_t func, const char *desc, const char *group, uint32_t cmd_id, void *cmd_arg, switch_scheduler_flag_t flags)
Schedule a task in the future.
#define zstr(x)
Definition: switch_utils.h:281
switch_status_t switch_mutex_unlock(switch_mutex_t *lock)
Definition: switch_apr.c:290
switch_status_t switch_thread_join(switch_status_t *retval, switch_thread_t *thd)
Definition: switch_apr.c:1255
#define SWITCH_MUTEX_NESTED
Definition: switch_apr.h:318
switch_status_t switch_threadattr_detach_set(switch_threadattr_t *attr, int32_t on)
Definition: switch_apr.c:655
int64_t switch_time_t
Definition: switch_apr.h:188
#define switch_yield(ms)
Wait a desired number of microseconds and yield the CPU.
Definition: switch_utils.h:908
int task_thread_running
switch_status_t switch_mutex_lock(switch_mutex_t *lock)
Definition: switch_apr.c:285
intptr_t switch_ssize_t
uint32_t switch_scheduler_del_task_id(uint32_t task_id)
Delete a scheduled task.
switch_status_t switch_event_add_header_string(switch_event_t *event, switch_stack_t stack, const char *header_name, const char *data)
Add a string header to an event.
#define switch_zmalloc(ptr, len)
#define switch_safe_free(it)
Free a pointer and set it to NULL unless it already is NULL.
Definition: switch_utils.h:789
switch_status_t switch_mutex_init(switch_mutex_t **lock, unsigned int flags, switch_memory_pool_t *pool)
Definition: switch_apr.c:270
switch_memory_pool_t * memory_pool
void(* switch_scheduler_func_t)(switch_scheduler_task_t *task)
#define switch_str_nil(s)
Make a null string a blank string instead.
Definition: switch_utils.h:903
struct switch_scheduler_task_container * next
switch_thread_t * task_thread_p
struct apr_thread_mutex_t switch_mutex_t
Definition: switch_apr.h:314
switch_status_t
Common return values.
#define SWITCH_INT64_T_FMT
switch_queue_t * event_queue
struct apr_thread_t switch_thread_t
Definition: switch_apr.h:941
Main Library Header.
#define switch_event_create(event, id)
Create a new event, assuming it will not be a custom event and therefore hiding the unused parameters...
Definition: switch_event.h:383
#define SWITCH_DECLARE(type)
static void *SWITCH_THREAD_FUNC switch_scheduler_task_thread(switch_thread_t *thread, void *obj)
uint32_t task_id
time_t switch_epoch_time_now(time_t *t)
Get the current epoch time.
Definition: switch_time.c:321
switch_status_t switch_queue_push(switch_queue_t *queue, void *data)
Definition: switch_apr.c:1129
struct apr_pool_t switch_memory_pool_t
#define switch_test_flag(obj, flag)
Test for the existence of a flag on an arbitrary object.
Definition: switch_utils.h:624
void switch_log_printf(_In_ switch_text_channel_t channel, _In_z_ const char *file, _In_z_ const char *func, _In_ int line, _In_opt_z_ const char *userdata, _In_ switch_log_level_t level, _In_z_ _Printf_format_string_ const char *fmt,...) PRINTF_FUNCTION(7
Write log data to the logging engine.
switch_status_t switch_threadattr_create(switch_threadattr_t **new_attr, switch_memory_pool_t *pool)
Definition: switch_apr.c:642
switch_status_t switch_thread_create(switch_thread_t **new_thread, switch_threadattr_t *attr, switch_thread_start_t func, void *data, switch_memory_pool_t *cont)
Definition: switch_apr.c:675
switch_memory_pool_t * pool
switch_status_t switch_queue_create(switch_queue_t **queue, unsigned int queue_capacity, switch_memory_pool_t *pool)
Definition: switch_apr.c:1109
void switch_event_destroy(switch_event_t **event)
Destroy an event.
#define switch_assert(expr)
static int task_thread_loop(int done)
switch_mutex_t * task_mutex
static struct @6 globals
switch_scheduler_task_container_t * task_list
switch_scheduler_task_t task