天天看点

分布式文件存储seaweedFS简介与Mount功能原理揭密

作者:闪念基因

前言

为了满足业务的需求,信也科技基于SeaweedFS搭建了分布式文件存储服务,用于替换现有的Ceph存储和部分公有云存储。目前已经在生产环境落地,接入了部分业务,积累了一些实践经验。本文将介绍SeaweedFS Mount功能的实现原理,并对核心源码进行解读。

SeaweedFS简介

SeaweedFS是什么?

SeaweedFS是一个基于Facebook的《Finding a needle in Haystack: Facebook's photo storage》论文设计的简单且高度可扩展的分布式文件系统,可以支持数十亿的文件存储并提供高性能的文件存储服务。并实现了文件存储、S3对象存储、FUSE Mount、WebDAV等功能。

SeaweedFS架构介绍

分布式文件存储seaweedFS简介与Mount功能原理揭密

可能许多人对SeaweedFS并不了解,所以先介绍下基本架构。SeaweedFS集群主要由Master、Volume、Filer三种类型的节点组成。其中Volume节点主要负责数据的写入和读取,支持多副本存储和EC存储两种模式,并参考了Haystack中的小文件存储设计将多个小文件存储为一个volume,优化了大量小文件场景下的存储性能。Master节点主要负责管理集群维护集群的拓扑结构,在文件读写的过程中负责分配文件id、查询文件存储的位置。Filer节点基于Master节点和Volume节点实现了基本的文件操作API,并在数据库中维护了文件的元数据信息和文件目录结构。三种节点组成SeaweedFS集群对外提供分布式文件存储服务。基于Filer节点又扩展出了S3网关,可以提供S3协议的对象存储服务。

除了基本的文件存储功能之外,SeaweedFS还实现了许多功能,Mount就是其中之一,下面会重点介绍一下Mount功能的实现原理。Mount功能需要在客户端执行weed mount指令开启,执行指令可以将本地的目录挂载到远程的SeaweedFS分布式存储中,可以像操作普通文件一样操作其中的文件,在挂载目录中写入的文件会同步至远程并按照备份策略存储多个备份,使文件操作不会受到本地磁盘大小的限制,并且可以与其他的客户端共享文件。

Mount实现原理

核心流程

在客户端执行weed mount指令之后会在客户端启动SeaweedFS用户态文件系统,该文件系统基于Fuse协议实现了所有的文件操作接口。当我们读写文件时操作系统发现该目录挂载到了Fuse文件系统就会将读写请求交由Fuse内核模块处理,Fuse内核模块将请求封装为特定的格式,然后通过/dev/fuse将请求传递到实现了Fuse协议的用户态文件系统(也就是SeaweedFS Mount客户端实现的用户态文件系统,后续简称用户态文件系统),用户态文件系统会解析请求中的操作码、NodeId等信息判断操作类型并调用SeaweedFS服务端接口执行对应的操作,最后将操作结果返回给Fuse内核,过程如下图所示:

分布式文件存储seaweedFS简介与Mount功能原理揭密
  • FUSE内核:与VFS交互,与普通的文件系统差不多。在接到文件操作请求后不会处理请求,而是将请求保存在队列中并转发给用户空间的进程。
  • GO-FUSE:负责与FUSE内核通信。
  • WFS:用户态文件系统的业务逻辑层,实现了GO-FUSE包中定义的文件系统接口。负责与SeaweedFS进行交互。

源码解读

由于源码较长,本文中贴出的源码会进行一些省略,只讲解核心步骤。

weed mount指令做了什么?

源码链接:https://github.com/seaweedfs/seaweedfs/blob/master/weed/command/mount_std.go

  1. 先将dir从原文件系统unmount。
  2. 创建实现了go-fuse中定义的文件系统接口的seaweedFileSystem对象(WFS)。
  3. 启动用户态文件系统。启动时会调用go-fuse中的mount函数向fuse内核注册用户态文件系统
