天天看點

dpdk vhost研究(二)

繼續本專題的研究,關于本專題前期的内容請參考這裡。

消息機制

當使用vhost-user時,需要在系統中建立一個unix domain socket server,用來處理qemu發送給host的消息。

如果有新的socket連接配接,說明guest建立了新的virtio-net裝置,vhost驅動會為之建立一個vhost裝置,之後qemu就可以通過socket和vhost進行通信了;當socket關閉,vhost就會銷毀對應的裝置。

常用的消息包括:

//driver\net\virtio\virtio_user\vhost_kernel.c
/* vhost kernel ioctls */
#define VHOST_VIRTIO 0xAF
/*傳回vhost支援的virtio-net功能子集*/
#define VHOST_GET_FEATURES _IOR(VHOST_VIRTIO, 0x00, __u64)  
/*檢查功能掩碼,設定vhost和virtio前端共同支援的特性,需要兩者同時支援才能生效*/
#define VHOST_SET_FEATURES _IOW(VHOST_VIRTIO, 0x00, __u64)
/*将裝置設定為目前程序所有*/
#define VHOST_SET_OWNER _IO(VHOST_VIRTIO, 0x01)
/*目前程序釋放對裝置的所有權*/
#define VHOST_RESET_OWNER _IO(VHOST_VIRTIO, 0x02)
/*設定記憶體空間布局資訊,用于封包收發時的位址轉換*/
#define VHOST_SET_MEM_TABLE _IOW(VHOST_VIRTIO, 0x03, struct vhost_memory_kernel)
/*下面兩個宏,用于guest線上遷移*/
#define VHOST_SET_LOG_BASE _IOW(VHOST_VIRTIO, 0x04, __u64)
#define VHOST_SET_LOG_FD _IOW(VHOST_VIRTIO, 0x07, int)
/*vhost記錄每個虛拟隊列的大小*/
#define VHOST_SET_VRING_NUM _IOW(VHOST_VIRTIO, 0x10, struct vhost_vring_state)
/*由qemu發送virtqueue結構的虛拟位址。vhost将該位址轉換成vhost的虛拟位址。*/
#define VHOST_SET_VRING_ADDR _IOW(VHOST_VIRTIO, 0x11, struct vhost_vring_addr)
/*傳遞初始索引值,vhost通過該索引值找到初始描述符*/
#define VHOST_SET_VRING_BASE _IOW(VHOST_VIRTIO, 0x12, struct vhost_vring_state)
/*将虛拟隊列的目前可用索引值發送給qemu*/
#define VHOST_GET_VRING_BASE _IOWR(VHOST_VIRTIO, 0x12, struct vhost_vring_state)
/*傳遞eventfd檔案描述符。當guest有新的資料要發送時,通過該檔案描述符通知vhsot接收資料
* 并發送到目的地;vhost使用eventfd代理子產品把這個檔案描述符從qemu上下文切換到自己的程序
* 上下文
*/
#define VHOST_SET_VRING_KICK _IOW(VHOST_VIRTIO, 0x20, struct vhost_vring_file)
/*也是用來傳遞eventfd檔案描述符。使vhost能夠在完成對新的資料包接收時,通過中斷方式通知
*guest準備接收資料包。使用eventfd代理子產品把這個檔案描述符從qemu上下文切換到自己的程序
*上下文
*/
#define VHOST_SET_VRING_CALL _IOW(VHOST_VIRTIO, 0x21, struct vhost_vring_file)
/*代碼中僅有定義,未使用*/
#define VHOST_SET_VRING_ERR _IOW(VHOST_VIRTIO, 0x22, struct vhost_vring_file)
/*用來支援virtio-user*/
#define VHOST_NET_SET_BACKEND _IOW(VHOST_VIRTIO, 0x30, struct vhost_vring_file)
           

位址轉換和記憶體映射

