接前一篇文章:
上一回解析了setup_vq函数的前3步,本回继续解析余下的步骤。为了便于理解和加深印象,再次贴出setup_vq函数的源码,在 Linux 内核源码/drivers/ virtio /virtio_pci_modern.c中,代码如下:
- static struct virtqueue *setup_vq(struct virtio_pci_device *vp_dev,
- struct virtio_pci_vq_info *info,
- unsigned int index,
- void (*callback)(struct virtqueue *vq),
- const char *name,
- bool ctx,
- u16 msix_vec)
- {
-
- struct virtio_pci_modern_device *mdev = &vp_dev->mdev;
- bool (*notify)(struct virtqueue *vq);
- struct virtqueue *vq;
- u16 num;
- int err;
-
- if (__virtio_test_bit(&vp_dev->vdev, VIRTIO_F_NOTIFICATION_DATA))
- notify = vp_notify_with_data;
- else
- notify = vp_notify;
-
- if (index >= vp_modern_get_num_queues(mdev))
- return ERR_PTR(-EINVAL);
-
- /* Check if queue is either not available or already active. */
- num = vp_modern_get_queue_size(mdev, index);
- if (!num || vp_modern_get_queue_enable(mdev, index))
- return ERR_PTR(-ENOENT);
-
- info->msix_vector = msix_vec;
-
- /* create the vring */
- vq = vring_create_virtqueue(index, num,
- SMP_CACHE_BYTES, &vp_dev->vdev,
- true, true, ctx,
- notify, callback, name);
- if (!vq)
- return ERR_PTR(-ENOMEM);
-
- vq->num_max = num;
-
- err = vp_active_vq(vq, msix_vec);
- if (err)
- goto err;
-
- vq->priv = (void __force *)vp_modern_map_vq_notify(mdev, index, NULL);
- if (!vq->priv) {
- err = -ENOMEM;
- goto err;
- }
-
- return vq;
-
- err:
- vring_del_virtqueue(vq);
- return ERR_PTR(err);
- }
再来回顾一下前3步。
(1) setup_vq函数 首先检测virtio PCI设备是否具有VIRTIO_F_NOTIFICATION_DATA特性 。代码片段如下:
if (__virtio_test_bit(&vp_dev->vdev, VIRTIO_F_NOTIFICATION_DATA)) notify = vp_notify_with_data; else notify = vp_notify;(2) 接下来, 调用vp_modern_get_num_queues函数获取virtqueues的长度(个数) 。代码片段如下:
if (index >= vp_modern_get_num_queues(mdev)) return ERR_PTR(-EINVAL);(3) 接下来, 调用vp_modern_get_queue_size函数获得一个virtqueue的大小 。代码片段如下:
/* Check if queue is either not available or already active. */ num = vp_modern_get_queue_size(mdev, index); if (!num || vp_modern_get_queue_enable(mdev, index)) return ERR_PTR(-ENOENT);
(4)接下来, 调用vring_create_virtqueue函数实际生成virtqueue 。vring_create_virtqueue函数在 Linux内核源码 /drivers/virtio/virtio_ring.c中,代码如下:
- struct virtqueue *vring_create_virtqueue(
- unsigned int index,
- unsigned int num,
- unsigned int vring_align,
- struct virtio_device *vdev,
- bool weak_barriers,
- bool may_reduce_num,
- bool context,
- bool (*notify)(struct virtqueue *),
- void (*callback)(struct virtqueue *),
- const char *name)
- {
-
- if (virtio_has_feature(vdev, VIRTIO_F_RING_PACKED))
- return vring_create_virtqueue_packed(index, num, vring_align,
- vdev, weak_barriers, may_reduce_num,
- context, notify, callback, name, vdev->dev.parent);
-
- return vring_create_virtqueue_split(index, num, vring_align,
- vdev, weak_barriers, may_reduce_num,
- context, notify, callback, name, vdev->dev.parent);
- }
- EXPORT_SYMBOL_GPL(vring_create_virtqueue);
vring_create_virtqueue函数根据virtio device是否具有VIRTIO_F_RING_PACKED这一feature,进行了分支处理。
virtio_has_feature函数前文书已经讲解过了,详情参见 QEMU源码全解析 —— virtio(23) 。
VIRTIO_F_RING_PACKED宏的定义在Linux内核源码/include/uapi/linux/virtio_config.h中,如下:
- /* This feature indicates support for the packed virtqueue layout. */
- #define VIRTIO_F_RING_PACKED 34
vring_create_virtqueue函数根据后端设备是否支持VIRTIO_F_RING_PACKED这个feature,进行了分支处理。支持此特性,则调用vring_create_virtqueue_packed函数;否则调用vring_create_virtqueue_split函数。一个一个来看。
- vring_create_virtqueue_packed函数
vring_create_virtqueue_packed函数也在Linux内核源码/drivers/virtio/virtio_ring.c中,代码如下:
- static struct virtqueue *vring_create_virtqueue_packed(
- unsigned int index,
- unsigned int num,
- unsigned int vring_align,
- struct virtio_device *vdev,
- bool weak_barriers,
- bool may_reduce_num,
- bool context,
- bool (*notify)(struct virtqueue *),
- void (*callback)(struct virtqueue *),
- const char *name,
- struct device *dma_dev)
- {
- struct vring_virtqueue_packed vring_packed = {};
- struct vring_virtqueue *vq;
- int err;
-
- if (vring_alloc_queue_packed(&vring_packed, vdev, num, dma_dev))
- goto err_ring;
-
- vq = kmalloc(sizeof(*vq), GFP_KERNEL);
- if (!vq)
- goto err_vq;
-
- vq->vq.callback = callback;
- vq->vq.vdev = vdev;
- vq->vq.name = name;
- vq->vq.index = index;
- vq->vq.reset = false;
- vq->we_own_ring = true;
- vq->notify = notify;
- vq->weak_barriers = weak_barriers;
- #ifdef CONFIG_VIRTIO_HARDEN_NOTIFICATION
- vq->broken = true;
- #else
- vq->broken = false;
- #endif
- vq->packed_ring = true;
- vq->dma_dev = dma_dev;
- vq->use_dma_api = vring_use_dma_api(vdev);
- vq->premapped = false;
- vq->do_unmap = vq->use_dma_api;
-
- vq->indirect = virtio_has_feature(vdev, VIRTIO_RING_F_INDIRECT_DESC) &&
- !context;
- vq->event = virtio_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX);
-
- if (virtio_has_feature(vdev, VIRTIO_F_ORDER_PLATFORM))
- vq->weak_barriers = false;
-
- err = vring_alloc_state_extra_packed(&vring_packed);
- if (err)
- goto err_state_extra;
-
- virtqueue_vring_init_packed(&vring_packed, !!callback);
-
- virtqueue_init(vq, num);
- virtqueue_vring_attach_packed(vq, &vring_packed);
-
- spin_lock(&vdev->vqs_list_lock);
- list_add_tail(&vq->vq.list, &vdev->vqs);
- spin_unlock(&vdev->vqs_list_lock);
- return &vq->vq;
-
- err_state_extra:
- kfree(vq);
- err_vq:
- vring_free_packed(&vring_packed, vdev, dma_dev);
- err_ring:
- return NULL;
- }
vring_create_virtqueue_packed函数用于在Linux内核中创建一个基于packed ring的virtqueue,并返回一个指向该virtqueue的指针。
部分参数说明如下:
- index:表示virtqueue的索引号。
- num:表示virtqueue的数量。
- vdev:指向virtio_device结构体的指针,表示与virtqueue相关联的设备。
- notify:一个函数指针(回调),用于在数据传输完成后通知宿主机。
- callback:一个函数指针(回调),用于在接收到宿主机通知时执行相应的操作。
- name:指定virtqueue的名称。
返回值:
函数执行成功,返回一个指向vring_virtqueue结构体的指针,该结构体包含了管理packed ring的相关信息,如描述符表、可用环和已用环等。
- vring_create_virtqueue_split函数
vring_create_virtqueue_split函数也在Linux内核源码/drivers/virtio/virtio_ring.c中,代码如下:
- static struct virtqueue *vring_create_virtqueue_split(
- unsigned int index,
- unsigned int num,
- unsigned int vring_align,
- struct virtio_device *vdev,
- bool weak_barriers,
- bool may_reduce_num,
- bool context,
- bool (*notify)(struct virtqueue *),
- void (*callback)(struct virtqueue *),
- const char *name,
- struct device *dma_dev)
- {
- struct vring_virtqueue_split vring_split = {};
- struct virtqueue *vq;
- int err;
-
- err = vring_alloc_queue_split(&vring_split, vdev, num, vring_align,
- may_reduce_num, dma_dev);
- if (err)
- return NULL;
-
- vq = __vring_new_virtqueue(index, &vring_split, vdev, weak_barriers,
- context, notify, callback, name, dma_dev);
- if (!vq) {
- vring_free_split(&vring_split, vdev, dma_dev);
- return NULL;
- }
-
- to_vvq(vq)->we_own_ring = true;
-
- return vq;
- }
vring_create_virtqueue_split函数用于在Linux内核中创建一个基于 split ring的virtqueue,并返回一个指向该virtqueue的指针。
该函数的主要参数包括:
- index:表示virtqueue的索引号。
- num:表示virtqueue的数量。
- vdev:指向virtio_device结构体的指针,表示与virtqueue相关联的设备。
- notify:一个函数指针(回调),用于在数据传输完成后通知宿主机。
- callback:一个函数指针(回调),用于在接收到宿主机通知时执行相应的操作。
- name:指定virtqueue的名称。
返回值:
函数执行成功,返回一个指向virtqueue结构体的指针。
这里要进行一下知识补强:split ring和packed ring。
参考以下文章: 异步模式下的Vhost Packed Ring设计介绍 - 知乎
- split ring
split ring把 descriptor环形缓冲区(descriptor ring) 、 可用环形缓冲区(available ring) 和 已用环形缓冲区(used ring) 分别 使用3个数据结构来表示 ,这也是它被叫做split ring的原因。
前端作为生产者将新生产可给后端使用的descriptor在available ring中更新,然后由后端从available ring中读取到自己可用的descriptor进行使用,使用完以后将用完的descriptor写到used ring中,供前端处理,循环使用这些内存。 这里所谓的使用,可以从别的地方把数据拷贝到这些descriptor所描述的内存区域中,也可以是从这些descriptor所描述的内存区域中把数据拷贝到别的地方 。
- packed ring
packed ring是Virtio Spec 1.1提出了一种新的ring结构,前后端通信的逻辑与split ring基本相似,不过ring结构发生了些改变, 将split ring中原本分离的ring结构整合在一起全部用一个ring表示 。虽然在操作上确实复杂了一些,但是在前后端收发包操作时packed ring模式下只需访问一个ring结构, 相比于split ring每次需要操作3个ring而言,packed ring减少了所需访问的内存空间,对cache来说更加友好 。
下图是packed ring的ring结构(这个ring是前后端都可见的):
- 通过descriptor中的标记位判断这个descriptor的归属权在前端还是在后端。
- 后端使用两个指针来追踪生产和消费的进度,分别为last_avail_idx和last_used_idx。
- 前端作为生产者初始化这些descriptor,后端作为消费者不断的取用这些descriptor,并在使用结束后将使用完的descriptor写回这个ring中,前端观察到这个descriptor被写回则更新这个descriptor,使得后端在下次可以继续使用。
- 值得一提的是,前端按照ring的顺序操作这些descriptor,而后端则是按照完成顺序操作这些descriptor,且在in-order模式下descriptor的可用性一定是连续的,既若X号descriptor不可用,则默认X+1号descriptor同样不可用。这样一套前后端的协作机制构成了packed ring的基本信息传递方式。
当前位于virtqueue创建流程的以下阶段(红色矩形框中)(图片引自 https://note.youdao.com/ynoteshare/index.html?id=f247acce8c21eb4ca4a7e37403f065f9&type=note&_time=1633000040495 ):
更多详情请看下回。