btif_sock_thread.cc 16.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/******************************************************************************
 *
 *  Copyright (C) 2009-2012 Broadcom Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

/************************************************************************************
 *
 *  Filename:      btif_sock_thread.cc
 *
 *  Description:   socket select thread
 *
 ***********************************************************************************/

27 28 29
#define LOG_TAG "bt_btif_sock"

#include "btif_sock_thread.h"
30

31 32
#include <alloca.h>
#include <ctype.h>
33
#include <errno.h>
34
#include <fcntl.h>
35
#include <features.h>
36 37 38 39
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
40
#include <string.h>
41 42
#include <sys/poll.h>
#include <sys/select.h>
43
#include <sys/socket.h>
44
#include <sys/types.h>
45 46 47 48
#include <sys/un.h>
#include <time.h>
#include <unistd.h>

49 50
#include <mutex>

51
#include "bta_api.h"
52
#include "btif_common.h"
53 54
#include "btif_sock.h"
#include "btif_sock_util.h"
55 56
#include "btif_util.h"
#include "osi/include/socket_utils/sockets.h"
57

Marie Janssen's avatar
Marie Janssen committed
58
/* Soft assertion: logs the failed expression together with the function name and
 * line number, and then lets execution continue (it never aborts).
 * Wrapped in do { } while(0) so the macro is a single statement and cannot capture
 * an `else` that follows it at the call site. */
#define asrt(s) do { if(!(s)) APPL_TRACE_ERROR("## %s assert %s failed at line:%d ##",__func__, #s, __LINE__); } while(0)
59
/* Debug helper: logs a poll(2) event mask in hex, then one line per bit that is set.
 * The argument is evaluated exactly once and copied into a local, so passing an
 * expression with side effects is safe. */
#define print_events(events) do { \
    const int print_events_mask_ = (events); \
    APPL_TRACE_DEBUG("print poll event:%x", print_events_mask_); \
    if (print_events_mask_ & POLLIN) APPL_TRACE_DEBUG(  "   POLLIN "); \
    if (print_events_mask_ & POLLPRI) APPL_TRACE_DEBUG( "   POLLPRI "); \
    if (print_events_mask_ & POLLOUT) APPL_TRACE_DEBUG( "   POLLOUT "); \
    if (print_events_mask_ & POLLERR) APPL_TRACE_DEBUG( "   POLLERR "); \
    if (print_events_mask_ & POLLHUP) APPL_TRACE_DEBUG( "   POLLHUP "); \
    if (print_events_mask_ & POLLNVAL) APPL_TRACE_DEBUG("   POLLNVAL "); \
    if (print_events_mask_ & POLLRDHUP) APPL_TRACE_DEBUG("   POLLRDHUP"); \
    } while(0)

/* Capacity limits: number of poll-thread slots, and descriptors per poll thread. */
#define MAX_THREAD 8
#define MAX_POLL 64

/* poll(2) event classification helpers. */
#define POLL_EXCEPTION_EVENTS (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)
#define IS_EXCEPTION(e) ((e) & POLL_EXCEPTION_EVENTS)
#define IS_READ(e) ((e) & POLLIN)
#define IS_WRITE(e) ((e) & POLLOUT)

/* Command ids carried in sock_cmd_t.id; the commands are executed in the socket poll thread. */
#define CMD_WAKEUP       1
#define CMD_EXIT         2
#define CMD_ADD_FD       3
#define CMD_REMOVE_FD    4
#define CMD_USER_PRIVATE 5
82 83 84 85 86 87 88 89 90 91 92 93

