Android M bluetooth update & 新消息处理机制

Posted by Vector Blog on November 6, 2015

新支持的profile : SAP(SIM access profile)

另外原有的profile也有改变:
同时保持连接两台A2DP Sink 设备 (目前默认仍然只能同时连接一台设备,可以通过修改 persist.bt.max.a2dp.connections 为 2 来支持这个特性) 同时保持连接两台AVRCP controllers设备

虽然是连接两台设备,但同时只能有一台设备收听音乐,收听音乐的设备可以在这两台已连接设备之间切换 (可以通过修改 persist.bt.enable.multicast 为 1 来支持)

BT enable 变化

在之前是直接调用 enableNative() , 现在会首先 enable BLE gatt service . 其中 initializeNative() 中会去获取 bluedroid 中 gatt 的接口,并调用其 init ,init 仅仅是传递上层的 callback

bluedroid 中的流程也有一些改变 :

另外 Android M bluedroid 中的各模块之间的消息处理机制也和之前不一样

创建线程主要通过 thread_new , 参数为 线程的 name :

static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 128;
static const char *BT_WORKQUEUE_NAME = "bt_workqueue";

thread_t *bt_workqueue_thread = thread_new(BT_WORKQUEUE_NAME);

thread_t *thread_new(const char *name) {
  return thread_new_sized(name, DEFAULT_WORK_QUEUE_CAPACITY);
}

最后返回的是一个 thread_t 指针, bluedroid 中的工作线程都是由该结构体来表示 。先来看一下这个结构体

struct thread_t {
  bool is_joined;
  pthread_t pthread;
  pid_t tid;
  char name[THREAD_NAME_MAX + 1];
  reactor_t *reactor;
  fixed_queue_t *work_queue;
};

前4个成员变量比较好理解, 先来看一下 fixed_queue_t :

typedef struct fixed_queue_t {
  list_t *list;	//消息队列实体
  semaphore_t *enqueue_sem;	//用于判断该队列可写
  semaphore_t *dequeue_sem;	//用于通知队列可读
  pthread_mutex_t lock;
  size_t capacity;

  reactor_object_t *dequeue_object;
  fixed_queue_cb dequeue_ready;
  void *dequeue_context;
} fixed_queue_t;

可以看到工作线程中都会包含一个消息队列 , 另外也还可以建立单独的消息队列并调用 fixed_queue_register_dequeue 来注册到相应的工作线程处理 。

先来看工作线程的建立 :

thread_t *thread_new_sized(const char *name, size_t work_queue_capacity) {
  thread_t *ret = osi_calloc(sizeof(thread_t));

  ret->reactor = reactor_new();
	{	 //reactor_new
	  	  reactor_t *ret = (reactor_t *)osi_calloc(sizeof(reactor_t));
	  	  // 创建 epoll 句柄, 最大监听 64 个fd,
	  	  ret->epoll_fd = epoll_create(MAX_EVENTS);
	  	  // 创建 eventfd , 主要用来做事件通知
	  	  ret->event_fd = eventfd(0, 0);
	  	  pthread_mutex_init(&ret->list_lock, NULL);
	  	  ret->invalidation_list = list_new(NULL);
	  	  struct epoll_event event;
	  	  memset(&event, 0, sizeof(event));
	  	  event.events = EPOLLIN;
	  	  event.data.ptr = NULL;
	  	  // 将 event_fd 加入监听, 监听事件为 EPOLLIN
	  	  epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event);
	}	

  ret->work_queue = fixed_queue_new(work_queue_capacity);
	{	  //fixed_queue_new
		  fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t))
	  	  pthread_mutex_init(&ret->lock, NULL);
		  ret->capacity = capacity;
		  ret->list = list_new(NULL);
	  	  ret->enqueue_sem = semaphore_new(capacity);
		  ret->dequeue_sem = semaphore_new(0);
	}

  // Start is on the stack, but we use a semaphore, so it's safe
  struct start_arg start;

  strncpy(ret->name, name, THREAD_NAME_MAX);
  start.thread = ret;
  // 创建工作线程实体
  pthread_create(&ret->pthread, NULL, run_thread, &start);
  ......
  return ret;