virtqueue和vring進行資料交換的核心是使用一種機制将資料緩沖區實作對guest和host同時可見,進而通過避免資料的拷貝來消耗性能。dpdk vhost在這裡使用的是大頁記憶體、記憶體映射以及相應的位址轉換來完成這個功能的。

是以,host端必須由足夠的大頁空間,同時需要指定記憶體預配置設定。為了vhost能通路virtqueue和資料包緩沖區,所有的描述符表、環表位址,其所在頁面必須被映射到vhost的程序空間中。

vhost在收到VHOST_SET_MEM_TABLE消息後,會使用消息中的記憶體分布表來完成記憶體映射工作:

/*下面的兩個資料結構記錄guest的實體位址及偏移量*/
/**
 * Information relating to memory regions including offsets to
 * addresses in QEMUs memory file.
 */
struct rte_vhost_mem_region {
    uint64_t guest_phys_addr;
    uint64_t guest_user_addr;
    uint64_t host_user_addr;
    uint64_t size;
    void     *mmap_addr;
    uint64_t mmap_size;
    int fd;
};

/**
 * Memory structure includes region and mapping information.
 */
struct rte_vhost_memory {
    uint32_t nregions;
    struct rte_vhost_mem_region regions[];
};

/*
 *将 QEMU virtual address 轉化成 Vhost virtual address. 該函數用來将ring address
 * 轉換成host端的virtual address
 */
static uint64_t
qva_to_vva(struct virtio_net *dev, uint64_t qva)
{
    struct rte_vhost_mem_region *reg;
    uint32_t i;

    /* Find the region where the address lives. */
    for (i = ; i < dev->mem->nregions; i++) {
        reg = &dev->mem->regions[i];

        if (qva >= reg->guest_user_addr &&
            qva <  reg->guest_user_addr + reg->size) {
            return qva - reg->guest_user_addr +
                   reg->host_user_addr;
        }
    }

    return ;
}
           

virtio-net 裝置管理

一個virtio-net裝置的生命周期包括裝置建立、配置、服務啟動和裝置銷毀幾個階段。

- 裝置建立

vhost-user通過socket連接配接來建立。當建立一個virtio-net裝置是,需要

  • 配置設定新的virtio-net裝置結構,并添加到裝置連結清單中
  • 為該裝置配置設定一個處理處理核并添加裝置到資料面的連結清單中
  • 在vhost上配置設定一個為virtio-net裝置服務的RX\TX隊列
  • 配置

    利用VHOST_SET_VRING_*消息通知vhost虛拟隊列的大小、基本索引和位置,vhost将虛拟隊列映射到自己的虛拟位址空間

  • 服務啟動

    vhost利用VHOST_SET_VRING_KICK消息來啟動虛拟隊列服務。之後,vhost便可以輪詢接收隊列,并将資料放到virtio-net裝置的接收隊列上。同時,也可以輪詢發送虛拟隊列,檢視是否有待發送的資料包,如果有,則将其複制到發送隊列中。

  • 裝置銷毀

    vhost利用VHOST_GET_VRING_BASE消息來通知停止提供對接收隊列和發送虛拟隊列的服務。同時,配置設定給virtio-net裝置的處理和和實體網卡上的RX和TX隊列也将被釋放。

比較重要的API:

下面從代碼角度來了解下前面描述的過程,幾個比較重要的API包括:

注冊驅動接口

int rte_vhost_driver_register(const char *path, uint64_t flags)
           

這個函數負責在系統中注冊一個vhost driver,path表示socket的路徑。flags在最新的17.05版本中(之前的版本中還不支援可設定,隻預設支援client,重連)支援下面幾個特性:

- RTE_VHOST_USER_CLIENT :以client模式和QEMU相連

- RTE_VHOST_USER_NO_RECONNECT: 預設情況下client會一直嘗試自動和server(QEMU)建立連接配接,當server還沒有啟動或者重新開機時,通過此flag可以關閉該特性

