add unit test for compression
This commit is contained in:
parent
fdf927cffd
commit
93ee12a194
|
@ -540,3 +540,34 @@ class TransportTest (unittest.TestCase):
|
||||||
self.assertNotEquals(self.tc.H, self.tc.session_id)
|
self.assertNotEquals(self.tc.H, self.tc.session_id)
|
||||||
|
|
||||||
schan.close()
|
schan.close()
|
||||||
|
|
||||||
|
def test_G_compression(self):
|
||||||
|
"""
|
||||||
|
verify that zlib compression is basically working.
|
||||||
|
"""
|
||||||
|
host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
|
||||||
|
public_host_key = RSAKey(data=str(host_key))
|
||||||
|
self.ts.add_server_key(host_key)
|
||||||
|
self.ts.get_security_options().compression = ('zlib',)
|
||||||
|
self.tc.get_security_options().compression = ('zlib',)
|
||||||
|
event = threading.Event()
|
||||||
|
server = NullServer()
|
||||||
|
self.ts.start_server(event, server)
|
||||||
|
self.tc.connect(hostkey=public_host_key,
|
||||||
|
username='slowdive', password='pygmalion')
|
||||||
|
event.wait(1.0)
|
||||||
|
self.assert_(event.isSet())
|
||||||
|
self.assert_(self.ts.is_active())
|
||||||
|
|
||||||
|
chan = self.tc.open_session()
|
||||||
|
self.assert_(chan.exec_command('yes'))
|
||||||
|
schan = self.ts.accept(1.0)
|
||||||
|
|
||||||
|
bytes = self.tc.packetizer._Packetizer__sent_bytes
|
||||||
|
chan.send('x' * 1024)
|
||||||
|
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
|
||||||
|
# tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
|
||||||
|
self.assert_(bytes2 - bytes < 1024)
|
||||||
|
|
||||||
|
chan.close()
|
||||||
|
schan.close()
|
||||||
|
|
Loading…
Reference in New Issue