# Copyright (C) 2003-2009 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
|
|
|
|
|
|
"""
|
|
|
|
Some unit tests for the ssh2 protocol in Transport.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import unittest
|
2013-10-30 19:19:30 -04:00
|
|
|
from tests.loop import LoopSocket
|
2005-05-10 13:36:38 -04:00
|
|
|
from Crypto.Cipher import AES
|
|
|
|
from Crypto.Hash import SHA, HMAC
|
|
|
|
from paramiko import Message, Packetizer, util
|
2013-10-31 13:01:21 -04:00
|
|
|
from paramiko.common import *
|
|
|
|
|
|
|
|
# Single-byte building blocks for the cipher IV (x55) and the MAC key (x1f)
# used by the tests below.  A b'' literal is ``bytes`` on Python 3 and a plain
# ``str`` on Python 2.6+, which is exactly what each branch of a PY3 check
# would produce, so no version switch is needed.
x55 = b'\x55'
x1f = b'\x1f'
|
|
|
|
|
2005-05-10 13:36:38 -04:00
|
|
|
|
|
|
|
class PacketizerTest (unittest.TestCase):
    """
    Check Packetizer against a fixed, known-good encrypted packet.

    Both tests use the same AES-CBC cipher parameters and the same MAC
    settings, so the bytes produced by ``test_1_write`` begin with the same
    16-byte block that ``test_2_read`` feeds back in.
    """

    def _make_packetizer(self, sock):
        """
        Build a Packetizer on ``sock`` with logging and hexdump enabled.

        :param sock: the socket-like object the Packetizer reads from or
            writes to (a LoopSocket in these tests).
        :return: the configured Packetizer.
        """
        p = Packetizer(sock)
        p.set_log(util.get_logger('paramiko.transport'))
        p.set_hexdump(True)
        return p

    def test_1_write (self):
        """Sending a message emits the expected encrypted bytes."""
        rsock = LoopSocket()
        wsock = LoopSocket()
        rsock.link(wsock)
        p = self._make_packetizer(wsock)
        cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
        p.set_outbound_cipher(cipher, 16, SHA, 12, x1f * 20)

        # message has to be at least 16 bytes long, so we'll have at least one
        # block of data encrypted that contains zero random padding bytes
        m = Message()
        m.add_byte(byte_chr(100))
        m.add_int(100)
        m.add_int(1)
        m.add_int(900)
        p.send_message(m)
        data = rsock.recv(100)
        # 32 + 12 bytes of MAC = 44
        self.assertEqual(44, len(data))
        # Only the first block is compared: per the note above it holds no
        # random padding, so its ciphertext is deterministic.  The b''
        # literal is bytes on Python 3 and str on Python 2.6+, so a single
        # assertion covers both.
        self.assertEqual(
            b'\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0',
            data[:16])

    def test_2_read (self):
        """Reading a canned encrypted packet yields the original fields."""
        rsock = LoopSocket()
        wsock = LoopSocket()
        rsock.link(wsock)
        p = self._make_packetizer(rsock)
        cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
        p.set_inbound_cipher(cipher, 16, SHA, 12, x1f * 20)
        # 44 bytes in total; the first 16 are the same block that
        # test_1_write expects on the wire.
        wsock.send(
            b'C\x91\x97\xbd[P\xac%\x87\xc2\xc4k\xc7\xe98\xc0'
            b'\x90\xd2\x16V\rqsa8|L=\xfb\x97}\xe2n\x03\xb1\xa0\xc2\x1c\xd6AAL\xb4Y')
        cmd, m = p.read_message()
        self.assertEqual(100, cmd)
        self.assertEqual(100, m.get_int())
        self.assertEqual(1, m.get_int())
        self.assertEqual(900, m.get_int())
|