tec_feng 发表于 2015-10-11 15:11:54

xen网络后端驱动分析(发送篇)

  这篇来研究netback的报文发送,后端设备创建过程中会调用netif_map,映射tx, rx的IO ring,同时绑定event channel中断。当netfront要发送skb时,会触发irq让后端发送,对应的ISR为netif_be_int
  irqreturn_t netif_be_int(int irq, void *dev_id)
{
    struct xen_netif *netif = dev_id;

    add_to_net_schedule_list_tail(netif);
    maybe_schedule_tx_action();


  直接触发net_tx_action软中断发送
  if (netif_schedulable(netif) && !netbk_queue_full(netif))
      netif_wake_queue(netif->dev);

netif_wake_queue最终取出qdisc上queue的skb,触发net_tx_action软中断发送

    return IRQ_HANDLED;
}

  

  netback报文发送会用到如下数据结构
  static struct page **mmap_pages

  mmap_pages是后端用来让前端映射的页数组,前端把需要发送的sg类型skb对应的page通过grant table映射到后端mmap_pages指向的页(如果映射失败,那么拷贝skb内容到page中)。对于这些page而言,page->mapping - 1被用来存放page在mmap_pages中的index,e.g.
  static inline void netif_set_page_index(struct page *pg, unsigned int index)
{
    *(unsigned long *)&pg->mapping = index + 1;
}

static inline int netif_page_index(struct page *pg)
{
    unsigned long idx = (unsigned long)pg->mapping - 1;

    if (!PageForeign(pg))
      return -1;

    if ((idx >= MAX_PENDING_REQS) || (mmap_pages != pg))
      return -1;

    return idx;
}

  netback_init的初始化例程会初始化好mmap_pages数组,
  mmap_pages = alloc_empty_pages_and_pagevec(MAX_PENDING_REQS);

  for (i = 0; i < MAX_PENDING_REQS; i&#43;&#43;) {
      page = mmap_pages;
      SetPageForeign(page, netif_page_release);
      netif_set_page_index(page, i);
      INIT_LIST_HEAD(&pending_inuse.list);
    }

  SetPageForeign把mmap_pages中的page设置成为foreign,这是xen支持的一种特殊的page,大致是说page的owener是其他的domain。同时page的析构函数为netif_page_release,函数通过page查找到mmap_pages的index,然后调用了netif_idx_release
  static void netif_idx_release(u16 pending_idx)
{
    static DEFINE_SPINLOCK(_lock);
    unsigned long flags;

    spin_lock_irqsave(&_lock, flags);
    dealloc_ring = pending_idx;
    /* Sync with net_tx_action_dealloc: insert idx /then/ incr producer. */
    smp_wmb();
    dealloc_prod&#43;&#43;;
    spin_unlock_irqrestore(&_lock, flags);

    tasklet_schedule(&net_tx_tasklet);
}

  netif_idx_release把mmap_pages中第pending_idx个page挂到dealloc_prod下面,触发net_tx_tasklet软中断。在软中断处理例程net_tx_action中会调用net_tx_action_dealloc处理从dealloc_cons到dealloc_prod之间的需要释放的page
  

  static u16 dealloc_ring;
static pending_ring_idx_t dealloc_prod, dealloc_cons;

  dealloc_ring是一个线性数组用来当做ring使用,dealloc_prod, dealloc_cons分别是单向增长的生产者和消费者
  

  /* Doubly-linked list of in-use pending entries. */