error:
  ......
}

实际执行函数 run_thread 为 :

static void *run_thread(void *start_arg) {
  thread_t *thread = start->thread;
  //首先更改线程的名称
  if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
    ......
    return NULL;
  }
  //保存 tid
  thread->tid = gettid();

  // 获取到 thread->work_queue 中的 dequeue_sem
  int fd = fixed_queue_get_dequeue_fd(thread->work_queue);
  void *context = thread->work_queue;

  //监听工作线程中自带的消息队列 , ,其事件处理函数为 work_queue_read_cb
  reactor_object_t *work_queue_object = reactor_register(thread->reactor, fd, context, work_queue_read_cb, NULL);
  reactor_start(thread->reactor);		//事件处理循环
	{//reactor_start
  	  if(reactor)
     	    return run_reactor(reactor, 0);	//0表示死循环
	}
  reactor_unregister(work_queue_object);
  ......

  return NULL;
}

事件处理主要都是在 reactor_xxx 这几个函数中, 先来看 reactor_register ,其作用是将指定消息队列添加到工作线程中来监听和处理,并指定其event 处理函数, 其参数为
1.工作线程的 reactor ; 2. 代表消息队列可读的 fd 句柄 ; 3. 消息队列 ;4. event 写函数 ; 5. event 读函数

reactor_object_t *reactor_register(reactor_t *reactor,
    int fd, void *context,
    void (*read_ready)(void *context),
    void (*write_ready)(void *context)) {

  reactor_object_t *object =
      (reactor_object_t *)osi_calloc(sizeof(reactor_object_t));
      {	//epoll_wait 监听到的 event 的 data.ptr 指向该 object
		struct reactor_object_t {
		  int fd;                              // 监听的表示可读写的句柄
		  void *context;                       // 传给 *_ready 读写函数的数据, 为消息队列
		  reactor_t *reactor;                  // 工作线程的 reactor
		  pthread_mutex_t lock;               

		  void (*read_ready)(void *context);   
		  void (*write_ready)(void *context); 
		};      
      }
  object->reactor = reactor;
  object->fd = fd;
  object->context = context;	//消息队列
  object->read_ready = read_ready;	//internal_dequeue_ready
  object->write_ready = write_ready;
  pthread_mutex_init(&object->lock, NULL);

  struct epoll_event event;
  memset(&event, 0, sizeof(event));
  if (read_ready)
    event.events |= (EPOLLIN | EPOLLRDHUP);
  if (write_ready)
    event.events |= EPOLLOUT;
  event.data.ptr = object;

  // 监听消息队列的 dequeue_sem 的可读写消息
  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
    LOG_ERROR(LOG_TAG, "%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));
    pthread_mutex_destroy(&object->lock);
    osi_free(object);
    return NULL;
  }
  return object;
}

工作线程中的实际一直保持运行的部分为 run_reactor , 其中第2个参数为0, 代表死循环一直保持监听, 在 bluedroid 中有提供 reactor_run_once 其调用参数为1,但没有看到有调用这个函数的地方