- RTE_VHOST_USER_DEQUEUE_ZERO_COPY:用于vm2vm,vm2nic通信的一種優化方案,預設關閉

來讀下代碼:

int rte_vhost_driver_register(const char *path, uint64_t flags)
{
    int ret = -;
    ...
    /*建立一個vhost-user socket,并根據不同的flag設定不同的特性*/
    struct vhost_user_socket *vsocket;
    vsocket = malloc(sizeof(struct vhost_user_socket));
    if (!vsocket)
        goto out;
    memset(vsocket, , sizeof(struct vhost_user_socket));
    vsocket->path = strdup(path);
    TAILQ_INIT(&vsocket->conn_list);
    pthread_mutex_init(&vsocket->conn_mutex, NULL);
    vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY;

    /*
     *設定上内置支援屬性,這些特性對使用者都是透明的
     */
    vsocket->supported_features = VIRTIO_NET_SUPPORTED_FEATURES;
    vsocket->features           = VIRTIO_NET_SUPPORTED_FEATURES;

    if ((flags & RTE_VHOST_USER_CLIENT) != ) {
        vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT);
        if (vsocket->reconnect && reconn_tid == ) {
            /*建立一個線程,這個線程會在背景一直掃描全局的reconn_list連結清單,
            *不斷的嘗試将連結清單中的socket和server進行連接配接
            */
            if (vhost_user_reconnect_init() < ) {
                free(vsocket->path);
                free(vsocket);
                goto out;
            }
        }
    } else {
    /*可以看到此版本也是支援server模式的,這種情況需要QEMU充當client,
    *對QEMU的版本有依賴。
    */
        vsocket->is_server = true;
    }

    /*最終也就是建立了一個unix socket來實作通信功能*/
    ret = create_unix_socket(vsocket);
    if (ret < ) {
        free(vsocket->path);
        free(vsocket);
        goto out;
    }

    /*完成後将socket插入到vhost_user.vsockets數組中,供後續操作查詢socket,
    *查找操作見find_vhost_user_socket(),目前最大支援建立1024個sockets
    */
    vhost_user.vsockets[vhost_user.vsocket_cnt++] = vsocket;
    ...
}

/*封裝的socket建立函數,沒啥可說的*/
int create_unix_socket(struct vhost_user_socket *vsocket)
{
    int fd;
    struct sockaddr_un *un = &vsocket->un;

    fd = socket(AF_UNIX, SOCK_STREAM, );
    if (fd < )
        return -;
    RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
        vsocket->is_server ? "server" : "client", fd);

    if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "vhost-user: can't set nonblocking mode for socket, fd: "
            "%d (%s)\n", fd, strerror(errno));
        close(fd);
        return -;
    }

    memset(un, , sizeof(*un));
    un->sun_family = AF_UNIX;
    strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
    un->sun_path[sizeof(un->sun_path) - ] = '\0';

    vsocket->socket_fd = fd;
    return ;
}

/*查找函數*/
 struct vhost_user_socket *
find_vhost_user_socket(const char *path)
{
    int i;

    /*通過周遊數組方式進行查找,時間效率0(N),好在不會建立太多,
    *估計是考慮過,但覺得不值得做優化
    */
    for (i = ; i < vhost_user.vsocket_cnt; i++) {
        struct vhost_user_socket *vsocket = vhost_user.vsockets[i];

        if (!strcmp(vsocket->path, path))
            return vsocket;
    }

    return NULL;
}
           

設定使能特性:

/*顯式設定支援新特性*/
int rte_vhost_driver_set_features(const char *path, uint64_t features)
/*使能相關特性*/
int rte_vhost_driver_enable_features(const char *path, uint64_t features)
/*去使能相關特性*/
int rte_vhost_driver_disable_features(const char *path, uint64_t features)
           

以上的操作都是針對socket->features做軟體特性的設定,原理大同小異;這些接口可以用來在driver注冊後,對該driver的特性進行微調。