static struct netbk_tx_pending_inuse pending_inuse;
static LIST_HEAD(pending_inuse_head);

  struct netbk_tx_pending_inuse {
    struct list_head list;
    unsigned long alloc_time;
};

  pending_inuse实际是一个list_head的数组,其中某些list_head组成了一个double-linked list,头部是pending_inuse_head。这个list_head代表了目前正在等待被发送的request
  

  static struct pending_tx_info {
    struct xen_netif_tx_request req;
    struct xen_netif *netif;
} pending_tx_info;
static u16 pending_ring;

  static pending_ring_idx_t pending_prod, pending_cons;

  pending_prod, pending_cons是pending_ring数组的生产者和消费者,pending_ring数组存放的是pending_tx_info的index值,每一个pending_tx_info结构体都包含了代表后端设备的xen_netif结构体指针和代表发送请求的xen_netif_tx_request指针
  

  inline static void net_tx_action_dealloc(void)
{
    struct netbk_tx_pending_inuse *inuse, *n;
    struct gnttab_unmap_grant_ref *gop;
    u16 pending_idx;
    pending_ring_idx_t dc, dp;
    struct xen_netif *netif;
    int ret;
    LIST_HEAD(list);

    dc = dealloc_cons;
    gop = tx_unmap_ops;

  /*
   * Free up any grants we have finished using
   */
  do {
      dp = dealloc_prod;

      /* Ensure we see all indices enqueued by netif_idx_release(). */
      smp_rmb();

      while (dc != dp) {
            unsigned long pfn;

            pending_idx = dealloc_ring;
            list_move_tail(&pending_inuse.list, &list);

            pfn = idx_to_pfn(pending_idx);
            /* Already unmapped? */
            if (!phys_to_machine_mapping_valid(pfn))
                continue;

            gnttab_set_unmap_op(gop, idx_to_kaddr(pending_idx),
                        GNTMAP_host_map,
                        grant_tx_handle);
            gop&#43;&#43;;
      }

  
  这段代码遍历从dealloc_cons到dealloc_prod的dealloc_ring数组项,dealloc_ring中保存了对应的mmap_pages的index. 在pending_inuse数组中找到对应的netbk_tx_pending_inuse项,把list_head从原来的pending_inuse_head的list_head中拿下来,加到新的list中
  同时,对每一个pending_idx,设置gnttab_unmap_grant_ref结构体,用来删除一个GR,其中GR的handle为grant_tx_handle,GR的host地址为idx_to_kaddr(pending_idx)
  所有生成的gnttab_unmap_grant_ref数组被存到tx_unmap_ops数组中
  if (netbk_copy_skb_mode != NETBK_DELAYED_COPY_SKB ||
            list_empty(&pending_inuse_head))
            break;

      /* Copy any entries that have been pending for too long. */
      list_for_each_entry_safe(inuse, n, &pending_inuse_head, list) {
            if (time_after(inuse->alloc_time &#43; HZ / 2, jiffies))
                break;

            pending_idx = inuse - pending_inuse;

            pending_tx_info.netif->nr_copied_skbs&#43;&#43;;

            switch (copy_pending_req(pending_idx)) {
            case 0:
                list_move_tail(&inuse->list, &list);
                continue;
            case -EBUSY:
                list_del_init(&inuse->list);
                continue;
            case -ENOENT:
                continue;
            }
            break;
      }
  对于在pending_inuse_head的list中停留时间过长的请求,会调用copy_pending_req把mmap_pages中对应的页替换为一个新的页,该过程通过调用gnttab_copy_grant_page完成。
  gnttab_copy_grant_page会通过alloc_page申请一个新页,把老页的内容memcpy给新页,之后通过GNTTAB_unmap_and_replace的操作,把原来映射到老页的GR,映射到新页上
  

  } while (dp != dealloc_prod);
  dealloc_cons = dc;

    ret = HYPERVISOR_grant_table_op(
      GNTTABOP_unmap_grant_ref, tx_unmap_ops, gop - tx_unmap_ops);
    BUG_ON(ret);
  通过GNTTAB_unmap_grant_ref操作,释放GR,下面就可以告诉前端,发送已经完成,这些pages可以用来做下一次发送了
  

    list_for_each_entry_safe(inuse, n, &list, list) {
      pending_idx = inuse - pending_inuse;

      netif = pending_tx_info.netif;

      make_tx_response(netif, &pending_tx_info.req,
               NETIF_RSP_OKAY);

      /* Ready for next use. */
      gnttab_reset_grant_page(mmap_pages);
      pending_ring = pending_idx;
      netif_put(netif);
      list_del_init(&inuse->list);
    }
  对于pending_inuse的page,调用make_tx_response生成xen_netif_tx_response并通知前端发送成功,返回给前端NETIF_RSP_OKAY。同时调用gnttab_reset_grant_page重置mmap_pages中对应的page. 这些空闲的page又可以拿来给下一次请求使用
  }

  这里我们基本可以猜出几个MAX_PENDING_REQS大小的数组的作用了:
  1. mmap_pages是一个struct page* 数组,用来存放用来给GR映射的页指针,与之对应的是grant_tx_handle数组,存放GR对应的grant_handle_t
  2. 当前端发送请求来到后端之后,会被映射到mmap_pages中空闲的页中(我们知道一个xen_netif_tx_request最多只对应一个page,请看我对netfront代码的分析),此时通过pending_ring来获取空闲的mmap_pages。pending_ring的pending_prod指向空闲页在mmap_pages中的位置。
  3. pending_prod指向的pending_ring_idx_t同时也指向了pending_tx_info数组中请求内容的位置。这个请求的内容xen_netif_tx_request,和对应的netback设备xen_netif都存在这个pending_tx_info结构体中
  4. pending_cons用来处理pending_prod指向的前端发送请求,所有正在被后端发送的请求都会被放入struct netbk_tx_pending_inuse pending_inuse数组,这里struct netbk_tx_pending_inuse实际是一个list_head,pending_inuse_head是这个list_head的头部,list成员按照alloc time从早到晚依次加入到尾部
  5. 当请求被成功发送后,dealloc_prod会相应增加。dealloc_ring数组用来存放需要被释放页在mmap_pages中的位置。一旦某个page被释放,其pending_inuse_idx_t对应的pending_inuse数组成员会被从pending_inuse_head链表中移除掉
  6. 总结:pending_ring, dealloc_ring构成了一个IO环,可以把pending_prod, pending_cons看作request prod, request cons, 而把dealloc_prod, dealloc_cons看作是rsp_prod, rsp_cons
  pending_cons一直到pending_prod的slots表示pending_ring中还有多少空闲的slots可以用来发送,MAX_PENDING_REQS - pending_prod &#43; pending_cons表示已经被待发送请求占用的slots个数,也就是nr_pending_reqs返回的值
  

  static void make_tx_response(struct xen_netif *netif,
               struct xen_netif_tx_request *txp,
               s8       st)
{
    RING_IDX i = netif->tx.rsp_prod_pvt;
    struct xen_netif_tx_response *resp;
    int notify;

    resp = RING_GET_RESPONSE(&netif->tx, i);
    resp->id   = txp->id;
    resp->status = st;

    if (txp->flags & NETTXF_extra_info)
      RING_GET_RESPONSE(&netif->tx, &#43;&#43;i)->status = NETIF_RSP_NULL;

    netif->tx.rsp_prod_pvt = &#43;&#43;i;
    RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&netif->tx, notify);

    /*
   * netfront_smartpoll_active indicates whether netfront timer
   * is active.
   */
    if ((netif->smart_poll == 1)) {
      if (!(netif->rx.sring->private.netif.smartpoll_active)) {
            notify_remote_via_irq(netif->irq);
            netif->rx.sring->private.netif.smartpoll_active = 1;
      }
    } else if (notify)
      notify_remote_via_irq(netif->irq);
}

  make_tx_response比较简单,就是生成xen_netif_tx_response后把响应返回给前端。注意对于NETTXF_extra_info类型的请求,响应是NETIF_RSP_NULL
  

  static unsigned net_tx_build_mops(void)
{
    struct gnttab_map_grant_ref *mop;
    struct sk_buff *skb;
    int ret;

    mop = tx_map_ops;
  tx_map_ops是一个gnttab_map_grant_ref的数组,而tx_unmap_ops则是一个gnttab_unmap_grant_ref的数组
  
    while (((nr_pending_reqs() &#43; MAX_SKB_FRAGS) < MAX_PENDING_REQS) &&
      !list_empty(&net_schedule_list)) {
  如果此时pending_ring的可用slots可以容纳最大的一个请求,同时net_schedule_list里有活动的xen_netif,那么循环处理发送请求
  struct xen_netif *netif;
      struct xen_netif_tx_request txreq;
      struct xen_netif_tx_request txfrags;
      struct xen_netif_extra_info extras;
      u16 pending_idx;
      RING_IDX idx;
      int work_to_do;
      unsigned int data_len;

      /* Get a netif from the list with work to do. */
      netif = list_first_entry(&net_schedule_list, struct xen_netif, list);
      netif_get(netif);
      remove_from_net_schedule_list(netif);

      RING_FINAL_CHECK_FOR_REQUESTS(&netif->tx, work_to_do);
      if (!work_to_do) {
            netif_put(netif);
            continue;
      }

  idx = netif->tx.req_cons;
      rmb(); /* Ensure that we see the request before we copy it. */
      memcpy(&txreq, RING_GET_REQUEST(&netif->tx, idx), sizeof(txreq));
      /* Credit-based scheduling. */
      if (txreq.size > atomic64_read(&netif->remaining_credit) &&
            tx_credit_exceeded(netif, txreq.size)) {
            netif_put(netif);
            continue;
      }

      atomic64_sub(txreq.size,&netif->remaining_credit);

      work_to_do--;
      netif->tx.req_cons = &#43;&#43;idx;

  如果netif有请求要处理,同时QoS也允许发送,那么把请求可能的第一个frag拷贝到txreq,同时netif->tx.req_cons自增1
  memset(extras, 0, sizeof(extras));
      if (txreq.flags & NETTXF_extra_info) {
            work_to_do = netbk_get_extras(netif, extras,
                              work_to_do);
            idx = netif->tx.req_cons;
            if (unlikely(work_to_do < 0)) {
                netbk_tx_err(netif, &txreq, idx);
                continue;
            }
      }

  如果请求是一个gso请求(说明有多个sg分片),那么调用netbk_get_extras,计算前端还有多少个类型为XEN_NETIF_EXTRA_FLAG_MORE的xen_netif_tx_request请求(一般来说也就一个而已,所以这段代码基本无用)
  ret = netbk_count_requests(netif, &txreq, txfrags, work_to_do);
      if (unlikely(ret < 0)) {
            netbk_tx_err(netif, &txreq, idx - ret);
            continue;
      }
      idx &#43;= ret;

  调用netbk_count_requests,把所有NETTXF_more_data的分片连同最后一个分片全部拷贝到txfrags数组中。所以对于gso请求,所有的xen_netif_tx_request分片都被存到了txreq, txfrags数组中
  if (unlikely(txreq.size < ETH_HLEN)) {
            DPRINTK(&quot;Bad packet size: %d\n&quot;, txreq.size);
            netbk_tx_err(netif, &txreq, idx);
            continue;
      }

      /* No crossing a page as the payload mustn't fragment. */
      if (unlikely((txreq.offset &#43; txreq.size) > PAGE_SIZE)) {
            DPRINTK(&quot;txreq.offset: %x, size: %u, end: %lu\n&quot;,
                txreq.offset, txreq.size,
                (txreq.offset &~PAGE_MASK) &#43; txreq.size);
            netbk_tx_err(netif, &txreq, idx);
            continue;
      }

  txreq不能跨page, txreq长度也不能少于一个二层包头,不然都是非法请求
  pending_idx = pending_ring;

      data_len = (txreq.size > PKT_PROT_LEN &&
                ret < MAX_SKB_FRAGS) ?
            PKT_PROT_LEN : txreq.size;

      skb = alloc_skb(data_len &#43; NET_SKB_PAD &#43; NET_IP_ALIGN,
                GFP_ATOMIC | __GFP_NOWARN);
      if (unlikely(skb == NULL)) {
            DPRINTK(&quot;Can't allocate a skb in start_xmit.\n&quot;);
            netbk_tx_err(netif, &txreq, idx);
            break;
      }

  /* Packets passed to netif_rx() must have some headroom. */
      skb_reserve(skb, NET_SKB_PAD &#43; NET_IP_ALIGN);

      if (extras.type) {
            struct xen_netif_extra_info *gso;
            gso = &extras;

            if (netbk_set_skb_gso(skb, gso)) {
                kfree_skb(skb);
                netbk_tx_err(netif, &txreq, idx);
                continue;
            }
      }

  分配sk_buff,设置skb的gso配置
  gnttab_set_map_op(mop, idx_to_kaddr(pending_idx),
                  GNTMAP_host_map | GNTMAP_readonly,
                  txreq.gref, netif->domid);
      mop&#43;&#43;;

  设置gnttab_map_grant_ref的GR
  memcpy(&pending_tx_info.req,
               &txreq, sizeof(txreq));
      pending_tx_info.netif = netif;
  
      *((u16 *)skb->data) = pending_idx;
/* 这里很tricky,skb->data的前2个字节用来存储第一个page的pending_idx */

      __skb_put(skb, data_len);
  /* data_len最多为72个字节 */

      skb_shinfo(skb)->nr_frags = ret;
      if (data_len < txreq.size) {
            skb_shinfo(skb)->nr_frags&#43;&#43;;
            skb_shinfo(skb)->frags.page =
                (void *)(unsigned long)pending_idx;
      } else {
            /* Discriminate from any valid pending_idx value. */
            skb_shinfo(skb)->frags.page = (void *)~0UL;
      }
  
      __skb_queue_tail(&tx_queue, skb);

  设置skb,并把skb追加到tx_queue发送队列。注意由于第一个txreq对应的skb线性区域最多只能有72个字节,那么溢出的部分有可能会被设置到frag第一个分片里。
  pending_cons&#43;&#43;;

      mop = netbk_get_requests(netif, skb, txfrags, mop);

      netif->tx.req_cons = idx;
      netif_schedule_work(netif);

      if ((mop - tx_map_ops) >= ARRAY_SIZE(tx_map_ops))
            break;
    }
设置完txreq的mop之后,自增pending_cons,然后调用netbk_get_requests设置txfrags数组的mop。netbk_get_requests会依次设置好gnttab_mop_grant_ref,把xen_netif_tx_request拷贝到pending_tx_info数组对应的slot。注意这里skb_shared_info里的frags.page不是struct page*指针了,而是这个page在mmap_pages中的idx位置。

    return mop - tx_map_ops;
}

  

  static void net_tx_submit(void)
{
    struct gnttab_map_grant_ref *mop;
    struct sk_buff *skb;

    mop = tx_map_ops;
    while ((skb = __skb_dequeue(&tx_queue)) != NULL) {
      struct xen_netif_tx_request *txp;
      struct xen_netif *netif;
      u16 pending_idx;
      unsigned data_len;

      pending_idx = *((u16 *)skb->data);
      netif       = pending_tx_info.netif;
      txp         = &pending_tx_info.req;
  skb->data开头的两个字节用来暂存线性区域page的index,后续就没用了,这个值由net_tx_build_mops函数传入
  这里通过pending_idx得到struct xen_netif, struct xen_netif_tx_request
  
      /* Check the remap error code. */
      if (unlikely(netbk_tx_check_mop(skb, &mop))) {
            DPRINTK(&quot;netback grant failed.\n&quot;);
            skb_shinfo(skb)->nr_frags = 0;
            kfree_skb(skb);
            continue;
      }

  data_len = skb->len;
      memcpy(skb->data,
               (void *)(idx_to_kaddr(pending_idx)|txp->offset),
               data_len);
      if (data_len < txp->size) {
            /* Append the packet payload as a fragment. */
            txp->offset &#43;= data_len;
            txp->size -= data_len;
      } else {
            /* Schedule a response immediately. */
            netif_idx_release(pending_idx);
      }
把pending_idx的页的内容拷贝到skb->data开头的空间中,长度为data_len = skb->len
  如果data_len < txp->size,此时data_len为72字节,txp->size大于72字节的部分由第一个frag指向。这种情况下,pending_idx所指向的页还不能释放
  如果data_len >= txp->size,此时pending_idx所指向的页的内容已经完全拷贝到了skb线性区域了,此时可以调用netif_idx_release把第pending_idx个页加到dealloc_ring的&#8;dealloc_prod中

      /*
         * Old frontends do not assert data_validated but we
         * can infer it from csum_blank so test both flags.
         */
      if (txp->flags & (NETTXF_data_validated|NETTXF_csum_blank))
            skb->ip_summed = CHECKSUM_PARTIAL;
      else
            skb->ip_summed = CHECKSUM_NONE;

      netbk_fill_frags(skb);

  前面已经设置好了skb->nr_frags,和每个skb_frag_t的page所在的pending_idx,netbk_fill_frags遍历skb->nr_frags,把pending_idx指向的页加到pending_inuse_head链表中,在基于xen_netif_tx_request,设置好每个skb_frag_t的page,offset, size
  /*
         * If the initial fragment was < PKT_PROT_LEN then
         * pull through some bytes from the other fragments to
         * increase the linear region to PKT_PROT_LEN bytes.
         */
      if (skb_headlen(skb) < PKT_PROT_LEN && skb_is_nonlinear(skb)) {
            int target = min_t(int, skb->len, PKT_PROT_LEN);
            __pskb_pull_tail(skb, target - skb_headlen(skb));
      }
保证skb线性区域的大小为PKT_PROT_LEN

      skb->dev      = netif->dev;
      skb->protocol = eth_type_trans(skb, skb->dev);

      netif->stats.rx_bytes &#43;= skb->len;
      netif->stats.rx_packets&#43;&#43;;

      if (skb->ip_summed == CHECKSUM_PARTIAL) {
            if (skb_checksum_setup(skb)) {
                DPRINTK(&quot;Can't setup checksum in net_tx_action\n&quot;);
                kfree_skb(skb);
                continue;
            }
      }
  如果需要网卡进行校验和计算,那么调用skb_checksum_setup设置skb->csum_start, skb->csum_offset。这里没有对icmp设置任何校验和项,是个潜在的问题
  

  if (unlikely(netbk_copy_skb_mode == NETBK_ALWAYS_COPY_SKB) &&
            unlikely(skb_linearize(skb))) {
            DPRINTK(&quot;Can't linearize skb in net_tx_action.\n&quot;);
            kfree_skb(skb);
            continue;
      }
  
      netif_rx(skb);
      netif->dev->last_rx = jiffies;
    }
  if (netbk_copy_skb_mode == NETBK_DELAYED_COPY_SKB &&
      !list_empty(&pending_inuse_head)) {
      struct netbk_tx_pending_inuse *oldest;

      oldest = list_entry(pending_inuse_head.next,
                  struct netbk_tx_pending_inuse, list);
      mod_timer(&netbk_tx_pending_timer, oldest->alloc_time &#43; HZ);
    }
  对还在pending_inuse_head的list中尚未处理完的请求,设置timer定时器,继续调用软中断来处理
}

  

  不知道到这里为止,你们是否觉得事情已经做完了?是不是还缺点什么?bingo,当netif_rx处理完发送请求之后,那些mmap_pages里用过的页如何释放呢?dealloc_prod总得有人增加吧,答案就在这里SetPageForeign
  for (i = 0; i < MAX_PENDING_REQS; i&#43;&#43;) {
      page = mmap_pages;
      SetPageForeign(page, netif_page_release);
      netif_set_page_index(page, i);
      INIT_LIST_HEAD(&pending_inuse.list);
    }

  我们看到netback_init中,对mmap_pages的所有页都调用了SetPageForeign. foreign page是xen引入的一种页类型。netif_page_release是页的析构函数,就是说每次想要释放这个页,其实并没有释放而是调用了netif_page_release
  static void netif_page_release(struct page *page, unsigned int order)
{
    int idx = netif_page_index(page);
    BUG_ON(order);
    BUG_ON(idx < 0);
    netif_idx_release(idx);
}

  idx是该page在mmap_pages中的索引,netif_idx_release才是真正把这个page的pending_idx加到了dealloc_ring中,同时增加dealloc_prod的家伙
  static void netif_idx_release(u16 pending_idx)
{
    static DEFINE_SPINLOCK(_lock);
    unsigned long flags;

    spin_lock_irqsave(&_lock, flags);
    dealloc_ring = pending_idx;
    /* Sync with net_tx_action_dealloc: insert idx /then/ incr producer. */
    smp_wmb();
    dealloc_prod&#43;&#43;;
    spin_unlock_irqrestore(&_lock, flags);

    tasklet_schedule(&net_tx_tasklet);
}

  

  

  

         版权声明:本文为博主原创文章,未经博主允许不得转载。
页: [1]
查看完整版本: xen网络后端驱动分析(发送篇)