/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/* ThreadPool */

#include "sles_allinclusive.h"

// Entry point for each worker thread

static void *ThreadPool_start(void *context)
{
    ThreadPool *tp = (ThreadPool *) context;
    assert(NULL != tp);
    for (;;) {
        Closure *pClosure = ThreadPool_remove(tp);
        // closure is NULL when thread pool is being destroyed
        if (NULL == pClosure) {
            break;
        }
        // make a copy of parameters, then free the parameters
        const Closure closure = *pClosure;
        free(pClosure);
        // extract parameters and call the right method depending on kind
        ClosureKind kind = closure.mKind;
        void *context1 = closure.mContext1;
        void *context2 = closure.mContext2;
        int parameter1 = closure.mParameter1;
        switch (kind) {
          case CLOSURE_KIND_PPI:
            {
            ClosureHandler_ppi handler_ppi = closure.mHandler.mHandler_ppi;
            assert(NULL != handler_ppi);
            (*handler_ppi)(context1, context2, parameter1);
            }
            break;
          case CLOSURE_KIND_PPII:
            {
            ClosureHandler_ppii handler_ppii = closure.mHandler.mHandler_ppii;
            assert(NULL != handler_ppii);
            int parameter2 = closure.mParameter2;
            (*handler_ppii)(context1, context2, parameter1, parameter2);
            }
            break;
          case CLOSURE_KIND_PIIPP:
            {
            ClosureHandler_piipp handler_piipp = closure.mHandler.mHandler_piipp;
            assert(NULL != handler_piipp);
            int parameter2 = closure.mParameter2;
            void *context3 = closure.mContext3;
            (*handler_piipp)(context1, parameter1, parameter2, context2, context3);
            }
            break;
          default:
            SL_LOGE("Unexpected callback kind %d", kind);
            assert(false);
            break;
        }
    }
    return NULL;
}
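/*
 * Illustrative sketch (not part of the original file): each ClosureKind selects
 * the handler signature that the worker loop above invokes. For example, a
 * hypothetical CLOSURE_KIND_PPI handler would look like:
 *
 *     static void ExampleHandler_ppi(void *context1, void *context2, int parameter1)
 *     {
 *         // runs on a worker thread; the arguments are whatever the
 *         // enqueuer passed to ThreadPool_add
 *     }
 *
 * The name is an assumption for illustration; only the signature is dictated
 * by ClosureHandler_ppi.
 */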

#define INITIALIZED_NONE         0
#define INITIALIZED_MUTEX        1
#define INITIALIZED_CONDNOTFULL  2
#define INITIALIZED_CONDNOTEMPTY 4
#define INITIALIZED_ALL          7

static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);

// Initialize a ThreadPool
// maxClosures defaults to CLOSURE_TYPICAL if 0
// maxThreads defaults to THREAD_TYPICAL if 0

SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
{
    assert(NULL != tp);
    memset(tp, 0, sizeof(ThreadPool));
    tp->mShutdown = SL_BOOLEAN_FALSE;
    unsigned initialized = INITIALIZED_NONE;    // which objects were successfully initialized
    unsigned nThreads = 0;                      // number of threads successfully created
    int err;
    SLresult result;

    // initialize mutex and condition variables
    err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL);
    result = err_to_result(err);
    if (SL_RESULT_SUCCESS != result)
        goto fail;
    initialized |= INITIALIZED_MUTEX;
    err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL);
    result = err_to_result(err);
    if (SL_RESULT_SUCCESS != result)
        goto fail;
    initialized |= INITIALIZED_CONDNOTFULL;
    err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL);
    result = err_to_result(err);
    if (SL_RESULT_SUCCESS != result)
        goto fail;
    initialized |= INITIALIZED_CONDNOTEMPTY;

    // use default values for parameters, if not specified explicitly
    tp->mWaitingNotFull = 0;
    tp->mWaitingNotEmpty = 0;
    if (0 == maxClosures)
        maxClosures = CLOSURE_TYPICAL;
    tp->mMaxClosures = maxClosures;
    if (0 == maxThreads)
        maxThreads = THREAD_TYPICAL;
    tp->mMaxThreads = maxThreads;

    // initialize circular buffer for closures
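    // the array is allocated with one extra slot so that a full buffer
    // (rear + 1 == front, with wrap-around) can be distinguished from an
    // empty one (front == rear)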
    if (CLOSURE_TYPICAL >= maxClosures) {
        tp->mClosureArray = tp->mClosureTypical;
    } else {
        tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
        if (NULL == tp->mClosureArray) {
            result = SL_RESULT_RESOURCE_ERROR;
            goto fail;
        }
    }
    tp->mClosureFront = tp->mClosureArray;
    tp->mClosureRear = tp->mClosureArray;

    // initialize thread pool
    if (THREAD_TYPICAL >= maxThreads) {
        tp->mThreadArray = tp->mThreadTypical;
    } else {
        tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
        if (NULL == tp->mThreadArray) {
            result = SL_RESULT_RESOURCE_ERROR;
            goto fail;
        }
    }
    unsigned i;
    for (i = 0; i < maxThreads; ++i) {
        err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL,
            ThreadPool_start, tp);
        result = err_to_result(err);
        if (SL_RESULT_SUCCESS != result)
            goto fail;
        ++nThreads;
    }
    tp->mInitialized = initialized;

    // done
    return SL_RESULT_SUCCESS;

    // here on any kind of error
fail:
    ThreadPool_deinit_internal(tp, initialized, nThreads);
    return result;
}
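/*
 * Usage sketch (not part of the original file): a caller that accepts the
 * default sizing passes 0 for both limits:
 *
 *     ThreadPool tp;
 *     SLresult result = ThreadPool_init(&tp, 0, 0);  // CLOSURE_TYPICAL, THREAD_TYPICAL
 *     if (SL_RESULT_SUCCESS != result) {
 *         // nothing further to clean up; the fail path above already ran
 *         // ThreadPool_deinit_internal
 *     }
 */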

static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
{
    int ok;

    assert(NULL != tp);
    // Destroy all threads
    if (0 < nThreads) {
        assert(INITIALIZED_ALL == initialized);
        ok = pthread_mutex_lock(&tp->mMutex);
        assert(0 == ok);
        tp->mShutdown = SL_BOOLEAN_TRUE;
        ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
        assert(0 == ok);
        ok = pthread_cond_broadcast(&tp->mCondNotFull);
        assert(0 == ok);
        ok = pthread_mutex_unlock(&tp->mMutex);
        assert(0 == ok);
        unsigned i;
        for (i = 0; i < nThreads; ++i) {
            ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
            assert(ok == 0);
        }

        // Empty out the circular buffer of closures
        ok = pthread_mutex_lock(&tp->mMutex);
        assert(0 == ok);
        Closure **oldFront = tp->mClosureFront;
        while (oldFront != tp->mClosureRear) {
            Closure **newFront = oldFront;
            if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
                newFront = tp->mClosureArray;
            Closure *pClosure = *oldFront;
            assert(NULL != pClosure);
            *oldFront = NULL;
            tp->mClosureFront = newFront;
            ok = pthread_mutex_unlock(&tp->mMutex);
            assert(0 == ok);
            free(pClosure);
            ok = pthread_mutex_lock(&tp->mMutex);
            assert(0 == ok);
        }
        ok = pthread_mutex_unlock(&tp->mMutex);
        assert(0 == ok);
        // Note that we can't be sure when mWaitingNotFull will drop to zero
    }

    // destroy the mutex and condition variables
    if (initialized & INITIALIZED_CONDNOTEMPTY) {
        ok = pthread_cond_destroy(&tp->mCondNotEmpty);
        assert(0 == ok);
    }
    if (initialized & INITIALIZED_CONDNOTFULL) {
        ok = pthread_cond_destroy(&tp->mCondNotFull);
        assert(0 == ok);
    }
    if (initialized & INITIALIZED_MUTEX) {
        ok = pthread_mutex_destroy(&tp->mMutex);
        assert(0 == ok);
    }
    tp->mInitialized = INITIALIZED_NONE;

    // release the closure circular buffer
    if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
        free(tp->mClosureArray);
        tp->mClosureArray = NULL;
    }

    // release the thread pool
    if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
        free(tp->mThreadArray);
        tp->mThreadArray = NULL;
    }
}

void ThreadPool_deinit(ThreadPool *tp)
{
    ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
}

// Enqueue a closure to be executed later by a worker thread.
// Note that this raw interface requires an explicit "kind" and full parameter list.
// There are convenience methods below that make this easier to use.
SLresult ThreadPool_add(ThreadPool *tp, ClosureKind kind, ClosureHandler_generic handler,
        void *context1, void *context2, void *context3, int parameter1, int parameter2)
{
    assert(NULL != tp);
    assert(NULL != handler);
    Closure *closure = (Closure *) malloc(sizeof(Closure));
    if (NULL == closure) {
        return SL_RESULT_RESOURCE_ERROR;
    }
    closure->mKind = kind;
    switch (kind) {
      case CLOSURE_KIND_PPI:
        closure->mHandler.mHandler_ppi = (ClosureHandler_ppi)handler;
        break;
      case CLOSURE_KIND_PPII:
        closure->mHandler.mHandler_ppii = (ClosureHandler_ppii)handler;
        break;
      case CLOSURE_KIND_PIIPP:
        closure->mHandler.mHandler_piipp = (ClosureHandler_piipp)handler;
        break;
      default:
        SL_LOGE("ThreadPool_add() invalid closure kind %d", kind);
        assert(false);
        // assert is a no-op in NDEBUG builds, so don't enqueue a closure
        // whose handler union member was never initialized
        free(closure);
        return SL_RESULT_PARAMETER_INVALID;
    }
    closure->mContext1 = context1;
    closure->mContext2 = context2;
    closure->mContext3 = context3;
    closure->mParameter1 = parameter1;
    closure->mParameter2 = parameter2;
    int ok;
    ok = pthread_mutex_lock(&tp->mMutex);
    assert(0 == ok);
    // can't enqueue while thread pool shutting down
    if (tp->mShutdown) {
        ok = pthread_mutex_unlock(&tp->mMutex);
        assert(0 == ok);
        free(closure);
        return SL_RESULT_PRECONDITIONS_VIOLATED;
    }
    for (;;) {
        Closure **oldRear = tp->mClosureRear;
        Closure **newRear = oldRear;
        if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
            newRear = tp->mClosureArray;
        // if closure circular buffer is full, then wait for it to become non-full
        if (newRear == tp->mClosureFront) {
            ++tp->mWaitingNotFull;
            ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
            assert(0 == ok);
            // can't enqueue while thread pool shutting down
            if (tp->mShutdown) {
                assert(0 < tp->mWaitingNotFull);
                --tp->mWaitingNotFull;
                ok = pthread_mutex_unlock(&tp->mMutex);
                assert(0 == ok);
                free(closure);
                return SL_RESULT_PRECONDITIONS_VIOLATED;
            }
            continue;
        }
        assert(NULL == *oldRear);
        *oldRear = closure;
        tp->mClosureRear = newRear;
        // if a worker thread was waiting to dequeue, then suggest that it try again
        if (0 < tp->mWaitingNotEmpty) {
            --tp->mWaitingNotEmpty;
            ok = pthread_cond_signal(&tp->mCondNotEmpty);
            assert(0 == ok);
        }
        break;
    }
    ok = pthread_mutex_unlock(&tp->mMutex);
    assert(0 == ok);
    return SL_RESULT_SUCCESS;
}
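/*
 * Illustrative sketch (hypothetical names, not part of the original file): a
 * raw PPII enqueue through this interface looks like
 *
 *     result = ThreadPool_add(tp, CLOSURE_KIND_PPII,
 *             (ClosureHandler_generic) myHandler, myContext1, myContext2,
 *             NULL, 42, 7);
 *
 * ThreadPool_add blocks while the circular buffer is full, and fails with
 * SL_RESULT_PRECONDITIONS_VIOLATED once shutdown has begun.
 */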

// Called by a worker thread when it is ready to accept the next closure to execute
Closure *ThreadPool_remove(ThreadPool *tp)
{
    Closure *pClosure;
    int ok;
    ok = pthread_mutex_lock(&tp->mMutex);
    assert(0 == ok);
    for (;;) {
        // fail if thread pool is shutting down
        if (tp->mShutdown) {
            pClosure = NULL;
            break;
        }
        Closure **oldFront = tp->mClosureFront;
        // if closure circular buffer is empty, then wait for it to become non-empty
        if (oldFront == tp->mClosureRear) {
            ++tp->mWaitingNotEmpty;
            ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
            assert(0 == ok);
            // try again
            continue;
        }
        // dequeue the closure at front of circular buffer
        Closure **newFront = oldFront;
        if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1]) {
            newFront = tp->mClosureArray;
        }
        pClosure = *oldFront;
        assert(NULL != pClosure);
        *oldFront = NULL;
        tp->mClosureFront = newFront;
        // if a client thread was waiting to enqueue, then suggest that it try again
        if (0 < tp->mWaitingNotFull) {
            --tp->mWaitingNotFull;
            ok = pthread_cond_signal(&tp->mCondNotFull);
            assert(0 == ok);
        }
        break;
    }
    ok = pthread_mutex_unlock(&tp->mMutex);
    assert(0 == ok);
    return pClosure;
}

// Convenience methods for applications
SLresult ThreadPool_add_ppi(ThreadPool *tp, ClosureHandler_ppi handler,
        void *context1, void *context2, int parameter1)
{
    // the cast through ClosureHandler_generic is safe because ThreadPool_add
    // converts the pointer back to its original type before a worker invokes it
    return ThreadPool_add(tp, CLOSURE_KIND_PPI, (ClosureHandler_generic) handler,
            context1, context2, NULL, parameter1, 0);
}

SLresult ThreadPool_add_ppii(ThreadPool *tp, ClosureHandler_ppii handler,
        void *context1, void *context2, int parameter1, int parameter2)
{
    // the cast through ClosureHandler_generic is safe because ThreadPool_add
    // converts the pointer back to its original type before a worker invokes it
    return ThreadPool_add(tp, CLOSURE_KIND_PPII, (ClosureHandler_generic) handler,
            context1, context2, NULL, parameter1, parameter2);
}

SLresult ThreadPool_add_piipp(ThreadPool *tp, ClosureHandler_piipp handler,
        void *cntxt1, int param1, int param2, void *cntxt2, void *cntxt3)
{
    // the cast through ClosureHandler_generic is safe because ThreadPool_add
    // converts the pointer back to its original type before a worker invokes it
    return ThreadPool_add(tp, CLOSURE_KIND_PIIPP, (ClosureHandler_generic) handler,
            cntxt1, cntxt2, cntxt3, param1, param2);
}
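
/*
 * End-to-end usage sketch (all names hypothetical, not part of the original
 * file): define a handler matching ClosureHandler_ppi and enqueue it through
 * a convenience wrapper; a worker created by ThreadPool_init will run it.
 *
 *     static void OnLevelChange(void *self, void *unused, int newLevel)
 *     {
 *         // executes on a worker thread, not on the caller's thread
 *     }
 *
 *     result = ThreadPool_add_ppi(&engine->mThreadPool, OnLevelChange,
 *             thiz, NULL, newLevel);
 *     if (SL_RESULT_RESOURCE_ERROR == result) {
 *         // the Closure could not be allocated; the handler will not run
 *     }
 */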