比如當支援mergeable特性時,可以調用rte_vhost_driver_enable_features(file,1ULL << VIRTIO_NET_F_MRG_RXBUF)來進行設定。

目前支援的特性包括:

/* The feature bitmap for virtio net */
#define VIRTIO_NET_F_CSUM   0   /* Host handles pkts w/ partial csum */
#define VIRTIO_NET_F_GUEST_CSUM 1   /* Guest handles pkts w/ partial csum */
#define VIRTIO_NET_F_MTU    3   /* Initial MTU advice. */
#define VIRTIO_NET_F_MAC    5   /* Host has given MAC address. */
#define VIRTIO_NET_F_GUEST_TSO4 7   /* Guest can handle TSOv4 in. */
#define VIRTIO_NET_F_GUEST_TSO6 8   /* Guest can handle TSOv6 in. */
#define VIRTIO_NET_F_GUEST_ECN  9   /* Guest can handle TSO[6] w/ ECN in. */
#define VIRTIO_NET_F_GUEST_UFO  10  /* Guest can handle UFO in. */
#define VIRTIO_NET_F_HOST_TSO4  11  /* Host can handle TSOv4 in. */
#define VIRTIO_NET_F_HOST_TSO6  12  /* Host can handle TSOv6 in. */
#define VIRTIO_NET_F_HOST_ECN   13  /* Host can handle TSO[6] w/ ECN in. */
#define VIRTIO_NET_F_HOST_UFO   14  /* Host can handle UFO in. */
#define VIRTIO_NET_F_MRG_RXBUF  15  /* Host can merge receive buffers. */
#define VIRTIO_NET_F_STATUS 16  /* virtio_net_config.status available */
#define VIRTIO_NET_F_CTRL_VQ    17  /* Control channel available */
#define VIRTIO_NET_F_CTRL_RX    18  /* Control channel RX mode support */
#define VIRTIO_NET_F_CTRL_VLAN  19  /* Control channel VLAN filtering */
#define VIRTIO_NET_F_CTRL_RX_EXTRA 20   /* Extra RX mode control support */
#define VIRTIO_NET_F_GUEST_ANNOUNCE 21  /* Guest can announce device on the
                     * network */
#define VIRTIO_NET_F_MQ     22  /* Device supports Receive Flow
                     * Steering */
#define VIRTIO_NET_F_CTRL_MAC_ADDR 23   /* Set MAC address */
/* Do we get callbacks when the ring is completely used, even if we've
 * suppressed them? */
#define VIRTIO_F_NOTIFY_ON_EMPTY    24
/* Can the device handle any descriptor layout? */
#define VIRTIO_F_ANY_LAYOUT     27
/* We support indirect buffer descriptors */
#define VIRTIO_RING_F_INDIRECT_DESC 28
#define VIRTIO_F_VERSION_1      32
#define VIRTIO_F_IOMMU_PLATFORM 33
           

驅動的操作函數

int rte_vhost_driver_callback_register(const char *path,
    struct vhost_device_ops const * const ops)
           

重點是第二個參數:

struct vhost_device_ops {
    int (*new_device)(int vid);     /**< Add device. */
    void (*destroy_device)(int vid);    /**< Remove device. */
    int (*vring_state_changed)(int vid, uint16_t queue_id, int enable);
    int (*features_changed)(int vid, uint64_t features);
    void *reserved[]; /**< Reserved for future extension */
};
           
  • new_device(int vid)

    當virtual device就緒時,調用該函數。該函數用來建立并初始化device的配置,包括virtqueue,virtio_memory等相關,完成後将該device插入到一個單向連結清單中,供配置查詢使用

  • destory_device(int vid)

    當virtio裝置關閉或者connection斷掉時,執行該操作。

  • vring_state_changed(int vid,uint16_t queue_id, int enable)

    該操作可以在device的特性改變時,注冊使用。比如記log日志。

  • features_changed(int vid, uint64_t features)

    這個操作會在features改變時調用,可以動态實作一些功能。例如:VHOST_F_LOG_ALL會在動态遷移的開始/結束時分别被enable/disable。

