20#include <winpr/config.h>
23#include <winpr/wlog.h>
25#include <winpr/collections.h>
/* Log tag of this module; StreamPool_New() passes it to WLog_Create(). */
29#define XTAG WINPR_TAG("utils.streampool")
/* One bookkeeping record stored in the pool's entry arrays.
 * Code in this file reads its `s` member (the tracked stream) and, when
 * WITH_STREAMPOOL_DEBUG is defined, its `msg` and `lines` members, which
 * are filled from winpr_backtrace_symbols().
 * NOTE(review): the member declarations themselves are not visible in this
 * excerpt - confirm against the full definition. */
31struct s_StreamPoolEntry
33#if defined(WITH_STREAMPOOL_DEBUG)
/* Entries of the "available" array; StreamPool_Take() searches it for a
 * stream with sufficient capacity. Indexed up to aSize.
 * NOTE(review): the enclosing struct header is not visible in this excerpt. */
44 struct s_StreamPoolEntry* aArray;
/* Entries of the "used" array; StreamPool_AddUsed() stores new entries
 * here. Indexed up to uSize. */
48 struct s_StreamPoolEntry* uArray;
/* Releases what is attached to a pool entry.
 *
 * In WITH_STREAMPOOL_DEBUG builds the entry's `msg` buffer is freed.
 *
 * @param entry         the entry to clean up
 * @param discardStream when TRUE and the entry holds a stream, that stream
 *                      is released with Stream_Free(), passing the stream's
 *                      own isAllocatedStream flag as second argument
 */
56static void discard_entry(
struct s_StreamPoolEntry* entry, BOOL discardStream)
61#if defined(WITH_STREAMPOOL_DEBUG)
62 free((
void*)entry->msg);
65 if (discardStream && entry->s)
66 Stream_Free(entry->s, entry->s->isAllocatedStream);
/* Zero-initialised template entry; presumably copied over *entry to reset
 * it (the assignment is not visible in this excerpt - TODO confirm). */
68 const struct s_StreamPoolEntry empty = WINPR_C_ARRAY_INIT;
/* Builds a pool entry for stream `s` and returns it by value.
 *
 * The entry starts zero-initialised. In WITH_STREAMPOOL_DEBUG builds a
 * backtrace is taken with winpr_backtrace(20), its symbols are stored in
 * entry.msg / entry.lines, and the backtrace handle is freed again.
 * NOTE(review): the statement storing `s` in the entry is not visible in
 * this excerpt.
 */
72static struct s_StreamPoolEntry add_entry(
wStream* s)
74 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
76#if defined(WITH_STREAMPOOL_DEBUG)
77 void* stack = winpr_backtrace(20);
79 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
80 winpr_backtrace_free(stack);
/* Enters the pool's critical section, but only if pool->synchronized is
 * set; otherwise this does nothing. */
91static inline void StreamPool_Lock(wStreamPool* pool)
94 if (pool->synchronized)
95 EnterCriticalSection(&pool->lock);
/* Leaves the pool's critical section, but only if pool->synchronized is
 * set; counterpart of StreamPool_Lock(). */
102static inline void StreamPool_Unlock(wStreamPool* pool)
105 if (pool->synchronized)
106 LeaveCriticalSection(&pool->lock);
/* Resizes one of the pool's two entry arrays so that `count` additional
 * entries fit.
 *
 * @param pool            the pool whose array is resized
 * @param count           number of entries that are about to be added
 * @param usedOrAvailable TRUE selects uArray/uSize/uCapacity, FALSE selects
 *                        aArray/aSize/aCapacity
 * @return BOOL; StreamPool_New() treats a FALSE result as failure. The
 *         return statements are not visible in this excerpt.
 */
109static BOOL StreamPool_EnsureCapacity(wStreamPool* pool,
size_t count, BOOL usedOrAvailable)
/* Pointers into the pool so the code below works on either array. */
113 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
114 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
115 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
119 new_cap = *size + count;
/* Not enough room: grow to (size + count + 2) / 2 * 3 entries. */
120 else if (*size + count > *cap)
121 new_cap = (*size + count + 2) / 2 * 3;
/* Less than a third of the capacity would be in use; the action taken in
 * this branch is not visible in this excerpt. */
