SFTP方式访问Ceph

/ S3JavaCeph / 没有评论 / 462浏览

利用Java实现SFTP代理,将SFTP协议转换为S3协议。

思路:通过Apache的sshd-core接收SFTP协议请求,并实现FileSystem接口,将对本地文件的操作转换为对云存储的操作。 GitHub地址:https://github.com/TisDo/sftp-s3 代码片段: 读取文件:

@Override
    public int read(ByteBuffer dst) throws IOException {
        // Single-buffer read: wrap the destination in a one-element list and hand it to
        // doRead. A negative position tells doRead to use (and update) the tracked position.
        List<ByteBuffer> target = Collections.singletonList(dst);
        long count = doRead(target, -1);
        return (int) count;
    }

    /**
     * Fills the given buffers, in order, with bytes read from the backing input stream.
     *
     * <p>The stream is created lazily on first use. Reading stops as soon as the stream
     * reports end-of-stream or returns no data, or when every buffer is full.
     *
     * <p>NOTE(review): the stream is always opened at offset 0 and consumed sequentially,
     * so a non-negative {@code position} is not used to seek; it only suppresses the
     * update of the tracked position. Confirm whether positional reads are expected.
     *
     * @param buffers  destination buffers, filled in iteration order
     * @param position explicit position, or a negative value to use the tracked position
     * @return the number of bytes read if any were read; otherwise {@code -1} when the
     *         stream signalled end-of-stream, or {@code 0} when nothing could be read
     * @throws IOException if creating or reading the underlying stream fails
     */
    protected long doRead(List<ByteBuffer> buffers, long position) throws IOException {
        boolean eof = false;
        long curPos = (position >= 0L) ? position : posTracker.get();
        if (is == null) {
            is = path.createInputStream(0L);
        }
        // End-of-stream is detected solely through read() returning -1 below.
        // InputStream.available() is only an estimate of bytes readable without blocking;
        // it may legitimately be 0 while more data is still on its way, so it must not be
        // used as an EOF test.
        try {
            long totalRead = 0;
            byte[] bytes = new byte[1024];
            loop:
            for (ByteBuffer buffer : buffers) {
                while (buffer.hasRemaining()) {
                    // Never request more than the scratch array or the buffer can hold.
                    int readMax = Math.min(bytes.length, buffer.remaining());
                    int read = is.read(bytes, 0, readMax);
                    if (read > 0) {
                        buffer.put(bytes, 0, read);
                        curPos += read;
                        totalRead += read;
                    } else {
                        eof = read == -1;
                        break loop;
                    }
                }
            }
            if (totalRead > 0) {
                return totalRead;
            }
            return eof ? -1 : 0;
        } finally {
            // Only a read relative to the tracked position advances the tracker.
            if (position < 0L) {
                posTracker.set(curPos);
            }
        }
    }

实现创建目录

 /**
  * Creates a "directory" by storing a marker object under the normalized directory name.
  *
  * <p>Paths that are not {@code SFTPCephPath} instances are ignored with a warning.
  * Any exception raised while storing the object is logged and rethrown unchanged.
  *
  * @param path the directory to create
  */
 public void mkdir(Path path){
        if (!(path instanceof SFTPCephPath)) {
            log.warn("path:{} not a SFTPCephPath",path.getClass().getName());
            return;
        }

        String directoryKey = normalizeDirectory((SFTPCephPath) path);
        try {
            // Third argument is presumably the object content - confirm against the client API.
            client.putObject(bucketName, directoryKey, "sftp create");
        } catch (Exception failure) {
            log.error("create object fail",failure);
            throw failure;
        }
        log.info("mkdir {} success", directoryKey);
    }

