35 #define NACK_TIME 80000
36 #define RENACK_TIME 100000
37 #define PERIOD_LEN 250
38 #define MAX_FRAME_PADDING 2
39 #define MAX_MISSING_SEQ 20
40 #define jb_debug(_jb, _level, _format, ...) if (_jb->debug_level >= _level) switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(_jb->session), SWITCH_LOG_ALERT, "JB:%p:%s lv:%d ln:%.4d sz:%.3u/%.3u/%.3u/%.3u c:%.3u %.3u/%.3u/%.3u/%.3u %.2f%% ->" _format, (void *) _jb, (jb->type == SJB_AUDIO ? "aud" : "vid"), _level, __LINE__, _jb->min_frame_len, _jb->max_frame_len, _jb->frame_len, _jb->complete_frames, _jb->period_count, _jb->consec_good_count, _jb->period_good_count, _jb->consec_miss_count, _jb->period_miss_count, _jb->period_miss_pct, __VA_ARGS__)
121 int insize, nmerges, psize, qsize, i;
141 for (i = 0; i < insize; i++) {
151 while (psize > 0 || (qsize > 0 && q)) {
156 e = q; q = q->
next; qsize--;
157 }
else if (qsize == 0 || !q) {
159 e = p; p = p->
next; psize--;
160 }
else if (cmp(p,q) <= 0) {
163 e = p; p = p->
next; psize--;
166 e = q; q = q->
next; qsize--;
237 }
else if (node->
prev) {
358 return lowest ? lowest : NULL;
385 if ((i % freq) == 0) {
413 return highest ? highest : NULL;
416 static inline uint32_t jb_find_highest_ts(
switch_jb_t *jb)
432 if (highest) second_highest = highest;
438 return second_highest ? second_highest : highest;
483 jb_debug(jb, 2,
"%d Change framelen from %u to %u\n", line, old_frame_len, jb->
frame_len);
491 #define jb_frame_inc(_jb, _i) jb_frame_inc_line(_jb, _i, __LINE__)
502 static inline int verify_oldest_frame(
switch_jb_t *jb)
517 for (np = lowest->
next; np; np = np->
next) {
519 if (!np->visible)
continue;
521 if (ntohs(np->packet.header.seq) != ntohs(np->prev->packet.header.seq) + 1) {
522 uint32_t val = (uint32_t)htons(ntohs(np->prev->packet.header.seq) + 1);
530 if (np->packet.header.ts != lowest->
packet.
header.
ts || !np->next) {
548 jb_debug(jb, 1,
"Dropping oldest frame ts:%u\n", ntohl(ts));
552 static inline void drop_newest_frame(
switch_jb_t *jb)
554 uint32_t ts = jb_find_highest_ts(jb);
557 jb_debug(jb, 1,
"Dropping highest frame ts:%u\n", ntohl(ts));
560 static inline void drop_second_newest_frame(
switch_jb_t *jb)
585 jb_debug(jb, (packet->
header.
m ? 1 : 2),
"PUT packet last_ts:%u ts:%u seq:%u%s\n",
589 int seq_diff = 0, ts_diff = 0;
603 if (((seq_diff >= jb->
max_frame_len) || (ts_diff > (900000 * 5)))) {
678 jb_debug(jb, 2,
"%s",
"DROPPED FRAME DETECTED RESYNCING\n");
693 jb_debug(jb, 1,
"%s",
"No nodes available....\n");
715 for (x = 0; x < 10; x++) {
721 jb_debug(jb, 2,
"%s",
"SAME FRAME DROPPING\n");
757 jb_debug(jb, 1,
"%s",
"No nodes available....\n");
799 jb->samples_per_frame = samples_per_frame;
800 jb->samples_per_second = samples_per_second;
808 jb->session = session;
814 if (tmp > 128 && tmp < 10240) {
815 jb->video_low_bitrate = (uint32_t)tmp;
833 return (jb->complete_frames >= jb->frame_len);
838 return jb->complete_frames;
843 jb->debug_level = level;
860 jb_debug(jb, 2,
"%s",
"RESET BUFFER\n");
863 jb->last_target_seq = 0;
866 jb->highest_wrote_seq = 0;
867 jb->highest_wrote_ts = 0;
869 jb->highest_read_ts = 0;
870 jb->highest_read_seq = 0;
871 jb->complete_frames = 0;
874 jb->complete_frames = 0;
875 jb->period_miss_count = 0;
876 jb->consec_miss_count = 0;
877 jb->period_miss_pct = 0;
878 jb->period_good_count = 0;
879 jb->consec_good_count = 0;
880 jb->period_count = 0;
881 jb->period_miss_inc = 0;
883 jb->last_target_ts = 0;
894 uint16_t want_seq = seq + peek;
896 }
else if (ts && jb->samples_per_frame) {
897 uint32_t want_ts = ts + (peek * jb->samples_per_frame);
905 frame->datalen = node->
len;
907 if (frame->data && frame->buflen > node->
len) {
922 *min_frame_len = jb->min_frame_len;
926 *max_frame_len = jb->max_frame_len;
930 *cur_frame_len = jb->frame_len;
944 if (jb->frame_len == jb->min_frame_len) lowest = 1;
946 jb->min_frame_len = min_frame_len;
947 jb->max_frame_len = max_frame_len;
949 if (jb->frame_len > jb->max_frame_len) {
950 jb->frame_len = jb->max_frame_len;
953 if (jb->frame_len < jb->min_frame_len) {
954 jb->frame_len = jb->min_frame_len;
957 if (jb->frame_len > jb->highest_frame_len) {
958 jb->highest_frame_len = jb->frame_len;
962 jb->frame_len = jb->min_frame_len;
1056 seq = ntohs(*((uint16_t *) var));
1057 then = (intptr_t) val;
1068 if (seq < ntohs(jb->target_seq) - jb->frame_len) {
1069 jb_debug(jb, 3,
"NACKABLE seq %u expired\n", seq);
1074 if (!least || seq < least) {
1080 jb_debug(jb, 3,
"Found NACKABLE seq %u\n", least);
1081 nack = (uint32_t) htons(least);
1084 for(i = 0; i < 16; i++) {
1087 jb_debug(jb, 3,
"Found addtl NACKABLE seq %u\n", least + i + 1);
1093 nack |= (uint32_t) blp << 16;
1107 uint16_t want = ntohs(jb->next_seq), got = ntohs(packet->header.seq);
1116 if (!want) want = got;
1119 jb->next_seq = htons(got + 1);
1123 if (got < ntohs(jb->target_seq)) {
1124 jb_debug(jb, 2,
"got nacked seq %u too late\n", got);
1127 jb_debug(jb, 2,
"got nacked %u saved the day!\n", got);
1132 if (got - want > jb->max_frame_len && got - want > 17) {
1133 jb_debug(jb, 2,
"Missing %u frames, Resetting\n", got - want);
1140 if (jb->frame_len < got - want) {
1144 jb_debug(jb, 2,
"GOT %u WANTED %u; MARK SEQS MISSING %u - %u\n", got, want, want, got - 1);
1146 for (i = want; i < got; i++) {
1147 jb_debug(jb, 2,
"MARK MISSING %u ts:%u\n", i, ntohl(packet->header.ts));
1153 if (got >= want || (want - got) > 1000) {
1154 jb->next_seq = htons(got + 1);
1176 jb_debug(jb, 2,
"Found buffered seq: %u\n", ntohs(seq));
1182 jb_debug(jb, 2,
"Missing buffered seq: %u\n", ntohs(seq));
1191 return jb->last_len;
1203 if (jb->complete_frames == 0) {
1207 if (jb->complete_frames < jb->frame_len) {
1208 jb_debug(jb, 2,
"BUFFERING %u/%u\n", jb->complete_frames , jb->frame_len);
1212 jb_debug(jb, 2,
"GET PACKET %u/%u n:%d\n", jb->complete_frames , jb->frame_len, jb->visible_nodes);
1216 if (jb->consec_good_count >= (
PERIOD_LEN - 5)) {
1220 jb->period_count = 1;
1221 jb->period_miss_inc = 0;
1222 jb->period_miss_count = 0;
1223 jb->period_good_count = 0;
1224 jb->consec_miss_count = 0;
1225 jb->consec_good_count = 0;
1227 if (jb->type ==
SJB_VIDEO && jb->channel && jb->video_low_bitrate) {
1232 jb_debug(jb, 2,
"%s",
"Allow BITRATE changes\n");
1234 jb->bitrate_control = 0;
1241 jb->bitrate_control = jb->video_low_bitrate;
1245 msg.
from = __FILE__;
1247 jb_debug(jb, 2,
"Force BITRATE to %d\n", jb->bitrate_control);
1258 jb->period_miss_pct = ((double)jb->period_miss_count / jb->period_count) * 100;
1260 if (jb->period_miss_pct > 60.0f) {
1261 jb_debug(jb, 2,
"Miss percent %02f too high, resetting buffer.\n", jb->period_miss_pct);
1268 if (!jb->read_init || ntohs(node->
packet.
header.
seq) > ntohs(jb->highest_read_seq) ||
1269 (ntohs(jb->highest_read_seq) > USHRT_MAX - 10 && ntohs(node->
packet.
header.
seq) <= 10) ) {
1273 if (jb->read_init && htons(node->
packet.
header.
seq) >= htons(jb->highest_read_seq) && (ntohl(node->
packet.
header.
ts) > ntohl(jb->highest_read_ts))) {
1274 jb->complete_frames--;
1275 jb_debug(jb, 2,
"READ frame ts: %u complete=%u/%u n:%u\n", ntohl(node->
packet.
header.
ts), jb->complete_frames , jb->frame_len, jb->visible_nodes);
1277 }
else if (!jb->read_init) {
1281 if (!jb->read_init) jb->read_init = 1;
1288 jb_debug(jb, 2,
"%s",
"Error encountered ask for new keyframe\n");
1292 jb_debug(jb, 2,
"%s",
"No frames found wait for more\n");
1298 jb_debug(jb, 2,
"%s",
"Error encountered\n");
1303 if (jb->consec_miss_count > jb->frame_len) {
1306 jb_debug(jb, 2,
"%s",
"Too many frames not found, RESIZE\n");
1309 jb_debug(jb, 2,
"%s",
"Frame not found suggest PLC\n");
1322 jb->last_len = *
len;
1326 jb_debug(jb, 1,
"GET packet ts:%u seq:%u %s\n", ntohl(packet->header.ts), ntohs(packet->header.seq), packet->header.m ?
" <MARK>" :
"");
1338 if (jb->samples_per_frame) {
1339 seq = htons(jb->last_psuedo_seq);
1340 ts = jb->last_target_ts;
1342 seq = jb->last_target_seq;
1345 packet->header.seq = seq;
1346 packet->header.ts = ts;
1351 if (jb->complete_frames > jb->max_frame_len) {
uint32_t highest_wrote_ts
#define switch_core_new_memory_pool(p)
Create a new sub memory pool from the core's master pool.
switch_status_t switch_jb_put_packet(switch_jb_t *jb, switch_rtp_packet_t *packet, switch_size_t len)
#define switch_set_flag(obj, flag)
Set a flag on an arbitrary object.
switch_status_t switch_core_inthash_init(switch_inthash_t **hash)
static switch_jb_node_t * jb_find_lowest_seq(switch_jb_t *jb, uint32_t ts)
void switch_jb_debug_level(switch_jb_t *jb, uint8_t level)
switch_core_session_message_types_t message_id
#define SWITCH_CHANNEL_LOG
uint32_t highest_read_seq
switch_inthash_t * node_hash
switch_channel_t * channel
switch_inthash_t * node_hash_ts
switch_inthash_t * missing_seq_hash
static void hide_nodes(switch_jb_t *jb)
struct switch_jb_node_s * next
void switch_jb_ts_mode(switch_jb_t *jb, uint32_t samples_per_frame, uint32_t samples_per_second)
uint32_t switch_jb_pop_nack(switch_jb_t *jb)
#define switch_core_destroy_memory_pool(p)
Returns a subpool back to the main pool.
switch_memory_pool_t * pool
#define jb_debug(_jb, _level, _format,...)
void * switch_core_inthash_find(switch_inthash_t *hash, uint32_t key)
switch_status_t switch_jb_create(switch_jb_t **jbp, switch_jb_type_t type, uint32_t min_frame_len, uint32_t max_frame_len, switch_memory_pool_t *pool)
#define jb_frame_inc(_jb, _i)
int switch_jb_frame_count(switch_jb_t *jb)
struct switch_jb_node_s switch_jb_node_t
static uint32_t jb_find_lowest_ts(switch_jb_t *jb)
static void set_read_seq(switch_jb_t *jb, uint16_t seq)
const char * switch_channel_get_variable_dup(switch_channel_t *channel, const char *varname, switch_bool_t dup, int idx)
Retrieve a variable from a given channel.
switch_jb_node_t * sort_nodes(switch_jb_node_t *list, int(*cmp)(const void *, const void *))
int switch_jb_poll(switch_jb_t *jb)
uint32_t switch_channel_test_flag(switch_channel_t *channel, switch_channel_flag_t flag)
Test for presence of given flag on a given channel.
A message object designed to allow unlike technologies to exchange data.
switch_status_t switch_mutex_unlock(switch_mutex_t *lock)
struct switch_jb_s * parent
_Ret_ switch_channel_t * switch_core_session_get_channel(_In_ switch_core_session_t *session)
Retrieve a pointer to the channel object associated with a given session.
switch_status_t switch_core_inthash_destroy(switch_inthash_t **hash)
switch_status_t switch_jb_peek_frame(switch_jb_t *jb, uint32_t ts, uint16_t seq, int peek, switch_frame_t *frame)
#define switch_clear_flag(obj, flag)
Clear a flag on an arbitrary object while locked.
#define SWITCH_MUTEX_NESTED
static switch_jb_node_t * jb_find_lowest_node(switch_jb_t *jb)
uint32_t samples_per_second
static void jb_hit(switch_jb_t *jb)
switch_status_t switch_jb_get_packet_by_seq(switch_jb_t *jb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len)
static void drop_oldest_frame(switch_jb_t *jb)
switch_status_t switch_jb_get_packet(switch_jb_t *jb, switch_rtp_packet_t *packet, switch_size_t *len)
static void add_node(switch_jb_t *jb, switch_rtp_packet_t *packet, switch_size_t len)
static void increment_ts(switch_jb_t *jb)
void * switch_core_inthash_delete(switch_inthash_t *hash, uint32_t key)
static void push_to_top(switch_jb_t *jb, switch_jb_node_t *node)
switch_status_t switch_mutex_lock(switch_mutex_t *lock)
#define switch_core_alloc(_pool, _mem)
Allocate memory directly from a memory pool.
static void drop_ts(switch_jb_t *jb, uint32_t ts)
switch_status_t switch_core_inthash_insert(switch_inthash_t *hash, uint32_t key, const void *data)
switch_status_t switch_mutex_init(switch_mutex_t **lock, unsigned int flags, switch_memory_pool_t *pool)
void switch_jb_clear_flag(switch_jb_t *jb, switch_jb_flag_t flag)
static switch_status_t jb_next_packet_by_ts(switch_jb_t *jb, switch_jb_node_t **nodep)
uint32_t consec_miss_count
void switch_core_hash_this(_In_ switch_hash_index_t *hi, _Out_opt_ptrdiff_cap_(klen) const void **key, _Out_opt_ switch_ssize_t *klen, _Out_ void **val)
Gets the key and value of the current hash element.
An abstraction of a data frame.
switch_hash_index_t * switch_core_hash_next(_In_ switch_hash_index_t **hi)
Gets the next element of a hashtable.
static void thin_frames(switch_jb_t *jb, int freq, int max)
#define switch_core_session_receive_message(_session, _message)
static void sort_free_nodes(switch_jb_t *jb)
uint32_t video_low_bitrate
static void increment_seq(switch_jb_t *jb)
uint32_t period_good_count
uint32_t period_miss_count
switch_status_t switch_core_session_request_video_refresh(switch_core_session_t *session)
static int node_cmp(const void *l, const void *r)
struct apr_thread_mutex_t switch_mutex_t
switch_status_t switch_jb_destroy(switch_jb_t **jbp)
switch_status_t
Common return values.
#define switch_goto_status(_status, _label)
void switch_jb_set_session(switch_jb_t *jb, switch_core_session_t *session)
static void jb_frame_inc_line(switch_jb_t *jb, int i, int line)
char body[SWITCH_RTP_MAX_BUF_LEN+4+sizeof(char *)]
switch_mutex_t * list_mutex
static switch_jb_node_t * new_node(switch_jb_t *jb)
static switch_status_t jb_next_packet_by_seq(switch_jb_t *jb, switch_jb_node_t **nodep)
static void set_read_ts(switch_jb_t *jb, uint32_t ts)
#define switch_channel_set_flag(_c, _f)
void switch_jb_set_flag(switch_jb_t *jb, switch_jb_flag_t flag)
switch_core_session_t * session
switch_size_t switch_jb_get_last_read_len(switch_jb_t *jb)
struct switch_jb_node_s * prev
static void free_nodes(switch_jb_t *jb)
switch_status_t switch_jb_set_frames(switch_jb_t *jb, uint32_t min_frame_len, uint32_t max_frame_len)
uint32_t highest_frame_len
struct apr_pool_t switch_memory_pool_t
#define switch_test_flag(obj, flag)
Test for the existance of a flag on an arbitary object.
static void jb_miss(switch_jb_t *jb)
switch_memory_pool_t * pool
void switch_log_printf(_In_ switch_text_channel_t channel, _In_z_ const char *file, _In_z_ const char *func, _In_ int line, _In_opt_z_ const char *userdata, _In_ switch_log_level_t level, _In_z_ _Printf_format_string_ const char *fmt,...) PRINTF_FUNCTION(7
Write log data to the logging engine.
switch_rtp_packet_t packet
switch_status_t switch_jb_get_frames(switch_jb_t *jb, uint32_t *min_frame_len, uint32_t *max_frame_len, uint32_t *cur_frame_len, uint32_t *highest_frame_len)
void switch_channel_clear_flag(switch_channel_t *channel, switch_channel_flag_t flag)
Clear given flag(s) from a channel.
uint32_t highest_wrote_seq
switch_time_t switch_time_now(void)
static switch_status_t jb_next_packet(switch_jb_t *jb, switch_jb_node_t **nodep)
uint32_t samples_per_frame
#define switch_core_hash_first(_h)
#define SWITCH_SIZE_T_FMT
static void hide_node(switch_jb_node_t *node, switch_bool_t pop)
struct switch_jb_node_s * node_list
void switch_jb_reset(switch_jb_t *jb)
uint32_t consec_good_count