/* One monitored descriptor owned by a poll thread. */
typedef struct {
    struct pollfd pfd;  // descriptor and poll event mask; pfd.fd == -1 marks the slot as free
    uint32_t user_id;   // caller-supplied value handed back to the signaled callback
    int type;           // caller-supplied type handed back to the callback; set_poll logs if a non-zero type changes
    int flags;          // SOCK_THREAD_FD_* bits that are still being monitored
} poll_slot_t;
typedef struct {
    int cmd_fdr, cmd_fdw;
    int poll_count;
    poll_slot_t ps[MAX_POLL];
    int psi[MAX_POLL]; //index of poll slot
Ian Coolidge's avatar
Ian Coolidge committed
94
    volatile pthread_t thread_id;
95 96 97 98 99 100 101 102 103 104 105
    btsock_signaled_cb callback;
    btsock_cmd_cb cmd_callback;
    int used;
} thread_slot_t;
/* Fixed table of poll-thread slots; a slot's index is the handle `h` used throughout this file. */
static thread_slot_t ts[MAX_THREAD];

/* Forward declarations for helpers that are referenced before they are defined. */
static void *sock_poll_thread(void *arg);
static inline void close_cmd_fd(int h);

static inline void add_poll(int h, int fd, int type, int flags, uint32_t user_id);

106
/* Held by alloc_thread_slot() while it scans and claims an entry of ts[].
 * NOTE(review): free_thread_slot() clears ts[h].used without taking this lock. */
static std::recursive_mutex thread_slot_lock;
107

Ian Coolidge's avatar
Ian Coolidge committed
108 109
static inline int create_thread(void *(*start_routine)(void *), void * arg,
                                pthread_t * thread_id)
110 111 112 113
{
    pthread_attr_t thread_attr;
    pthread_attr_init(&thread_attr);
    pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
114 115
    int policy;
    int min_pri=0;
116
    int ret = -1;
117
    struct sched_param param;
Ian Coolidge's avatar
Ian Coolidge committed
118

119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
    if ((ret = pthread_create(thread_id, &thread_attr, start_routine, arg))!=0 )
    {
        APPL_TRACE_ERROR("pthread_create : %s", strerror(errno));
        return ret;
    }
    /* We need to lower the priority of this thread to ensure the stack gets
     * priority over transfer to a socket */
    pthread_getschedparam(*thread_id, &policy, &param);
    min_pri = sched_get_priority_min(policy);
    if (param.sched_priority > min_pri) {
        param.sched_priority -= 1;
    }
    pthread_setschedparam(*thread_id, policy, &param);
    return ret;
}
134 135 136
/* Forward declaration. NOTE: the argument is the thread-slot handle (the definition
 * names it `h`), not a command fd as the parameter name here suggests. */
static void init_poll(int cmd_fd);
static int alloc_thread_slot()
{
137
    std::unique_lock<std::recursive_mutex> lock(thread_slot_lock);
138
    int i;
139
    // reversed order to save guard uninitialized access to 0 index
140 141
    for(i = MAX_THREAD - 1; i >=0; i--)
    {
142
        APPL_TRACE_DEBUG("ts[%d].used:%d", i, ts[i].used);
143 144 145 146 147 148
        if(!ts[i].used)
        {
            ts[i].used = 1;
            return i;
        }
    }
149
    APPL_TRACE_ERROR("execeeded max thread count");
150 151 152 153 154 155 156 157 158
    return -1;
}
static void free_thread_slot(int h)
{
    if(0 <= h && h < MAX_THREAD)
    {
        close_cmd_fd(h);
        ts[h].used = 0;
    }
159
    else APPL_TRACE_ERROR("invalid thread handle:%d", h);
160 161 162 163
}
int btsock_thread_init()
{
    static int initialized;
164
    APPL_TRACE_DEBUG("in initialized:%d", initialized);
165 166 167 168 169 170 171 172 173 174 175 176 177 178
    if(!initialized)
    {
        initialized = 1;
        int h;
        for(h = 0; h < MAX_THREAD; h++)
        {
            ts[h].cmd_fdr = ts[h].cmd_fdw = -1;
            ts[h].used = 0;
            ts[h].thread_id = -1;
            ts[h].poll_count = 0;
            ts[h].callback = NULL;
            ts[h].cmd_callback = NULL;
        }
    }
Marie Janssen's avatar
Marie Janssen committed
179
    return true;
180 181 182 183 184
}
/* Allocates a thread slot, sets up its command channel and starts its poll thread.
 *
 * callback     - invoked from the poll thread when a monitored fd is signaled.
 * cmd_callback - invoked from the poll thread for CMD_USER_PRIVATE commands.
 * At least one of the two is expected to be non-NULL (soft-asserted only).
 *
 * Returns the thread handle (>= 0) on success, or -1 when no slot is free, the
 * command socket pair could not be created, or the thread could not be started. */
