[project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-8]

add unit tests for the packetizer, and fix a little locking bug where i think more of the packetizer write function should be inside a lock
This commit is contained in:
Robey Pointer 2005-05-10 17:36:38 +00:00
parent e8efe095a6
commit 366f216e3f
5 changed files with 82 additions and 8 deletions

4
README
View File

@ -226,10 +226,10 @@ v0.9 FEAROW
*** MISSING LINKS *** MISSING LINKS
* ctr forms of ciphers are missing (blowfish-ctr, aes128-ctr, aes256-ctr) * ctr forms of ciphers are missing (blowfish-ctr, aes128-ctr, aes256-ctr)
* server mode needs better documentation
* why are big files so slow to transfer? profiling needed...
* would be nice to have an ftp-like interface to sftp (put, get, chdir...) * would be nice to have an ftp-like interface to sftp (put, get, chdir...)
* why are big files so slow to transfer? profiling needed...
* speed up file transfers! * speed up file transfers!
* what is psyco? * what is psyco?
* make a simple example demonstrating use of SocketServer

View File

@ -66,7 +66,8 @@ __version__ = "1.3 (marowak)"
__license__ = "GNU Lesser General Public License (LGPL)" __license__ = "GNU Lesser General Public License (LGPL)"
import transport, auth_transport, channel, rsakey, dsskey, message, ssh_exception, file import transport, auth_transport, channel, rsakey, dsskey, message
import ssh_exception, file, packet
import sftp, sftp_client, sftp_attr, sftp_file, sftp_handle, sftp_server, sftp_si import sftp, sftp_client, sftp_attr, sftp_file, sftp_handle, sftp_server, sftp_si
randpool = transport.randpool randpool = transport.randpool
@ -89,6 +90,7 @@ ServerInterface = server.ServerInterface
SubsystemHandler = server.SubsystemHandler SubsystemHandler = server.SubsystemHandler
SecurityOptions = transport.SecurityOptions SecurityOptions = transport.SecurityOptions
BufferedFile = file.BufferedFile BufferedFile = file.BufferedFile
Packetizer = packet.Packetizer
from common import AUTH_SUCCESSFUL, AUTH_PARTIALLY_SUCCESSFUL, AUTH_FAILED, \ from common import AUTH_SUCCESSFUL, AUTH_PARTIALLY_SUCCESSFUL, AUTH_FAILED, \
OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED, OPEN_FAILED_CONNECT_FAILED, \ OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED, OPEN_FAILED_CONNECT_FAILED, \

View File

@ -226,13 +226,13 @@ class Packetizer (object):
packet = self._build_packet(data) packet = self._build_packet(data)
if self.__dump_packets: if self.__dump_packets:
self._log(DEBUG, util.format_binary(packet, 'OUT: ')) self._log(DEBUG, util.format_binary(packet, 'OUT: '))
self.__write_lock.acquire()
try:
if self.__block_engine_out != None: if self.__block_engine_out != None:
out = self.__block_engine_out.encrypt(packet) out = self.__block_engine_out.encrypt(packet)
else: else:
out = packet out = packet
# + mac # + mac
self.__write_lock.acquire()
try:
if self.__block_engine_out != None: if self.__block_engine_out != None:
payload = struct.pack('>I', self.__sequence_number_out) + packet payload = struct.pack('>I', self.__sequence_number_out) + packet
out += HMAC.HMAC(self.__mac_key_out, payload, self.__mac_engine_out).digest()[:self.__mac_size_out] out += HMAC.HMAC(self.__mac_key_out, payload, self.__mac_engine_out).digest()[:self.__mac_size_out]

View File

@ -32,6 +32,7 @@ from test_message import MessageTest
from test_file import BufferedFileTest from test_file import BufferedFileTest
from test_pkey import KeyTest from test_pkey import KeyTest
from test_kex import KexTest from test_kex import KexTest
from test_packetizer import PacketizerTest
from test_transport import TransportTest from test_transport import TransportTest
from test_sftp import SFTPTest from test_sftp import SFTPTest
@ -87,6 +88,7 @@ suite.addTest(unittest.makeSuite(BufferedFileTest))
if options.use_pkey: if options.use_pkey:
suite.addTest(unittest.makeSuite(KeyTest)) suite.addTest(unittest.makeSuite(KeyTest))
suite.addTest(unittest.makeSuite(KexTest)) suite.addTest(unittest.makeSuite(KexTest))
suite.addTest(unittest.makeSuite(PacketizerTest))
if options.use_transport: if options.use_transport:
suite.addTest(unittest.makeSuite(TransportTest)) suite.addTest(unittest.makeSuite(TransportTest))
if options.use_sftp: if options.use_sftp:

70
tests/test_packetizer.py Normal file
View File

@ -0,0 +1,70 @@
# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
"""
Some unit tests for the ssh2 protocol in Transport.
"""
import unittest
from loop import LoopSocket
from Crypto.Cipher import AES
from Crypto.Hash import SHA, HMAC
from paramiko import Message, Packetizer, util
class PacketizerTest (unittest.TestCase):
def test_1_write (self):
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
p = Packetizer(wsock)
p.set_log(util.get_logger('paramiko.transport'))
p.set_hexdump(True)
cipher = AES.new('\x00' * 16, AES.MODE_CBC, '\x55' * 16)
p.set_outbound_cipher(cipher, 16, SHA, 12, '\x1f' * 20)
# message has to be at least 16 bytes long, so we'll have at least one
# block of data encrypted that contains zero random padding bytes
m = Message()
m.add_byte(chr(100))
m.add_int(100)
m.add_int(1)
m.add_int(900)
p.send_message(m)
data = rsock.recv(100)
# 32 + 12 bytes of MAC = 44
self.assertEquals(44, len(data))
self.assertEquals('\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0', data[:16])
def test_2_read (self):
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
p = Packetizer(rsock)
p.set_log(util.get_logger('paramiko.transport'))
p.set_hexdump(True)
cipher = AES.new('\x00' * 16, AES.MODE_CBC, '\x55' * 16)
p.set_inbound_cipher(cipher, 16, SHA, 12, '\x1f' * 20)
wsock.send('C\x91\x97\xbd[P\xac%\x87\xc2\xc4k\xc7\xe98\xc0' + \
'\x90\xd2\x16V\rqsa8|L=\xfb\x97}\xe2n\x03\xb1\xa0\xc2\x1c\xd6AAL\xb4Y')
cmd, m = p.read_message()
self.assertEquals(100, cmd)
self.assertEquals(100, m.get_int())
self.assertEquals(1, m.get_int())
self.assertEquals(900, m.get_int())