static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
  // reactor 中保存当前线程句柄及运行状态
  reactor->run_thread = pthread_self();
  reactor->is_running = true;

  struct epoll_event events[MAX_EVENTS];
  //传进来的 iterations 为0 , 所以此处为死循环
  for (int i = 0; iterations == 0 || i < iterations; ++i) {
    pthread_mutex_lock(&reactor->list_lock);
    list_clear(reactor->invalidation_list);
    pthread_mutex_unlock(&reactor->list_lock);

    int ret;
    do {
      // 开始监听 reactor->epoll_fd 集合中的 I/O 事件, -1 代表阻塞
      ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1);
    } while (ret == -1 && errno == EINTR);

    if (ret == -1) {
      reactor->is_running = false;
      return REACTOR_STATUS_ERROR;
    }

    for (int j = 0; j < ret; ++j) {
      // 这里后面会用到, 会唯一监听一个数据指针为 NULL 的事件, 代表退出循环 
      if (events[j].data.ptr == NULL) {
        eventfd_t value;
        eventfd_read(reactor->event_fd, &value);
        reactor->is_running = false;
        return REACTOR_STATUS_STOP;
      }

      reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;
      ......

      reactor->object_removed = false;
      /// 处理可读事件 
      if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
        object->read_ready(object->context);	
      //处理可写事件
      if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
        object->write_ready(object->context);
      pthread_mutex_unlock(&object->lock);

      if (reactor->object_removed) {
        pthread_mutex_destroy(&object->lock);
        osi_free(object);
      }
    }
  }
  
  reactor->is_running = false;
  return REACTOR_STATUS_DONE;
}

先来看向工作线程自己的消息队列发送消息

thread_post(bt_workqueue_thread, btu_task_start_up, NULL);

bool thread_post(thread_t *thread, thread_fn func, void *context) {
  //将数据填充到 item
  work_item_t *item = (work_item_t *)osi_malloc(sizeof(work_item_t));
  item->func = func;
  item->context = context;

  //看第一个参数, 此处为工作线程 thread 中的work_queue
  fixed_queue_enqueue(thread->work_queue, item);
  return true;
}

void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
  semaphore_wait(queue->enqueue_sem);

  pthread_mutex_lock(&queue->lock);
  list_append(queue->list, data);
  pthread_mutex_unlock(&queue->lock);

  //通知该消息队列已经可读了
  semaphore_post(queue->dequeue_sem);
}

再来看其处理函数为默认设置的 work_queue_read_cb

static void work_queue_read_cb(void *context) {

  fixed_queue_t *queue = (fixed_queue_t *)context;
  work_item_t *item = fixed_queue_dequeue(queue);
  item->func(item->context);
  osi_free(item);
}

所以最后是用传来的消息体中的 func 来处理, 也就是 thread_post 的后面两个参数,一个是函数,一个是传入参数

在来看一下工作线程中绑定对应的消息队列

  btu_bta_msg_queue = fixed_queue_new(SIZE_MAX);

  fixed_queue_register_dequeue(btu_bta_msg_queue,
      thread_get_reactor(bt_workqueue_thread),
      btu_bta_msg_ready,
      NULL);

/*	queue  消息队列
*	reactor  处理该消息队列的工作线程的reactor
*	ready_cb   read
*	context   意义不明,bluedroid 中调用全部为 NULL
*/
void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
  // 保证一个消息队列中 register 一次
  fixed_queue_unregister_dequeue(queue);

   //queue 为前面的 btu_bta_msg_queue 消息队列
  queue->dequeue_ready = ready_cb;
  queue->dequeue_context = context;
  queue->dequeue_object = reactor_register(
    reactor, 	//工作线程的 reactor
    fixed_queue_get_dequeue_fd(queue),   //获取消息队列的 dequeue_sem
    queue,
    internal_dequeue_ready,
    NULL
  );
}

可以看到这里只注册了 read_ready : internal_dequeue_ready

static void internal_dequeue_ready(void *context) {
  fixed_queue_t *queue = context;
  queue->dequeue_ready(queue, queue->dequeue_context);
}

而这里 queue->dequeue_ready 就是 fixed_queue_register_dequeue 的第三个参数, 而 queue->dequeue_context 则一般为NULL
向注册好的消息队列发消息, 示例 :

void bta_sys_sendmsg(void *p_msg)
{
    if (btu_bta_msg_queue)
    	//第一个参数为已注册的消息队列,代表消息是添加到该队列中
        fixed_queue_enqueue(btu_bta_msg_queue, p_msg);
}