func RunMount(option *MountOptions, umask os.FileMode) bool {

	......

	// 先将需要mount的dir从原文件系统unmount
	unmount.Unmount(dir)

	......

	// 创建SeaweedFileSystem。该函数会创建并返回一个WFS的对象,该对象实现了Fuse模块中定义的文件操作接口。
	seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
		MountDirectory:     dir,
		FilerAddresses:     filerAddresses,
		GrpcDialOption:     grpcDialOption,
		FilerMountRootPath: mountRoot,
		Collection:         *option.collection,
		Replication:        *option.replication,
		TtlSec:             int32(*option.ttlSec),
		DiskType:           types.ToDiskType(*option.diskType),
		ChunkSizeLimit:     int64(chunkSizeLimitMB) * 1024 * 1024,
		ConcurrentWriters:  *option.concurrentWriters,
		CacheDir:           *option.cacheDir,
		CacheSizeMB:        *option.cacheSizeMB,
		DataCenter:         *option.dataCenter,
		Quota:              int64(*option.collectionQuota) * 1024 * 1024,
		MountUid:           uid,
		MountGid:           gid,
		MountMode:          mountMode,
		MountCtime:         fileInfo.ModTime(),
		MountMtime:         time.Now(),
		Umask:              umask,
		VolumeServerAccess: *mountOptions.volumeServerAccess,
		Cipher:             cipher,
		UidGidMapper:       uidGidMapper,
		DisableXAttr:       *option.disableXAttr,
	})

	// 创建并启动SeaweedFS用户态文件系统服务端,对于文件的操作会转发到该server处理。
	// 在NewServer方法内部会调用go-fuse包中的mount方法,将目录mount到Fuse文件系统
	server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
	if err != nil {
		glog.Fatalf("Mount fail: %v", err)
	}
	grace.OnInterrupt(func() {
		unmount.Unmount(dir)
	})

	grpcS := pb.NewGrpcServer()
	mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem)
	reflection.Register(grpcS)
	go grpcS.Serve(montSocketListener)

	seaweedFileSystem.StartBackgroundTasks()

	fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)

	// 启动用户态文件系统服务端,启动后会轮询接收文件操作请求,并执行对应的文件操作Handler
	server.Serve()

	return true
}
           

注册用户态文件系统

源码链接:https://github.com/hanwen/go-fuse/blob/master/fuse/mount_linux.go

go-fuse中Mount流程如下

  1. 执行open操作,拿到fd(文件描述符)
  2. 构造mount请求参数,fd传入参数
  3. 执行mount操作,在fuse内核中注册用户态文件系统
  4. return fd。这里的fd会保存在内存中,通过read fd与fuse内核交互。
func mountDirect(mountPoint string, opts *MountOptions, ready chan<- error) (fd int, err error) {
	// open /dev/fuse 字符设备,拿到fd
	fd, err = syscall.Open("/dev/fuse", os.O_RDWR, 0) // use syscall.Open since we want an int fd
	if err != nil {
		return
	}

	// managed to open dev/fuse, attempt to mount
	source := opts.FsName
	if source == "" {
		source = opts.Name
	}

	var flags uintptr
	flags |= syscall.MS_NOSUID | syscall.MS_NODEV

	// 将fd传入mount请求参数
	// some values we need to pass to mount, but override possible since opts.Options comes after
	var r = []string{
		fmt.Sprintf("fd=%d", fd),
		"rootmode=40000",
		"user_id=0",
		"group_id=0",
	}
	r = append(r, opts.Options...)

	if opts.AllowOther {
		r = append(r, "allow_other")
	}

	// 执行mount操作
	err = syscall.Mount(opts.FsName, mountPoint, "fuse."+opts.Name, opts.DirectMountFlags, strings.Join(r, ","))
	if err != nil {
		syscall.Close(fd)
		return
	}

	// success
	close(ready)
	return
}
           

与Fuse内核交互

用户态和内核态的交互是通过用户态不断地轮询读取/dev/fuse设备实现的。当用户态文件系统通过open /dev/fuse拿到的文件描述符读取/dev/fuse文件时,VFS会将请求转发给Fuse内核处理,Fuse内核从pending队列中取出一个文件操作请求返回给用户态,并将请求从pending队列删除,加入processing队列,如果当前pending队列为空则该read调用会阻塞。

  • go-fuse源码:轮询获取文件操作请求,并异步(或同步)处理

源码链接:https://github.com/hanwen/go-fuse/blob/master/fuse/server.go

// 用户态文件系统服务端启动后会启动一个协程执行loop方法
func (ms *Server) loop(exitIdle bool) {
	defer ms.loops.Done()
exit:
	for {
		// 获取请求。
		// ms.readRequest()函数内部通过read系统调用syscall.Read(fd, buff)读取注册用户态文件系统时返回的文件描述符fd。
		// 该系统调用会由fuse内核的fuse_dev_read()函数处理。内核调用链路 sys_read() -> vfs_read() -> do_sync_read() -> fuse_dev_read() -> fuse_dev_do_read()
		req, errNo := ms.readRequest(exitIdle)
		switch errNo {
		case OK:
			if req == nil {
				break exit
			}
		case ENOENT:
			continue
		case ENODEV:
			// unmount
			if ms.opts.Debug {
				log.Printf("received ENODEV (unmount request), thread exiting")
			}
			break exit
		default: // some other error?
			log.Printf("Failed to read from fuse conn: %v", errNo)
			break exit
		}

		// 处理请求
		if ms.singleReader {
			go ms.handleRequest(req)
		} else {
			ms.handleRequest(req)
		}
	}
}
           
  • linux内核fuse_dev_do_read
  1. 等待队列有请求
  2. 从pending队列获取请求
  3. 将请求内容copy到用户态 4. 将请求移动到processing队列中
static ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file,
                                struct fuse_copy_state *cs, size_t nbytes)
{
        ssize_t err;
        struct fuse_conn *fc = fud->fc;
        struct fuse_iqueue *fiq = &fc->iq;
        struct fuse_pqueue *fpq = &fud->pq;
        struct fuse_req *req;
        struct fuse_in *in;
        unsigned reqsize;

 restart:
        spin_lock(&fiq->waitq.lock);
        err = -EAGAIN;
        if ((file->f_flags & O_NONBLOCK) && fiq->connected &&
            !request_pending(fiq))
                goto err_unlock;

        err = wait_event_interruptible_exclusive_locked(fiq->waitq,
                                !fiq->connected || request_pending(fiq));
        if (err)
                goto err_unlock;

        err = -ENODEV;
        if (!fiq->connected)
                goto err_unlock;

        if (!list_empty(&fiq->interrupts)) {
                req = list_entry(fiq->interrupts.next, struct fuse_req,
                                 intr_entry);
                return fuse_read_interrupt(fiq, cs, nbytes, req);
        }

        if (forget_pending(fiq)) {
                if (list_empty(&fiq->pending) || fiq->forget_batch-- > 0)
                        return fuse_read_forget(fc, fiq, cs, nbytes);

                if (fiq->forget_batch <= -8)
                        fiq->forget_batch = 16;
        }
        // 从pending队列中获取请求
        req = list_entry(fiq->pending.next, struct fuse_req, list);
        clear_bit(FR_PENDING, &req->flags);
        list_del_init(&req->list);
        spin_unlock(&fiq->waitq.lock);

        in = &req->in;
        reqsize = in->h.len;

        if (task_active_pid_ns(current) != fc->pid_ns) {
                rcu_read_lock();
                in->h.pid = pid_vnr(find_pid_ns(in->h.pid, fc->pid_ns));
                rcu_read_unlock();
        }

        /* If request is too large, reply with an error and restart the read */
        if (nbytes < reqsize) {
                req->out.h.error = -EIO;
                /* SETXATTR is special, since it may contain too large data */
                if (in->h.opcode == FUSE_SETXATTR)
                        req->out.h.error = -E2BIG;
                request_end(fc, req);
                goto restart;
        }
        spin_lock(&fpq->lock);
        list_add(&req->list, &fpq->io);
        spin_unlock(&fpq->lock);
        cs->req = req;
				// 将请求copy到用户态
        err = fuse_copy_one(cs, &in->h, sizeof(in->h));
        if (!err)
                err = fuse_copy_args(cs, in->numargs, in->argpages,
                                     (struct fuse_arg *) in->args, 0);
        fuse_copy_finish(cs);
        spin_lock(&fpq->lock);
        clear_bit(FR_LOCKED, &req->flags);
        if (!fpq->connected) {
                err = -ENODEV;
                goto out_end;
        }
        if (err) {
                req->out.h.error = -EIO;
                goto out_end;
        }
        if (!test_bit(FR_ISREPLY, &req->flags)) {
                err = reqsize;
                goto out_end;
        }
				// 将req放到processing队列中
        list_move_tail(&req->list, &fpq->processing);
        spin_unlock(&fpq->lock);
        set_bit(FR_SENT, &req->flags);
        /* matches barrier in request_wait_answer() */
        smp_mb__after_atomic();
        if (test_bit(FR_INTERRUPTED, &req->flags))
                queue_interrupt(fiq, req);

        return reqsize;

out_end:
        if (!test_bit(FR_PRIVATE, &req->flags))
                list_del_init(&req->list);
        spin_unlock(&fpq->lock);
        request_end(fc, req);
        return err;

 err_unlock:
        spin_unlock(&fiq->waitq.lock);
        return err;
}
           

用户态文件系统如何与SeaweedFS交互