使能device

該接口會觸發vhost-user進行協商動作,屬于驅動初始化的最後一個步驟。

int rte_vhost_driver_start(const char *path)
           

研究下代碼:

int rte_vhost_driver_start(const char *path)
{
    struct vhost_user_socket *vsocket;
    static pthread_t fdset_tid;

    /*根據之前記錄的數組,找到socket*/
    pthread_mutex_lock(&vhost_user.mutex);
    vsocket = find_vhost_user_socket(path);
    pthread_mutex_unlock(&vhost_user.mutex);

    if (!vsocket)
        return -;

    /*建立fdset handling 線程*/
    if (fdset_tid == ) {
        int ret = pthread_create(&fdset_tid, NULL, fdset_event_dispatch,
                     &vhost_user.fdset);
        if (ret < )
            RTE_LOG(ERR, VHOST_CONFIG,
                "failed to create fdset handling thread");
    }

    /*根據啟動時指定的模式,執行不同的動作*/
    if (vsocket->is_server)
        return vhost_user_start_server(vsocket);
    else
        return vhost_user_start_client(vsocket);
}

/*client模式*/
vhost_user_start_client(struct vhost_user_socket *vsocket)
{
    int ret;
    int fd = vsocket->socket_fd;
    const char *path = vsocket->path;
    struct vhost_user_reconnect *reconn;

    /*和server進行連接配接,檢查是否可以和server進行連接配接
    * 關于server socket的建立放到QEMU中來完成,這裡僅執行
    * 連接配接操作
    */
    ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
                      sizeof(vsocket->un));
    if (ret == ) {
        /*檢查通過,建立vhost_device,vhost_user_connection并加入到
        * 對應的conn_list中
        */
        vhost_user_add_connection(fd, vsocket);
        return ;
    }

    RTE_LOG(WARNING, VHOST_CONFIG,
        "failed to connect to %s: %s\n",
        path, strerror(errno));

    /*檢查失敗時,判斷是否已配置重連特性,沒有的話就直接退出了*/
    if (ret == - || !vsocket->reconnect) {
        close(fd);
        return -;
    }

    /*把該socket放到重連隊列中,等待vhost_user_reconnect_init()初始化創
    * 建的背景線程執行排程了
    */
    RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
    reconn = malloc(sizeof(*reconn));
    if (reconn == NULL) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to allocate memory for reconnect\n");
        close(fd);
        return -;
    }
    reconn->un = vsocket->un;
    reconn->fd = fd;
    reconn->vsocket = vsocket;
    pthread_mutex_lock(&reconn_list.mutex);
    TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
    pthread_mutex_unlock(&reconn_list.mutex);

    return ;
}

/*server模式*/
vhost_user_start_server(struct vhost_user_socket *vsocket)
{
    int ret;
    int fd = vsocket->socket_fd;
    const char *path = vsocket->path;

    /*熟悉的套路,bind-->listen-->read handle*/
    ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un));
    if (ret < ) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to bind to %s: %s; remove it and try again\n",
            path, strerror(errno));
        goto err;
    }
    RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);

    ret = listen(fd, MAX_VIRTIO_BACKLOG);
    if (ret < )
        goto err;

    /*真正的處理函數,根據新連上的socket建立virtio device,
    * 插入到連接配接隊列中待處理
    */
    ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
          NULL, vsocket);
    if (ret < ) {
        RTE_LOG(ERR, VHOST_CONFIG,
            "failed to add listen fd %d to vhost server fdset\n",
            fd);
        goto err;
    }

    return ;

err:
    close(fd);
    return -;
}
           

封包傳輸(enqueue,dequeue)

API接口:

/*将count個封包從host轉發給guest*/
uint16_t rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
    struct rte_mbuf **pkts, uint16_t count)
/*從guest接收count個封包,并存儲到pkts中*/
uint16_t rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
    struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
           