int btsock_thread_create(btsock_signaled_cb callback, btsock_cmd_cb cmd_callback)
{
    asrt(callback || cmd_callback);
    int h = alloc_thread_slot();
    APPL_TRACE_DEBUG("alloc_thread_slot ret:%d", h);
    if(h < 0)
    {
        return h;
    }

    init_poll(h);
    if(ts[h].cmd_fdr == -1 || ts[h].cmd_fdw == -1)
    {
        // Without a command channel the thread would block in poll() forever and
        // could never be told to exit, so do not start it.
        APPL_TRACE_ERROR("h:%d, cmd socket pair is not created", h);
        free_thread_slot(h);
        return -1;
    }

    // Publish the callbacks before the thread exists: the poll thread reads them
    // as soon as it runs, and init_poll() above has just reset them to NULL.
    ts[h].callback = callback;
    ts[h].cmd_callback = cmd_callback;

    pthread_t thread;
    int status = create_thread(sock_poll_thread, (void*)(uintptr_t)h, &thread);
    if (status)
    {
        APPL_TRACE_ERROR("create_thread failed: %s", strerror(status));
        ts[h].callback = NULL;
        ts[h].cmd_callback = NULL;
        free_thread_slot(h);
        return -1;
    }

    ts[h].thread_id = thread;
    APPL_TRACE_DEBUG("h:%d, thread id:%lu", h, (unsigned long)ts[h].thread_id);
    return h;
}

/* create dummy socket pair used to wake up select loop */
static inline void init_cmd_fd(int h)
{
    asrt(ts[h].cmd_fdr == -1 && ts[h].cmd_fdw == -1);
    if(socketpair(AF_UNIX, SOCK_STREAM, 0, &ts[h].cmd_fdr) < 0)
    {
212
        APPL_TRACE_ERROR("socketpair failed: %s", strerror(errno));
213 214
        return;
    }
215
    APPL_TRACE_DEBUG("h:%d, cmd_fdr:%d, cmd_fdw:%d", h, ts[h].cmd_fdr, ts[h].cmd_fdw);
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
    //add the cmd fd for read & write
    add_poll(h, ts[h].cmd_fdr, 0, SOCK_THREAD_FD_RD, 0);
}
/* Closes whichever ends of slot h's command channel are open (read end first,
 * then write end) and resets each closed member to -1. */
static inline void close_cmd_fd(int h)
{
    int* const ends[] = { &ts[h].cmd_fdr, &ts[h].cmd_fdw };

    for(size_t idx = 0; idx < sizeof(ends) / sizeof(ends[0]); idx++)
    {
        int* const end = ends[idx];
        if(*end == -1)
        {
            continue;
        }
        close(*end);
        *end = -1;
    }
}
/* Fixed-size command record written to a slot's cmd_fdw and read back from cmd_fdr
 * by the poll thread. It is built with positional initializers in this file, so
 * the field order must not change. */