首先用户态文件系统(WeedFS)实现了一系列的文件操作Handler,用于处理从Fuse内核中拿到的文件操作请求。下面我们以link操作的Handler举例讲解用户态文件系统的实现。

https://github.com/seaweedfs/seaweedfs/tree/master/weed/mount

  1. 在WeedFS中维护的inode->path的Map中查找文件path
  2. 通过文件path在本地缓存中查找文件元数据
  3. 通过filer客户端发起gRPC调用,将文件元数据保存到远程SeaweedFS集群
  4. 更新本地元数据缓存
  5. 更新本地inode->path映射
func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *fuse.EntryOut) (code fuse.Status) {

	if wfs.IsOverQuota {
		return fuse.Status(syscall.ENOSPC)
	}

	if s := checkName(name); s != fuse.OK {
		return s
	}

	// 从WeedFS维护的inode->path的Map中,根据NodeId查找文件path
	newParentPath, code := wfs.inodeToPath.GetPath(in.NodeId)
	if code != fuse.OK {
		return
	}
	oldEntryPath, code := wfs.inodeToPath.GetPath(in.Oldnodeid)
	if code != fuse.OK {
		return
	}
	oldParentPath, _ := oldEntryPath.DirAndName()

	// 从本地缓存中读取文件元数据
	oldEntry, status := wfs.maybeLoadEntry(oldEntryPath)
	if status != fuse.OK {
		return status
	}

	// update old file to hardlink mode
	if len(oldEntry.HardLinkId) == 0 {
		oldEntry.HardLinkId = filer.NewHardLinkId()
		oldEntry.HardLinkCounter = 1
	}
	oldEntry.HardLinkCounter++
	updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
		Directory:  oldParentPath,
		Entry:      oldEntry,
		Signatures: []int32{wfs.signature},
	}

	// CreateLink 1.2 : update new file to hardlink mode
	oldEntry.Attributes.Mtime = time.Now().Unix()
	request := &filer_pb.CreateEntryRequest{
		Directory: string(newParentPath),
		Entry: &filer_pb.Entry{
			Name:            name,
			IsDirectory:     false,
			Attributes:      oldEntry.Attributes,
			Chunks:          oldEntry.Chunks,
			Extended:        oldEntry.Extended,
			HardLinkId:      oldEntry.HardLinkId,
			HardLinkCounter: oldEntry.HardLinkCounter,
		},
		Signatures: []int32{wfs.signature},
	}

	// 通过Filer客户端与远程的SeaweedFS集群Filer节点交互
	// apply changes to the filer, and also apply to local metaCache
	err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {

		wfs.mapPbIdFromLocalToFiler(request.Entry)
		defer wfs.mapPbIdFromFilerToLocal(request.Entry)
		
		// 发起gRPC调用,更新文件元数据信息
		if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil {
			return err
		}
		// 更新WeedFS中的元数据
		wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry))
		// 发起gRPC调用,写入Link文件元数据信息
		if err := filer_pb.CreateEntry(client, request); err != nil {
			return err
		}
		// 更新WeedFS中的元数据缓存
		wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))

		return nil
	})

	newEntryPath := newParentPath.Child(name)

	if err != nil {
		glog.V(0).Infof("Link %v -> %s: %v", oldEntryPath, newEntryPath, err)
		return fuse.EIO
	}
	
  // 更新inode->Path的Map
	wfs.inodeToPath.AddPath(oldEntry.Attributes.Inode, newEntryPath)
  // 将操作结果回写到response中
	wfs.outputPbEntry(out, oldEntry.Attributes.Inode, request.Entry)

	return fuse.OK
}
           

总结

SeaweedFS基于Fuse用户态文件系统实现了Mount功能,其实现主要包括Fuse内核、Mount客户端、SeaweedFS服务端三部分。客户端基于go-fuse实现了用户态文件系统(WeedFS),并在用户态文件系统中维护了inode->path映射、filehandle->inode映射、inode->filehandle映射和元数据缓存等结构,提高了查询性能。Fuse内核注册的/dev/fuse字符设备,打通了Fuse内核与用户态文件系统的通信。SeaweedFS服务端Filer节点提供了基本的文件操作的gRPC API。本文简单讲解了上述三部分各自的实现和相互之间的交互流程,希望读者在阅读完这篇文章后能对SeaweedFS和Fuse文件系统有一个初步的了解。

作者简介

LGW 信也科技基础架构研发专家,主要参与分布式文件存储的研发工作。

来源:微信公众号:拍码场

出处:https://mp.weixin.qq.com/s/42vdVGhodcQATvEkF6Isnw

继续阅读