直接看代碼:

uint16_t
rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
    struct rte_mbuf **pkts, uint16_t count)
{
    /*擷取guest的virtio dev*/
    struct virtio_net *dev = get_device(vid);

    if (!dev)
        return ;

    /*檢查是否支援mergable,執行不同的路徑*/
    if (dev->features & ( << VIRTIO_NET_F_MRG_RXBUF))
        return virtio_dev_merge_rx(dev, queue_id, pkts, count);
    else
        return virtio_dev_rx(dev, queue_id, pkts, count);
}

/*隻看看簡單的情況吧,mergable涉及到的優化略複雜,架構還是大同小異的。
* 該函數将從實體網卡或者别的虛機中收到的pkt放到virtio dev的RX 虛拟隊列中。
*/

//優化從函數定義就開始了,staic & inline 
static inline uint32_t __attribute__((always_inline))
virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
          struct rte_mbuf **pkts, uint32_t count)
{
    struct vhost_virtqueue *vq;
    uint16_t avail_idx, free_entries, start_idx;
    uint16_t desc_indexes[MAX_PKT_BURST];
    struct vring_desc *descs;
    uint16_t used_idx;
    uint32_t i, sz;

    /*執行相關一系列檢查*/
    LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
    if (unlikely(!is_valid_virt_queue_idx(queue_id, , dev->nr_vring))) {
        RTE_LOG(ERR, VHOST_DATA, "(%d) %s: invalid virtqueue idx %d.\n",
            dev->vid, __func__, queue_id);
        return ;
    }

    vq = dev->virtqueue[queue_id];
    if (unlikely(vq->enabled == ))
        return ;

    avail_idx = *((volatile uint16_t *)&vq->avail->idx);
    start_idx = vq->last_used_idx;
    free_entries = avail_idx - start_idx;
    count = RTE_MIN(count, free_entries);
    count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
    if (count == )
        return ;

    LOG_DEBUG(VHOST_DATA, "(%d) start_idx %d | end_idx %d\n",
        dev->vid, start_idx, start_idx + count);

    /* Retrieve all of the desc indexes first to avoid caching issues. */
    rte_prefetch0(&vq->avail->ring[start_idx & (vq->size - )]);
    for (i = ; i < count; i++) {
        used_idx = (start_idx + i) & (vq->size - );
        desc_indexes[i] = vq->avail->ring[used_idx];
        vq->used->ring[used_idx].id = desc_indexes[i];
        vq->used->ring[used_idx].len = pkts[i]->pkt_len +
                           dev->vhost_hlen;
        vhost_log_used_vring(dev, vq,
            offsetof(struct vring_used, ring[used_idx]),
            sizeof(vq->used->ring[used_idx]));
    }

    rte_prefetch0(&vq->desc[desc_indexes[]]);
    for (i = ; i < count; i++) {
        uint16_t desc_idx = desc_indexes[i];
        int err;

        if (vq->desc[desc_idx].flags & VRING_DESC_F_INDIRECT) {
            descs = (struct vring_desc *)(uintptr_t)
                rte_vhost_gpa_to_vva(dev->mem,
                    vq->desc[desc_idx].addr);
            if (unlikely(!descs)) {
                count = i;
                break;
            }

            desc_idx = ;
            sz = vq->desc[desc_idx].len / sizeof(*descs);
        } else {
            descs = vq->desc;
            sz = vq->size;
        }
        /*一個一個的往ring中拷貝,性能估計不會太好*/
        err = copy_mbuf_to_desc(dev, descs, pkts[i], desc_idx, sz);
        if (unlikely(err)) {
            used_idx = (start_idx + i) & (vq->size - );
            vq->used->ring[used_idx].len = dev->vhost_hlen;
            vhost_log_used_vring(dev, vq,
                offsetof(struct vring_used, ring[used_idx]),
                sizeof(vq->used->ring[used_idx]));
        }

        if (i +  < count)
            rte_prefetch0(&vq->desc[desc_indexes[i+]]);
    }

