2009-07-19 22:45:02 -04:00
|
|
|
# Copyright (C) 2003-2009 Robey Pointer <robeypointer@gmail.com>
|
2005-02-26 16:12:43 -05:00
|
|
|
#
|
|
|
|
# This file is part of paramiko.
|
|
|
|
#
|
|
|
|
# Paramiko is free software; you can redistribute it and/or modify it under the
|
|
|
|
# terms of the GNU Lesser General Public License as published by the Free
|
|
|
|
# Software Foundation; either version 2.1 of the License, or (at your option)
|
|
|
|
# any later version.
|
|
|
|
#
|
2013-09-28 00:29:18 -04:00
|
|
|
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
|
2005-02-26 16:12:43 -05:00
|
|
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
|
|
|
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
|
|
|
# details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
|
|
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
|
|
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
|
|
|
|
|
|
"""
|
|
|
|
A stub SFTP server for loopback SFTP testing.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import os
|
|
|
|
from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
|
|
|
|
SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
|
2014-03-07 19:17:19 -05:00
|
|
|
from paramiko.common import o666
|
2005-02-26 16:12:43 -05:00
|
|
|
|
|
|
|
|
|
|
|
class StubServer (ServerInterface):
|
|
|
|
def check_auth_password(self, username, password):
|
|
|
|
# all are allowed
|
|
|
|
return AUTH_SUCCESSFUL
|
|
|
|
|
|
|
|
def check_channel_request(self, kind, chanid):
|
|
|
|
return OPEN_SUCCEEDED
|
|
|
|
|
|
|
|
|
|
|
|
class StubSFTPHandle (SFTPHandle):
|
|
|
|
def stat(self):
|
|
|
|
try:
|
|
|
|
return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
|
|
|
|
def chattr(self, attr):
|
|
|
|
# python doesn't have equivalents to fchown or fchmod, so we have to
|
|
|
|
# use the stored filename
|
|
|
|
try:
|
|
|
|
SFTPServer.set_file_attr(self.filename, attr)
|
2006-02-19 22:17:41 -05:00
|
|
|
return SFTP_OK
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
|
|
|
|
|
|
|
|
class StubSFTPServer (SFTPServerInterface):
|
|
|
|
# assume current folder is a fine root
|
|
|
|
# (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess)
|
|
|
|
ROOT = os.getcwd()
|
2005-04-06 03:24:28 -04:00
|
|
|
|
2005-02-26 16:12:43 -05:00
|
|
|
def _realpath(self, path):
|
|
|
|
return self.ROOT + self.canonicalize(path)
|
|
|
|
|
|
|
|
def list_folder(self, path):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
2014-03-07 23:45:26 -05:00
|
|
|
out = []
|
2005-02-26 16:12:43 -05:00
|
|
|
flist = os.listdir(path)
|
|
|
|
for fname in flist:
|
|
|
|
attr = SFTPAttributes.from_stat(os.stat(os.path.join(path, fname)))
|
|
|
|
attr.filename = fname
|
|
|
|
out.append(attr)
|
|
|
|
return out
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
|
|
|
|
def stat(self, path):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
|
|
|
return SFTPAttributes.from_stat(os.stat(path))
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
|
|
|
|
def lstat(self, path):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
|
|
|
return SFTPAttributes.from_stat(os.lstat(path))
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
|
|
|
|
def open(self, path, flags, attr):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
2014-03-07 23:45:26 -05:00
|
|
|
binary_flag = getattr(os, 'O_BINARY', 0)
|
2006-03-26 19:17:26 -05:00
|
|
|
flags |= binary_flag
|
|
|
|
mode = getattr(attr, 'st_mode', None)
|
|
|
|
if mode is not None:
|
|
|
|
fd = os.open(path, flags, mode)
|
|
|
|
else:
|
2006-09-07 22:28:16 -04:00
|
|
|
# os.open() defaults to 0777 which is
|
|
|
|
# an odd default mode for files
|
2013-10-30 20:14:52 -04:00
|
|
|
fd = os.open(path, flags, o666)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
if (flags & os.O_CREAT) and (attr is not None):
|
2006-03-26 19:17:26 -05:00
|
|
|
attr._flags &= ~attr.FLAG_PERMISSIONS
|
2005-02-26 16:12:43 -05:00
|
|
|
SFTPServer.set_file_attr(path, attr)
|
|
|
|
if flags & os.O_WRONLY:
|
2006-08-21 22:56:01 -04:00
|
|
|
if flags & os.O_APPEND:
|
|
|
|
fstr = 'ab'
|
|
|
|
else:
|
|
|
|
fstr = 'wb'
|
2005-02-26 16:12:43 -05:00
|
|
|
elif flags & os.O_RDWR:
|
2006-08-21 22:56:01 -04:00
|
|
|
if flags & os.O_APPEND:
|
|
|
|
fstr = 'a+b'
|
|
|
|
else:
|
|
|
|
fstr = 'r+b'
|
2005-02-26 16:12:43 -05:00
|
|
|
else:
|
|
|
|
# O_RDONLY (== 0)
|
2006-03-26 19:17:26 -05:00
|
|
|
fstr = 'rb'
|
2005-02-26 16:12:43 -05:00
|
|
|
try:
|
|
|
|
f = os.fdopen(fd, fstr)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
2006-08-21 22:56:01 -04:00
|
|
|
fobj = StubSFTPHandle(flags)
|
2005-02-26 16:12:43 -05:00
|
|
|
fobj.filename = path
|
|
|
|
fobj.readfile = f
|
|
|
|
fobj.writefile = f
|
|
|
|
return fobj
|
|
|
|
|
|
|
|
def remove(self, path):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
|
|
|
os.remove(path)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
return SFTP_OK
|
|
|
|
|
|
|
|
def rename(self, oldpath, newpath):
|
|
|
|
oldpath = self._realpath(oldpath)
|
|
|
|
newpath = self._realpath(newpath)
|
|
|
|
try:
|
|
|
|
os.rename(oldpath, newpath)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
return SFTP_OK
|
|
|
|
|
|
|
|
def mkdir(self, path, attr):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
|
|
|
os.mkdir(path)
|
|
|
|
if attr is not None:
|
|
|
|
SFTPServer.set_file_attr(path, attr)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
return SFTP_OK
|
|
|
|
|
|
|
|
def rmdir(self, path):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
|
|
|
os.rmdir(path)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
return SFTP_OK
|
|
|
|
|
|
|
|
def chattr(self, path, attr):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
|
|
|
SFTPServer.set_file_attr(path, attr)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
return SFTP_OK
|
|
|
|
|
|
|
|
def symlink(self, target_path, path):
|
|
|
|
path = self._realpath(path)
|
|
|
|
if (len(target_path) > 0) and (target_path[0] == '/'):
|
|
|
|
# absolute symlink
|
|
|
|
target_path = os.path.join(self.ROOT, target_path[1:])
|
|
|
|
if target_path[:2] == '//':
|
|
|
|
# bug in os.path.join
|
|
|
|
target_path = target_path[1:]
|
|
|
|
else:
|
|
|
|
# compute relative to path
|
|
|
|
abspath = os.path.join(os.path.dirname(path), target_path)
|
|
|
|
if abspath[:len(self.ROOT)] != self.ROOT:
|
|
|
|
# this symlink isn't going to work anyway -- just break it immediately
|
|
|
|
target_path = '<error>'
|
|
|
|
try:
|
|
|
|
os.symlink(target_path, path)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
return SFTP_OK
|
|
|
|
|
|
|
|
def readlink(self, path):
|
|
|
|
path = self._realpath(path)
|
|
|
|
try:
|
|
|
|
symlink = os.readlink(path)
|
2013-11-19 11:06:35 -05:00
|
|
|
except OSError as e:
|
2005-02-26 16:12:43 -05:00
|
|
|
return SFTPServer.convert_errno(e.errno)
|
|
|
|
# if it's absolute, remove the root
|
|
|
|
if os.path.isabs(symlink):
|
|
|
|
if symlink[:len(self.ROOT)] == self.ROOT:
|
|
|
|
symlink = symlink[len(self.ROOT):]
|
|
|
|
if (len(symlink) == 0) or (symlink[0] != '/'):
|
|
|
|
symlink = '/' + symlink
|
|
|
|
else:
|
|
|
|
symlink = '<error>'
|
|
|
|
return symlink
|