typedef struct
{
    int id;            // one of the CMD_* values
    int fd;            // target descriptor for CMD_ADD_FD / CMD_REMOVE_FD, otherwise 0
    int type;          // caller-defined type (CMD_ADD_FD, CMD_USER_PRIVATE)
    int flags;         // SOCK_THREAD_* flags for CMD_ADD_FD; payload size for CMD_USER_PRIVATE
    uint32_t user_id;  // caller-supplied value passed through to the callbacks
} sock_cmd_t;
int btsock_thread_add_fd(int h, int fd, int type, int flags, uint32_t user_id)
{
    if(h < 0 || h >= MAX_THREAD)
    {
244
        APPL_TRACE_ERROR("invalid bt thread handle:%d", h);
Marie Janssen's avatar
Marie Janssen committed
245
        return false;
246 247 248
    }
    if(ts[h].cmd_fdw == -1)
    {
249
        APPL_TRACE_ERROR("cmd socket is not created. socket thread may not initialized");
Marie Janssen's avatar
Marie Janssen committed
250
        return false;
251 252 253 254 255 256 257 258 259
    }
    if(flags & SOCK_THREAD_ADD_FD_SYNC)
    {
        //must executed in socket poll thread
        if(ts[h].thread_id == pthread_self())
        {
            //cleanup one-time flags
            flags &= ~SOCK_THREAD_ADD_FD_SYNC;
            add_poll(h, fd, type, flags, user_id);
Marie Janssen's avatar
Marie Janssen committed
260
            return true;
261
        }
262
        APPL_TRACE_DEBUG("THREAD_ADD_FD_SYNC is not called in poll thread, fallback to async");
263 264
    }
    sock_cmd_t cmd = {CMD_ADD_FD, fd, type, flags, user_id};
265
    APPL_TRACE_DEBUG("adding fd:%d, flags:0x%x", fd, flags);
266 267 268 269 270

    ssize_t ret;
    OSI_NO_INTR(ret = send(ts[h].cmd_fdw, &cmd, sizeof(cmd), 0));

    return ret == sizeof(cmd);
271
}
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286

/* Posts a CMD_REMOVE_FD command asking poll thread thread_handle to stop
 * monitoring fd and close it (the removal and close happen in the poll thread).
 *
 * Returns true when the command was sent in full; false for an invalid handle,
 * fd == -1, a missing command socket, or a failed/short send. */
bool btsock_thread_remove_fd_and_close(int thread_handle, int fd)
{
    if (thread_handle < 0 || thread_handle >= MAX_THREAD)
    {
        APPL_TRACE_ERROR("%s invalid thread handle: %d", __func__, thread_handle);
        return false;
    }
    if (fd == -1)
    {
        APPL_TRACE_ERROR("%s invalid file descriptor.", __func__);
        return false;
    }
    // Same precondition as the other command-posting APIs in this file.
    if (ts[thread_handle].cmd_fdw == -1)
    {
        APPL_TRACE_ERROR("%s cmd socket is not created.", __func__);
        return false;
    }

    sock_cmd_t cmd = {CMD_REMOVE_FD, fd, 0, 0, 0};

    ssize_t ret;
    OSI_NO_INTR(ret = send(ts[thread_handle].cmd_fdw, &cmd, sizeof(cmd), 0));

    return ret == sizeof(cmd);
}

