FreeRDP
Loading...
Searching...
No Matches
StreamPool.c
1
20#include <winpr/config.h>
21
22#include <winpr/crt.h>
23#include <winpr/wlog.h>
24
25#include <winpr/collections.h>
26
27#include "../stream.h"
28#include "../log.h"
29#define XTAG WINPR_TAG("utils.streampool")
30
31struct s_StreamPoolEntry
32{
33#if defined(WITH_STREAMPOOL_DEBUG)
34 char** msg;
35 size_t lines;
36#endif
37 wStream* s;
38};
39
40struct s_wStreamPool
41{
42 size_t aSize;
43 size_t aCapacity;
44 struct s_StreamPoolEntry* aArray;
45
46 size_t uSize;
47 size_t uCapacity;
48 struct s_StreamPoolEntry* uArray;
49
51 BOOL synchronized;
52 size_t defaultSize;
53 wLog* log;
54};
55
56static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
57{
58 if (!entry)
59 return;
60
61#if defined(WITH_STREAMPOOL_DEBUG)
62 free((void*)entry->msg);
63#endif
64
65 if (discardStream && entry->s)
66 Stream_Free(entry->s, entry->s->isAllocatedStream);
67
68 const struct s_StreamPoolEntry empty = WINPR_C_ARRAY_INIT;
69 *entry = empty;
70}
71
72static struct s_StreamPoolEntry add_entry(wStream* s)
73{
74 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
75
76#if defined(WITH_STREAMPOOL_DEBUG)
77 void* stack = winpr_backtrace(20);
78 if (stack)
79 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
80 winpr_backtrace_free(stack);
81#endif
82
83 entry.s = s;
84 return entry;
85}
86
91static inline void StreamPool_Lock(wStreamPool* pool)
92{
93 WINPR_ASSERT(pool);
94 if (pool->synchronized)
95 EnterCriticalSection(&pool->lock);
96}
97
102static inline void StreamPool_Unlock(wStreamPool* pool)
103{
104 WINPR_ASSERT(pool);
105 if (pool->synchronized)
106 LeaveCriticalSection(&pool->lock);
107}
108
109static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
110{
111 WINPR_ASSERT(pool);
112
113 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
114 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
115 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
116
117 size_t new_cap = 0;
118 if (*cap == 0)
119 new_cap = *size + count;
120 else if (*size + count > *cap)
121 new_cap = (*size + count + 2) / 2 * 3;
122 else if ((*size + count) < *cap / 3)
123 new_cap = *cap / 2;
124
125 if (new_cap > 0)
126 {
127 struct s_StreamPoolEntry* new_arr = nullptr;
128
129 if (*cap < *size + count)
130 *cap += count;
131
132 new_arr =
133 (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
134 if (!new_arr)
135 return FALSE;
136 *cap = new_cap;
137 *array = new_arr;
138 }
139 return TRUE;
140}
141
146static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
147{
148 WINPR_ASSERT(pool);
149
150 const size_t pcount = 1;
151 const size_t off = index + pcount;
152 if (pool->uSize >= off)
153 {
154 for (size_t x = 0; x < pcount; x++)
155 {
156 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
157 discard_entry(cur, FALSE);
158 }
159 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
160 (pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
161 pool->uSize -= pcount;
162 }
163}
164
169static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
170{
171 StreamPool_EnsureCapacity(pool, 1, TRUE);
172 pool->uArray[pool->uSize] = add_entry(s);
173 pool->uSize++;
174}
175
180static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
181{
182 WINPR_ASSERT(pool);
183 for (size_t index = 0; index < pool->uSize; index++)
184 {
185 struct s_StreamPoolEntry* cur = &pool->uArray[index];
186 if (cur->s == s)
187 {
188 StreamPool_ShiftUsed(pool, index);
189 break;
190 }
191 }
192}
193
194static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
195{
196 WINPR_ASSERT(pool);
197
198 const size_t pcount = 1;
199 const size_t off = index + pcount;
200 if (pool->aSize >= off)
201 {
202 for (size_t x = 0; x < pcount; x++)
203 {
204 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
205 discard_entry(cur, FALSE);
206 }
207
208 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
209 (pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
210 pool->aSize -= pcount;
211 }
212}
213
218wStream* StreamPool_Take(wStreamPool* pool, size_t size)
219{
220 BOOL found = FALSE;
221 size_t foundIndex = 0;
222 wStream* s = nullptr;
223
224 StreamPool_Lock(pool);
225
226 if (size == 0)
227 size = pool->defaultSize;
228
229 for (size_t index = 0; index < pool->aSize; index++)
230 {
231 struct s_StreamPoolEntry* cur = &pool->aArray[index];
232 s = cur->s;
233
234 if (Stream_Capacity(s) >= size)
235 {
236 found = TRUE;
237 foundIndex = index;
238 break;
239 }
240 }
241
242 if (!found)
243 {
244 s = Stream_New(nullptr, size);
245 if (!s)
246 goto out_fail;
247 }
248 else if (s)
249 {
250 Stream_ResetPosition(s);
251 if (!Stream_SetLength(s, Stream_Capacity(s)))
252 goto out_fail;
253 StreamPool_ShiftAvailable(pool, foundIndex);
254 }
255
256 if (s)
257 {
258 s->pool = pool;
259 s->count = 1;
260 StreamPool_AddUsed(pool, s);
261 }
262
263out_fail:
264 StreamPool_Unlock(pool);
265
266 return s;
267}
268
273static void StreamPool_Remove(wStreamPool* pool, wStream* s)
274{
275 StreamPool_EnsureCapacity(pool, 1, FALSE);
276 Stream_EnsureValidity(s);
277 for (size_t x = 0; x < pool->aSize; x++)
278 {
279 wStream* cs = pool->aArray[x].s;
280 if (cs == s)
281 return;
282 }
283 pool->aArray[(pool->aSize)++] = add_entry(s);
284 StreamPool_RemoveUsed(pool, s);
285}
286
287static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
288{
289 StreamPool_Lock(pool);
290 StreamPool_Remove(pool, s);
291 StreamPool_Unlock(pool);
292}
293
294void StreamPool_Return(wStreamPool* pool, wStream* s)
295{
296 WINPR_ASSERT(pool);
297 if (!s)
298 return;
299
300 StreamPool_Lock(pool);
301 StreamPool_Remove(pool, s);
302 StreamPool_Unlock(pool);
303}
304
309void Stream_AddRef(wStream* s)
310{
311 WINPR_ASSERT(s);
312 s->count++;
313}
314
319void Stream_Release(wStream* s)
320{
321 WINPR_ASSERT(s);
322
323 if (s->count > 0)
324 s->count--;
325 if (s->count == 0)
326 {
327 if (s->pool)
328 StreamPool_ReleaseOrReturn(s->pool, s);
329 else
330 Stream_Free(s, TRUE);
331 }
332}
333
338wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
339{
340 wStream* s = nullptr;
341
342 StreamPool_Lock(pool);
343
344 for (size_t index = 0; index < pool->uSize; index++)
345 {
346 struct s_StreamPoolEntry* cur = &pool->uArray[index];
347
348 if ((ptr >= Stream_Buffer(cur->s)) &&
349 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
350 {
351 s = cur->s;
352 break;
353 }
354 }
355
356 StreamPool_Unlock(pool);
357
358 return s;
359}
360
365void StreamPool_Clear(wStreamPool* pool)
366{
367 StreamPool_Lock(pool);
368
369 for (size_t x = 0; x < pool->aSize; x++)
370 {
371 struct s_StreamPoolEntry* cur = &pool->aArray[x];
372 discard_entry(cur, TRUE);
373 }
374 pool->aSize = 0;
375
376 if (pool->uSize > 0)
377 {
378 WLog_Print(pool->log, WLOG_WARN,
379 "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
380 pool->uSize);
381 for (size_t x = 0; x < pool->uSize; x++)
382 {
383 struct s_StreamPoolEntry* cur = &pool->uArray[x];
384 discard_entry(cur, TRUE);
385 }
386 pool->uSize = 0;
387 }
388
389 StreamPool_Unlock(pool);
390}
391
392size_t StreamPool_UsedCount(wStreamPool* pool)
393{
394 StreamPool_Lock(pool);
395 size_t usize = pool->uSize;
396 StreamPool_Unlock(pool);
397 return usize;
398}
399
404wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
405{
406 wStreamPool* pool = calloc(1, sizeof(wStreamPool));
407
408 if (!pool)
409 return nullptr;
410
411 pool->log = WLog_Create(XTAG, WLog_GetRoot());
412 if (!pool->log)
413 goto fail;
414
415 pool->synchronized = synchronized;
416 pool->defaultSize = defaultSize;
417
418 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
419 goto fail;
420 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
421 goto fail;
422
423 if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
424 goto fail;
425
426 return pool;
427fail:
428 WINPR_PRAGMA_DIAG_PUSH
429 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
430 StreamPool_Free(pool);
431 WINPR_PRAGMA_DIAG_POP
432 return nullptr;
433}
434
435void StreamPool_Free(wStreamPool* pool)
436{
437 if (!pool)
438 return;
439
440 StreamPool_Clear(pool);
441
442 DeleteCriticalSection(&pool->lock);
443
444 free(pool->aArray);
445 free(pool->uArray);
446
447 WLog_Discard(pool->log);
448 free(pool);
449}
450
451char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
452{
453 WINPR_ASSERT(pool);
454
455 if (!buffer || (size < 1))
456 return nullptr;
457
458 size_t used = 0;
459 int offset = _snprintf(buffer, size - 1,
460 "aSize =%" PRIuz ", uSize =%" PRIuz ", aCapacity=%" PRIuz
461 ", uCapacity=%" PRIuz,
462 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
463 if ((offset > 0) && ((size_t)offset < size))
464 used += (size_t)offset;
465
466#if defined(WITH_STREAMPOOL_DEBUG)
467 StreamPool_Lock(pool);
468
469 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
470 if ((offset > 0) && ((size_t)offset < size - used))
471 used += (size_t)offset;
472 for (size_t x = 0; x < pool->uSize; x++)
473 {
474 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
475 WINPR_ASSERT(cur->msg || (cur->lines == 0));
476
477 for (size_t y = 0; y < cur->lines; y++)
478 {
479 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
480 y, cur->msg[y]);
481 if ((offset > 0) && ((size_t)offset < size - used))
482 used += (size_t)offset;
483 }
484 }
485
486 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
487 if ((offset > 0) && ((size_t)offset < size - used))
488 used += (size_t)offset;
489
490 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
491 void* stack = winpr_backtrace(20);
492 if (stack)
493 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
494 winpr_backtrace_free(stack);
495
496 for (size_t x = 0; x < entry.lines; x++)
497 {
498 const char* msg = entry.msg[x];
499 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
500 if ((offset > 0) && ((size_t)offset < size - used))
501 used += (size_t)offset;
502 }
503 free((void*)entry.msg);
504 StreamPool_Unlock(pool);
505#endif
506 buffer[used] = '\0';
507 return buffer;
508}
509
510BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
511{
512 /* HACK: We disconnected the transport above, now wait without a read or write lock until all
513 * streams in use have been returned to the pool. */
514 while (timeoutMS > 0)
515 {
516 const size_t used = StreamPool_UsedCount(pool);
517 if (used == 0)
518 return TRUE;
519 WLog_Print(pool->log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
520
521 char buffer[4096] = WINPR_C_ARRAY_INIT;
522 StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
523 WLog_Print(pool->log, WLOG_TRACE, "Pool statistics: %s", buffer);
524
525 UINT32 diff = 10;
526 if (timeoutMS != INFINITE)
527 {
528 diff = timeoutMS > 10 ? 10 : timeoutMS;
529 timeoutMS -= diff;
530 }
531 Sleep(diff);
532 }
533
534 return FALSE;
535}