实现list操作

 /**
  * Lists the entries directly beneath the given directory.
  *
  * <p>The directory is turned into a key prefix and the bucket is listed with
  * {@code PATH_SEPARATOR} as delimiter. Every common prefix becomes a directory entry
  * and every object summary with a non-empty relative name becomes a file entry.
  * Listing continues page by page while the result is truncated, but stops fetching
  * further pages once more than 1000 entries have been collected (so the returned
  * list may exceed 1000 by up to one page).
  *
  * <p>Entry names are made relative by stripping the literal prefix. The prefix is
  * deliberately NOT treated as a regular expression, so keys containing characters
  * such as {@code .}, {@code +}, {@code (} or {@code [} are handled correctly.
  *
  * @param path the directory to list
  * @return the child entries, or an empty list if {@code path} is not an {@code SFTPCephPath}
  */
 public List<Path> list(Path path){
        if (!(path instanceof SFTPCephPath)) {
            return Collections.emptyList();
        }
        SFTPCephPath s3Path = (SFTPCephPath) path;
        SFTPCephFileSystem fileSystem = s3Path.getFileSystem();
        String prefix = normalizeDirectory(s3Path);

        ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
        listObjectsRequest.setBucketName(bucketName);
        listObjectsRequest.setPrefix(prefix);
        listObjectsRequest.setDelimiter(PATH_SEPARATOR);
        ObjectListing objects = client.listObjects(listObjectsRequest);

        List<Path> list = new ArrayList<>();
        List<String> names = Collections.emptyList();
        // NOTE(review): octal 0200 vs 0400 - a writable user gets only the write bit and a
        // read-only user only the read bit; confirm this is the intended permission mask.
        int perm = user.isWritepermission() ? 0000200 : 0000400;
        // Directories carry no timestamp of their own, so they all report the listing time.
        FileTime listedAt = FileTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS);

        while (true) {
            for (String commonPrefix : objects.getCommonPrefixes()) {
                Attributes attributes = new Attributes();
                attributes.setSize(0);
                attributes.setOwner(user.getName());
                attributes.setGroup(user.getName());
                attributes.setFileType(Attributes.Type.Directory);
                attributes.setAccessTime(listedAt);
                attributes.setCreateTime(listedAt);
                attributes.setModifyTime(listedAt);
                attributes.setPermissions(perm);

                // Drop the trailing '/' first, then the literal listing prefix.
                String dirName = commonPrefix;
                if (dirName.length() > 0 && dirName.charAt(dirName.length() - 1) == '/') {
                    dirName = dirName.substring(0, dirName.length() - 1);
                }
                String relativeDir = dirName.startsWith(prefix)
                        ? dirName.substring(prefix.length())
                        : dirName;
                list.add(new SFTPCephPath(fileSystem, relativeDir, names, attributes));
            }

            for (S3ObjectSummary s3ObjectSummary : objects.getObjectSummaries()) {
                String key = s3ObjectSummary.getKey();
                String objectName = key.startsWith(prefix) ? key.substring(prefix.length()) : key;
                // An empty relative name means the key equals the prefix itself; skip it.
                if (StringUtils.isNotEmpty(objectName)) {
                    FileTime modified = FileTime.from(
                            s3ObjectSummary.getLastModified().getTime(), TimeUnit.MILLISECONDS);
                    Attributes attributes = new Attributes();
                    attributes.setFileType(Attributes.Type.File);
                    attributes.setOwner(user.getName());
                    attributes.setGroup(user.getName());
                    attributes.setSize(s3ObjectSummary.getSize());
                    // Only the last-modified time is available, so it is used for all three.
                    attributes.setAccessTime(modified);
                    attributes.setCreateTime(modified);
                    attributes.setModifyTime(modified);
                    attributes.setPermissions(perm);
                    list.add(new SFTPCephPath(fileSystem, objectName, names, attributes));
                }
            }

            // Do not fetch further pages once more than 1000 entries have been collected.
            if (list.size() > 1000) {
                break;
            }
            if (!objects.isTruncated()) {
                break;
            }
            objects = client.listNextBatchOfObjects(objects);
        }
        return list;
    }