294 295 296 297
int btsock_thread_post_cmd(int h, int type, const unsigned char* data, int size, uint32_t user_id)
{
    if(h < 0 || h >= MAX_THREAD)
    {
298
        APPL_TRACE_ERROR("invalid bt thread handle:%d", h);
Marie Janssen's avatar
Marie Janssen committed
299
        return false;
300 301 302
    }
    if(ts[h].cmd_fdw == -1)
    {
303
        APPL_TRACE_ERROR("cmd socket is not created. socket thread may not initialized");
Marie Janssen's avatar
Marie Janssen committed
304
        return false;
305 306
    }
    sock_cmd_t cmd = {CMD_USER_PRIVATE, 0, type, size, user_id};
307
    APPL_TRACE_DEBUG("post cmd type:%d, size:%d, h:%d, ", type, size, h);
308 309 310 311 312 313 314 315 316 317 318 319 320
    sock_cmd_t* cmd_send = &cmd;
    int size_send = sizeof(cmd);
    if(data && size)
    {
        size_send = sizeof(cmd) + size;
        cmd_send = (sock_cmd_t*)alloca(size_send);
        if(cmd_send)
        {
            *cmd_send = cmd;
            memcpy(cmd_send + 1, data, size);
        }
        else
        {
321
            APPL_TRACE_ERROR("alloca failed at h:%d, cmd type:%d, size:%d", h, type, size_send);
Marie Janssen's avatar
Marie Janssen committed
322
            return false;
323 324
        }
    }
325 326 327 328 329

    ssize_t ret;
    OSI_NO_INTR(ret = send(ts[h].cmd_fdw, cmd_send, size_send, 0));

    return ret == size_send;
330 331 332 333 334
}
int btsock_thread_wakeup(int h)
{
    if(h < 0 || h >= MAX_THREAD)
    {
335
        APPL_TRACE_ERROR("invalid bt thread handle:%d", h);
Marie Janssen's avatar
Marie Janssen committed
336
        return false;
337 338 339
    }
    if(ts[h].cmd_fdw == -1)
    {
340
        APPL_TRACE_ERROR("thread handle:%d, cmd socket is not created", h);
Marie Janssen's avatar
Marie Janssen committed
341
        return false;
342 343
    }
    sock_cmd_t cmd = {CMD_WAKEUP, 0, 0, 0, 0};
344 345 346 347 348

    ssize_t ret;
    OSI_NO_INTR(ret = send(ts[h].cmd_fdw, &cmd, sizeof(cmd), 0));

    return ret == sizeof(cmd);
349 350 351 352 353
}
int btsock_thread_exit(int h)
{
    if(h < 0 || h >= MAX_THREAD)
    {
354
        APPL_TRACE_ERROR("invalid bt thread handle:%d", h);
Marie Janssen's avatar
Marie Janssen committed
355
        return false;
356 357 358
    }
    if(ts[h].cmd_fdw == -1)
    {
359
        APPL_TRACE_ERROR("cmd socket is not created");
Marie Janssen's avatar
Marie Janssen committed
360
        return false;
361 362
    }
    sock_cmd_t cmd = {CMD_EXIT, 0, 0, 0, 0};
363 364 365 366 367

    ssize_t ret;
    OSI_NO_INTR(ret = send(ts[h].cmd_fdw, &cmd, sizeof(cmd), 0));

    if (ret == sizeof(cmd)) {
368 369
        pthread_join(ts[h].thread_id, 0);
        free_thread_slot(h);
Marie Janssen's avatar
Marie Janssen committed
370
        return true;
371
    }
Marie Janssen's avatar
Marie Janssen committed
372
    return false;
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
}
static void init_poll(int h)
{
    int i;
    ts[h].poll_count = 0;
    ts[h].thread_id = -1;
    ts[h].callback = NULL;
    ts[h].cmd_callback = NULL;
    for(i = 0; i < MAX_POLL; i++)
    {
        ts[h].ps[i].pfd.fd = -1;
        ts[h].psi[i] = -1;
    }
    init_cmd_fd(h);
}
/* Translates SOCK_THREAD_FD_RD / SOCK_THREAD_FD_WR flag bits into a poll() event
 * mask. The exception events are always part of the result. */
static inline unsigned int flags2pevents(int flags)
{
    unsigned int mask = POLL_EXCEPTION_EVENTS;

    if(flags & SOCK_THREAD_FD_RD)
    {
        mask |= POLLIN;
    }
    if(flags & SOCK_THREAD_FD_WR)
    {
        mask |= POLLOUT;
    }
    return mask;
}

/* Fills descriptor slot ps with the given fd, type, flags and user id, derives the
 * poll event mask from flags and clears any pending revents. A change of an
 * already non-zero type is logged as an error but still applied. */