    rte_smp_wmb();

    *(volatile uint16_t *)&vq->used->idx += count;
    vq->last_used_idx += count;
    vhost_log_used_vring(dev, vq,
        offsetof(struct vring_used, idx),
        sizeof(vq->used->idx));

    /* flush used->idx update before we read avail->flags. */
    rte_mb();

    /* Kick the guest if necessary. */
    /*如果條件滿足,就發事件通知*/
    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
            && (vq->callfd >= ))
        eventfd_write(vq->callfd, (eventfd_t));
    return count;
}
           
uint16_t rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
    struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
    struct virtio_net *dev;
    struct rte_mbuf *rarp_mbuf = NULL;
    struct vhost_virtqueue *vq;
    uint32_t desc_indexes[MAX_PKT_BURST];
    uint32_t used_idx;
    uint32_t i = ;
    uint16_t free_entries;
    uint16_t avail_idx;

    /*擷取vdevice,并做相關檢查*/
    dev = get_device(vid);
    if (!dev)
        return ;

    if (unlikely(!is_valid_virt_queue_idx(queue_id, , dev->nr_vring))) {
        RTE_LOG(ERR, VHOST_DATA, "(%d) %s: invalid virtqueue idx %d.\n",
            dev->vid, __func__, queue_id);
        return ;
    }

    vq = dev->virtqueue[queue_id];
    if (unlikely(vq->enabled == ))
        return ;

    if (unlikely(dev->dequeue_zero_copy)) {
        struct zcopy_mbuf *zmbuf, *next;
        int nr_updated = ;

        for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
             zmbuf != NULL; zmbuf = next) {
            next = TAILQ_NEXT(zmbuf, next);

            if (mbuf_is_consumed(zmbuf->mbuf)) {
                used_idx = vq->last_used_idx++ & (vq->size - );
                update_used_ring(dev, vq, used_idx,
                         zmbuf->desc_idx);
                nr_updated += ;

                TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
                rte_pktmbuf_free(zmbuf->mbuf);
                put_zmbuf(zmbuf);
                vq->nr_zmbuf -= ;
            }
        }

        update_used_idx(dev, vq, nr_updated);
    }

    /*
     * Construct a RARP broadcast packet, and inject it to the "pkts"
     * array, to looks like that guest actually send such packet.
     *
     * Check user_send_rarp() for more information.
     *
     * broadcast_rarp shares a cacheline in the virtio_net structure
     * with some fields that are accessed during enqueue and
     * rte_atomic16_cmpset() causes a write if using cmpxchg. This could
     * result in false sharing between enqueue and dequeue.
     *
     * Prevent unnecessary false sharing by reading broadcast_rarp first
     * and only performing cmpset if the read indicates it is likely to
     * be set.
     */
    /*先要将第一個指派成構造的RARP廣播包,至于為什麼要添加這麼一個包,
    * 主要和虛拟遷移有關,有興趣的可以研究下上面的英文注釋
    */
    if (unlikely(rte_atomic16_read(&dev->broadcast_rarp) &&
            rte_atomic16_cmpset((volatile uint16_t *)
                &dev->broadcast_rarp.cnt, , ))) {

        rarp_mbuf = rte_pktmbuf_alloc(mbuf_pool);
        if (rarp_mbuf == NULL) {
            RTE_LOG(ERR, VHOST_DATA,
                "Failed to allocate memory for mbuf.\n");
            return ;
        }

        if (make_rarp_packet(rarp_mbuf, &dev->mac)) {
            rte_pktmbuf_free(rarp_mbuf);
            rarp_mbuf = NULL;
        } else {
            count -= ;
        }
    }

    free_entries = *((volatile uint16_t *)&vq->avail->idx) -
            vq->last_avail_idx;
    if (free_entries == )
        goto out;

    LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);

    /* Prefetch available and used ring */
    avail_idx = vq->last_avail_idx & (vq->size - );
    used_idx  = vq->last_used_idx  & (vq->size - );
    rte_prefetch0(&vq->avail->ring[avail_idx]);
    rte_prefetch0(&vq->used->ring[used_idx]);

    count = RTE_MIN(count, MAX_PKT_BURST);
    count = RTE_MIN(count, free_entries);
    LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers\n",
            dev->vid, count);

    /* Retrieve all of the head indexes first to avoid caching issues. */
    for (i = ; i < count; i++) {
        avail_idx = (vq->last_avail_idx + i) & (vq->size - );
        used_idx  = (vq->last_used_idx  + i) & (vq->size - );
        desc_indexes[i] = vq->avail->ring[avail_idx];

        if (likely(dev->dequeue_zero_copy == ))
            update_used_ring(dev, vq, used_idx, desc_indexes[i]);
    }

    /* Prefetch descriptor index. */
    rte_prefetch0(&vq->desc[desc_indexes[]]);
    for (i = ; i < count; i++) {
        struct vring_desc *desc;
        uint16_t sz, idx;
        int err;

        if (likely(i +  < count))
            rte_prefetch0(&vq->desc[desc_indexes[i + ]]);

        if (vq->desc[desc_indexes[i]].flags & VRING_DESC_F_INDIRECT) {
            desc = (struct vring_desc *)(uintptr_t)
                rte_vhost_gpa_to_vva(dev->mem,
                    vq->desc[desc_indexes[i]].addr);
            if (unlikely(!desc))
                break;

            rte_prefetch0(desc);
            sz = vq->desc[desc_indexes[i]].len / sizeof(*desc);
            idx = ;
        } else {
            desc = vq->desc;
            sz = vq->size;
            idx = desc_indexes[i];
        }

        pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
        if (unlikely(pkts[i] == NULL)) {
            RTE_LOG(ERR, VHOST_DATA,
                "Failed to allocate memory for mbuf.\n");
            break;
        }
        //還是一個一個拷貝
        err = copy_desc_to_mbuf(dev, desc, sz, pkts[i], idx, mbuf_pool);
        if (unlikely(err)) {
            rte_pktmbuf_free(pkts[i]);
            break;
        }

        if (unlikely(dev->dequeue_zero_copy)) {
            struct zcopy_mbuf *zmbuf;

            zmbuf = get_zmbuf(vq);
            if (!zmbuf) {
                rte_pktmbuf_free(pkts[i]);
                break;
            }
            zmbuf->mbuf = pkts[i];
            zmbuf->desc_idx = desc_indexes[i];

            /*
             * Pin lock the mbuf; we will check later to see
             * whether the mbuf is freed (when we are the last
             * user) or not. If that's the case, we then could
             * update the used ring safely.
             */
            rte_mbuf_refcnt_update(pkts[i], );

            vq->nr_zmbuf += ;
            TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
        }
    }
    vq->last_avail_idx += i;

    if (likely(dev->dequeue_zero_copy == )) {
        vq->last_used_idx += i;
        update_used_idx(dev, vq, i);
    }

out:
    if (unlikely(rarp_mbuf != NULL)) {
        /*
         * Inject it to the head of "pkts" array, so that switch's mac
         * learning table will get updated first.
         */
        memmove(&pkts[], pkts, i * sizeof(struct rte_mbuf *));
        pkts[] = rarp_mbuf;
        i += ;
    }

    return i;
}
           

ok,到這裡比較重要的API就介紹差不多了,基本的原理應該也就掌握了。

virtio,vhost-net,vhost-user

關于這幾個概念的介紹和對比,這篇http://blog.csdn.net/qq_15437629/article/details/77899905“>文章介紹的挺清楚,大家可以參考下

==下一部分會介紹下這些API的使用示例,主要參考examples\vhost\main.c中流程,請繼續關注。==

繼續閱讀