122 else if ((*size + count) < *cap / 3)
127 struct s_StreamPoolEntry* new_arr =
nullptr;
129 if (*cap < *size + count)
/* The array is resized to new_cap entries with realloc(). */
133 (
struct s_StreamPoolEntry*)realloc(*array,
sizeof(
struct s_StreamPoolEntry) * new_cap);
/* Removes the entry at `index` from the used array.
 *
 * The entry is discarded without freeing its stream (discardStream is
 * FALSE), the entries behind it are moved down by one slot and uSize is
 * decremented. The work is guarded by the check uSize >= index + 1.
 *
 * @param pool  the pool to modify
 * @param index position of the entry to remove in pool->uArray
 */
146static void StreamPool_ShiftUsed(wStreamPool* pool,
size_t index)
/* Exactly one entry is removed per call. */
150 const size_t pcount = 1;
151 const size_t off = index + pcount;
152 if (pool->uSize >= off)
154 for (
size_t x = 0; x < pcount; x++)
156 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
157 discard_entry(cur, FALSE);
/* Close the gap left by the removed entry. */
159 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
160 (pool->uSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
161 pool->uSize -= pcount;
/* Stores a new entry for stream `s` at index uSize of the used array,
 * after asking StreamPool_EnsureCapacity() for room for one more entry.
 *
 * NOTE(review): the BOOL result of StreamPool_EnsureCapacity() is ignored
 * here, so a failed resize is not detected before uArray is written.
 */
169static void StreamPool_AddUsed(wStreamPool* pool,
wStream* s)
171 StreamPool_EnsureCapacity(pool, 1, TRUE);
172 pool->uArray[pool->uSize] = add_entry(s);
/* Walks the used array and removes an entry through StreamPool_ShiftUsed().
 *
 * NOTE(review): the condition selecting the entry is not visible in this
 * excerpt; presumably it compares cur->s with `s` - TODO confirm.
 */
180static void StreamPool_RemoveUsed(wStreamPool* pool,
wStream* s)
183 for (
size_t index = 0; index < pool->uSize; index++)
185 struct s_StreamPoolEntry* cur = &pool->uArray[index];
188 StreamPool_ShiftUsed(pool, index);
/* Removes the entry at `index` from the available array.
 *
 * The entry is discarded without freeing its stream (discardStream is
 * FALSE), the entries behind it are moved down by one slot and aSize is
 * decremented. The work is guarded by the check aSize >= index + 1.
 *
 * @param pool  the pool to modify
 * @param index position of the entry to remove in pool->aArray
 */
194static void StreamPool_ShiftAvailable(wStreamPool* pool,
size_t index)
/* Exactly one entry is removed per call. */
198 const size_t pcount = 1;
199 const size_t off = index + pcount;
200 if (pool->aSize >= off)
202 for (
size_t x = 0; x < pcount; x++)
204 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
205 discard_entry(cur, FALSE);
/* Close the gap left by the removed entry. */
208 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
209 (pool->aSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
210 pool->aSize -= pcount;
/* Obtains a stream from the pool and registers it as used.
 *
 * Visible steps, all performed between StreamPool_Lock() and
 * StreamPool_Unlock():
 *  - `size` may be replaced by pool->defaultSize (the condition is not
 *    visible here; presumably when size is 0 - TODO confirm),
 *  - the available array is searched for a stream whose capacity is at
 *    least `size`,
 *  - a stream may instead be created with Stream_New(nullptr, size),
 *  - the position is reset and the length set to the stream's capacity,
 *  - the entry at foundIndex is removed from the available array,
 *  - the stream is recorded in the used array.
 *
 * @param pool the pool to take from
 * @param size requested capacity in bytes
 * @return a wStream pointer; the return statements and failure handling
 *         are not visible in this excerpt.
 */
218wStream* StreamPool_Take(wStreamPool* pool,
size_t size)
221 size_t foundIndex = 0;
224 StreamPool_Lock(pool);
227 size = pool->defaultSize;
229 for (
size_t index = 0; index < pool->aSize; index++)
231 struct s_StreamPoolEntry* cur = &pool->aArray[index];
/* A candidate must be at least as large as the request. */
234 if (Stream_Capacity(s) >= size)
244 s = Stream_New(
nullptr, size);
250 Stream_ResetPosition(s);
251 if (!Stream_SetLength(s, Stream_Capacity(s)))
253 StreamPool_ShiftAvailable(pool, foundIndex);
260 StreamPool_AddUsed(pool, s);
264 StreamPool_Unlock(pool);
/* Moves stream `s` from the used array to the available array.
 *
 * Room for one more available entry is requested, Stream_EnsureValidity()
 * is called on the stream, the available array is walked (the purpose of
 * that loop is not visible here; presumably a duplicate check - TODO
 * confirm), then an entry for `s` is appended to aArray and the stream is
 * removed from the used array.
 *
 * This function does not lock; both callers in this file hold the pool
 * lock around it.
 *
 * NOTE(review): the BOOL result of StreamPool_EnsureCapacity() is ignored.
 */
273static void StreamPool_Remove(wStreamPool* pool,
wStream* s)
275 StreamPool_EnsureCapacity(pool, 1, FALSE);
276 Stream_EnsureValidity(s);
277 for (
size_t x = 0; x < pool->aSize; x++)
279 wStream* cs = pool->aArray[x].s;
/* Append to the available array and bump aSize in one step. */
283 pool->aArray[(pool->aSize)++] = add_entry(s);
284 StreamPool_RemoveUsed(pool, s);
/* Runs StreamPool_Remove() for `s` while holding the pool lock. */
287static void StreamPool_ReleaseOrReturn(wStreamPool* pool,
wStream* s)
289 StreamPool_Lock(pool);
290 StreamPool_Remove(pool, s);
291 StreamPool_Unlock(pool);
/* Gives stream `s` back to `pool`: under the pool lock the stream is passed
 * to StreamPool_Remove().
 * NOTE(review): statements preceding the lock are not visible in this
 * excerpt. */
294void StreamPool_Return(wStreamPool* pool,
wStream* s)
300 StreamPool_Lock(pool);
301 StreamPool_Remove(pool, s);
302 StreamPool_Unlock(pool);
328 StreamPool_ReleaseOrReturn(s->pool, s);
330 Stream_Free(s, TRUE);
/* Searches the used array, under the pool lock, for a stream whose buffer
 * contains `ptr`, i.e. Stream_Buffer(s) <= ptr < Stream_Buffer(s) +
 * Stream_Capacity(s).
 *
 * @param pool the pool to search
 * @param ptr  address to look up
 * @return a wStream pointer; presumably the matching stream (the return
 *         statement is not visible in this excerpt - TODO confirm).
 */
338wStream* StreamPool_Find(wStreamPool* pool,
const BYTE* ptr)
342 StreamPool_Lock(pool);
344 for (
size_t index = 0; index < pool->uSize; index++)
346 struct s_StreamPoolEntry* cur = &pool->uArray[index];
/* Half-open range test: start inclusive, end exclusive. */
348 if ((ptr >= Stream_Buffer(cur->s)) &&
349 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
356 StreamPool_Unlock(pool);
/* Discards the entries of both pool arrays while holding the pool lock.
 *
 * Every entry of the available array is discarded with its stream freed.
 * A warning about streams still in use is logged (its condition is not
 * visible in this excerpt), and every entry of the used array is then
 * discarded with its stream freed as well.
 */
365void StreamPool_Clear(wStreamPool* pool)
367 StreamPool_Lock(pool);
369 for (
size_t x = 0; x < pool->aSize; x++)
371 struct s_StreamPoolEntry* cur = &pool->aArray[x];
372 discard_entry(cur, TRUE);
378 WLog_Print(pool->log, WLOG_WARN,
379 "Clearing StreamPool, but there are %" PRIuz
" streams currently in use",
/* Streams in the used array are freed too (discardStream is TRUE). */
381 for (
size_t x = 0; x < pool->uSize; x++)
383 struct s_StreamPoolEntry* cur = &pool->uArray[x];
384 discard_entry(cur, TRUE);
389 StreamPool_Unlock(pool);
/* Reads pool->uSize while holding the pool lock.
 * @return size_t; presumably the value read (the return statement is not
 *         visible in this excerpt). */
392size_t StreamPool_UsedCount(wStreamPool* pool)
394 StreamPool_Lock(pool);
395 size_t usize = pool->uSize;
396 StreamPool_Unlock(pool);
/* Allocates and initialises a stream pool.
 *
 * The pool is zero-allocated, gets a logger created with the XTAG tag,
 * stores the two arguments, reserves room for 32 entries in both the
 * available and the used array, and initialises its critical section with
 * a spin count of 4000.
 *
 * @param synchronized stored in pool->synchronized; StreamPool_Lock() and
 *                     StreamPool_Unlock() only touch the critical section
 *                     when it is set
 * @param defaultSize  stored in pool->defaultSize, which StreamPool_Take()
 *                     can substitute for its size argument
 * @return a wStreamPool pointer; the return statements are not visible in
 *         this excerpt.
 */
404wStreamPool* StreamPool_New(BOOL
synchronized,
size_t defaultSize)
406 wStreamPool* pool = calloc(1,
sizeof(wStreamPool));
411 pool->log = WLog_Create(XTAG, WLog_GetRoot());
415 pool->synchronized =
synchronized;
416 pool->defaultSize = defaultSize;
418 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
420 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
423 if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
/* StreamPool_Free() is called on the pool with the mismatched-dealloc
 * diagnostic suppressed around the call. */
428 WINPR_PRAGMA_DIAG_PUSH
429 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
430 StreamPool_Free(pool);
431 WINPR_PRAGMA_DIAG_POP
/* Tears a pool down: clears its entries with StreamPool_Clear(), deletes
 * the critical section and discards the logger.
 * NOTE(review): any null check on `pool` and the release of the arrays and
 * of the pool itself are not visible in this excerpt. */
435void StreamPool_Free(wStreamPool* pool)
440 StreamPool_Clear(pool);
442 DeleteCriticalSection(&pool->lock);
447 WLog_Discard(pool->log);
/* Writes a textual summary of the pool into a caller supplied buffer.
 *
 * The summary starts with aSize, uSize, aCapacity and uCapacity. In
 * WITH_STREAMPOOL_DEBUG builds it is followed, under the pool lock, by the
 * backtrace lines recorded for each used entry and by the backtrace of the
 * current call.
 *
 * @param pool   the pool to describe
 * @param buffer destination buffer
 * @param size   size of `buffer` in bytes
 * @return char*; the return statements, including the one for the
 *         !buffer / size < 1 check, are not visible in this excerpt.
 */
451char* StreamPool_GetStatistics(wStreamPool* pool,
char* buffer,
size_t size)
455 if (!buffer || (size < 1))
/* Counter line; _snprintf is given at most size - 1 bytes. */
459 int offset = _snprintf(buffer, size - 1,
460 "aSize =%" PRIuz
", uSize =%" PRIuz
", aCapacity=%" PRIuz
461 ", uCapacity=%" PRIuz,
462 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
/* `used` only advances when the write reported a positive length that is
 * smaller than the space it was checked against. */
463 if ((offset > 0) && ((
size_t)offset < size))
464 used += (size_t)offset;
466#if defined(WITH_STREAMPOOL_DEBUG)
467 StreamPool_Lock(pool);
469 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- dump used array take locations --\n");
470 if ((offset > 0) && ((
size_t)offset < size - used))
471 used += (size_t)offset;
/* One output line per recorded backtrace line of every used entry. */
472 for (
size_t x = 0; x < pool->uSize; x++)
474 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
475 WINPR_ASSERT(cur->msg || (cur->lines == 0));
477 for (
size_t y = 0; y < cur->lines; y++)
479 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
" | %" PRIuz
"]: %s\n", x,
481 if ((offset > 0) && ((
size_t)offset < size - used))
482 used += (size_t)offset;
486 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- statistics called from --\n");
487 if ((offset > 0) && ((
size_t)offset < size - used))
488 used += (size_t)offset;
/* Temporary entry that only carries the backtrace of this call. */
490 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
491 void* stack = winpr_backtrace(20);
493 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
494 winpr_backtrace_free(stack);
496 for (
size_t x = 0; x < entry.lines; x++)
498 const char* msg = entry.msg[x];
499 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
"]: %s\n", x, msg);
500 if ((offset > 0) && ((
size_t)offset < size - used))
501 used += (size_t)offset;
503 free((
void*)entry.msg);
504 StreamPool_Unlock(pool);
/* Polls the number of used streams while `timeoutMS` is greater than 0.
 *
 * Each visible iteration reads StreamPool_UsedCount(), logs the count at
 * debug level and logs the pool statistics at trace level. When timeoutMS
 * is not INFINITE, `diff` is set to the smaller of 10 and timeoutMS.
 *
 * NOTE(review): the exit condition, the wait itself, the use of `diff` and
 * the return values are not visible in this excerpt, which appears to end
 * before the function does.
 */
510BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
514 while (timeoutMS > 0)
516 const size_t used = StreamPool_UsedCount(pool);
519 WLog_Print(pool->log, WLOG_DEBUG,
"%" PRIuz
" streams still in use, sleeping...", used);
/* Zero-initialised buffer on the stack for the statistics text. */
521 char buffer[4096] = WINPR_C_ARRAY_INIT;
522 StreamPool_GetStatistics(pool, buffer,
sizeof(buffer));
523 WLog_Print(pool->log, WLOG_TRACE,
"Pool statistics: %s", buffer);
526 if (timeoutMS != INFINITE)
528 diff = timeoutMS > 10 ? 10 : timeoutMS;