static inline void set_poll(poll_slot_t* ps, int fd, int type, int flags, uint32_t user_id)
{
    const int previous_type = ps->type;
    if(previous_type != 0 && previous_type != type)
    {
        APPL_TRACE_ERROR("poll socket type should not changed! type was:%d, type now:%d", previous_type, type);
    }

    ps->type = type;
    ps->user_id = user_id;
    ps->flags = flags;

    ps->pfd.fd = fd;
    ps->pfd.events = flags2pevents(flags);
    ps->pfd.revents = 0;
}
static inline void add_poll(int h, int fd, int type, int flags, uint32_t user_id)
{
    asrt(fd != -1);
    int i;
    int empty = -1;
    poll_slot_t* ps = ts[h].ps;

    for(i = 0; i < MAX_POLL; i++)
    {
        if(ps[i].pfd.fd == fd)
        {
            asrt(ts[h].poll_count < MAX_POLL);

            set_poll(&ps[i], fd, type, flags | ps[i].flags, user_id);
            return;
        }
        else if(empty < 0 && ps[i].pfd.fd == -1)
            empty = i;
    }
    if(empty >= 0)
    {
        asrt(ts[h].poll_count < MAX_POLL);
        set_poll(&ps[empty], fd, type, flags, user_id);
        ++ts[h].poll_count;
        return;
    }
436
    APPL_TRACE_ERROR("exceeded max poll slot:%d!", MAX_POLL);
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
}
/* Stops monitoring the events in flags on slot ps of poll thread h.
 * When flags equals everything the slot monitors, the slot is cleared, marked free
 * (fd = -1) and poll_count is decremented; otherwise only those bits are dropped
 * and the poll event mask is recomputed. */
static inline void remove_poll(int h, poll_slot_t* ps, int flags)
{
    if(flags != ps->flags)
    {
        //only part of the monitored events is removed: keep the slot alive
        ps->flags &= ~flags;
        ps->pfd.events = flags2pevents(ps->flags);
        return;
    }

    //every monitored event is removed: release the slot
    memset(ps, 0, sizeof(*ps));
    ps->pfd.fd = -1;
    --ts[h].poll_count;
}
static int process_cmd_sock(int h)
{
    sock_cmd_t cmd = {-1, 0, 0, 0, 0};
    int fd = ts[h].cmd_fdr;
459 460 461 462 463

    ssize_t ret;
    OSI_NO_INTR(ret = recv(fd, &cmd, sizeof(cmd), MSG_WAITALL));

    if (ret != sizeof(cmd))
464
    {
465
        APPL_TRACE_ERROR("recv cmd errno:%d", errno);
Marie Janssen's avatar
Marie Janssen committed
466
        return false;
467
    }
468
    APPL_TRACE_DEBUG("cmd.id:%d", cmd.id);
469 470 471 472 473
    switch(cmd.id)
    {
        case CMD_ADD_FD:
            add_poll(h, cmd.fd, cmd.type, cmd.flags, cmd.user_id);
            break;
474 475 476 477 478 479 480 481 482 483 484 485
        case CMD_REMOVE_FD:
            for (int i = 1; i < MAX_POLL; ++i)
            {
                poll_slot_t *poll_slot = &ts[h].ps[i];
                if (poll_slot->pfd.fd == cmd.fd)
                {
                    remove_poll(h, poll_slot, poll_slot->flags);
                    break;
                }
            }
            close(cmd.fd);
            break;
486 487 488 489 490 491 492 493
        case CMD_WAKEUP:
            break;
        case CMD_USER_PRIVATE:
            asrt(ts[h].cmd_callback);
            if(ts[h].cmd_callback)
                ts[h].cmd_callback(fd, cmd.type, cmd.flags, cmd.user_id);
            break;
        case CMD_EXIT:
Marie Janssen's avatar
Marie Janssen committed
494
            return false;
495
        default:
496
            APPL_TRACE_DEBUG("unknown cmd: %d", cmd.id);
497 498
             break;
    }
Marie Janssen's avatar
Marie Janssen committed
499
    return true;
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
}
/* Dispatches the events poll() reported on the data descriptors of poll thread h.
 *
 * pfds  - the array that was passed to poll(); entry 0 is skipped here.
 * count - number of signaled data descriptors (only soft-asserted).
 *
 * For every entry with pending revents the matching slot is looked up through
 * ts[h].psi[], the signaled read/write/exception conditions are converted to
 * SOCK_THREAD_FD_* flags, the slot's monitoring is reduced accordingly (removed
 * completely on an exception), and ts[h].callback is invoked when it is set. */
static void process_data_sock(int h, struct pollfd *pfds, int count)
{
    asrt(count <= ts[h].poll_count);

    // remove_poll() below decrements ts[h].poll_count; iterating against the live
    // counter would shrink the bound mid-loop and skip signaled entries at the
    // end of pfds, so work against a snapshot taken on entry.
    const int pfd_count = ts[h].poll_count;

    for(int i = 1; i < pfd_count; i++)
    {
        if(!pfds[i].revents)
        {
            continue;
        }

        int ps_i = ts[h].psi[i];
        asrt(pfds[i].fd == ts[h].ps[ps_i].pfd.fd);
        //read these before remove_poll() may wipe the slot
        uint32_t user_id = ts[h].ps[ps_i].user_id;
        int type = ts[h].ps[ps_i].type;
        int flags = 0;
        print_events(pfds[i].revents);
        if(IS_READ(pfds[i].revents))
        {
            flags |= SOCK_THREAD_FD_RD;
        }
        if(IS_WRITE(pfds[i].revents))
        {
            flags |= SOCK_THREAD_FD_WR;
        }
        if(IS_EXCEPTION(pfds[i].revents))
        {
            flags |= SOCK_THREAD_FD_EXCEPTION;
            //remove the whole slot not flags
            remove_poll(h, &ts[h].ps[ps_i], ts[h].ps[ps_i].flags);
        }
        else if(flags)
        {
            //remove the monitor flags that already processed
            remove_poll(h, &ts[h].ps[ps_i], flags);
        }
        //a thread may have been created with only a cmd_callback
        if(flags && ts[h].callback)
        {
            ts[h].callback(pfds[i].fd, type, flags, user_id);
        }
    }
}

static void prepare_poll_fds(int h, struct pollfd* pfds)
{
    int count = 0;
    int ps_i = 0;
    int pfd_i = 0;
    asrt(ts[h].poll_count <= MAX_POLL);
    memset(pfds, 0, sizeof(pfds[0])*ts[h].poll_count);
    while(count < ts[h].poll_count)
    {
        if(ps_i >= MAX_POLL)
        {
548
            APPL_TRACE_ERROR("exceed max poll range, ps_i:%d, MAX_POLL:%d, count:%d, ts[h].poll_count:%d",
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565
                    ps_i, MAX_POLL, count, ts[h].poll_count);
            return;
        }
        if(ts[h].ps[ps_i].pfd.fd >= 0)
        {
            pfds[pfd_i] =  ts[h].ps[ps_i].pfd;
            ts[h].psi[pfd_i] = ps_i;
            count++;
            pfd_i++;
        }
        ps_i++;
    }
}
static void *sock_poll_thread(void *arg)
{
    struct pollfd pfds[MAX_POLL];
    memset(pfds, 0, sizeof(pfds));
Kévin PETIT's avatar
Kévin PETIT committed
566
    int h = (intptr_t)arg;
567 568 569
    for(;;)
    {
        prepare_poll_fds(h, pfds);
570 571
        int ret;
        OSI_NO_INTR(ret = poll(pfds, ts[h].poll_count, -1));
572 573
        if(ret == -1)
        {
574
            APPL_TRACE_ERROR("poll ret -1, exit the thread, errno:%d, err:%s", errno, strerror(errno));
575 576 577 578
            break;
        }
        if(ret != 0)
        {
Marie Janssen's avatar
Marie Janssen committed
579
            int need_process_data_fd = true;
580 581 582 583 584
            if(pfds[0].revents) //cmd fd always is the first one
            {
                asrt(pfds[0].fd == ts[h].cmd_fdr);
                if(!process_cmd_sock(h))
                {
585
                    APPL_TRACE_DEBUG("h:%d, process_cmd_sock return false, exit...", h);
586 587 588
                    break;
                }
                if(ret == 1)
Marie Janssen's avatar
Marie Janssen committed
589
                    need_process_data_fd = false;
590 591 592 593 594
                else ret--; //exclude the cmd fd
            }
            if(need_process_data_fd)
                process_data_sock(h, pfds, ret);
        }
595
        else {APPL_TRACE_DEBUG("no data, select ret: %d", ret)};
596 597
    }
    ts[h].thread_id = -1;
598
    APPL_TRACE_DEBUG("socket poll thread exiting, h:%d", h);
599 600
    return 0;
}