bug 193779:
catch EOFError in auth, and turn it into an auth exception. add a unit
test to verify.
This commit is contained in:
Robey Pointer 2008-03-22 19:57:51 -07:00
parent 7b819f0e91
commit e5a1b4bf56
2 changed files with 19 additions and 3 deletions

View File

@ -158,7 +158,7 @@ class AuthHandler (object):
event.wait(0.1) event.wait(0.1)
if not self.transport.is_active(): if not self.transport.is_active():
e = self.transport.get_exception() e = self.transport.get_exception()
if e is None: if (e is None) or issubclass(e.__class__, EOFError):
e = AuthenticationException('Authentication failed.') e = AuthenticationException('Authentication failed.')
raise e raise e
if event.isSet(): if event.isSet():

View File

@ -25,7 +25,8 @@ import threading
import unittest import unittest
from paramiko import Transport, ServerInterface, RSAKey, DSSKey, \ from paramiko import Transport, ServerInterface, RSAKey, DSSKey, \
SSHException, BadAuthenticationType, InteractiveQuery, ChannelException SSHException, BadAuthenticationType, InteractiveQuery, ChannelException, \
AuthenticationException
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from loop import LoopSocket from loop import LoopSocket
@ -67,6 +68,8 @@ class NullServer (ServerInterface):
return AUTH_SUCCESSFUL return AUTH_SUCCESSFUL
if (username == 'non-utf8') and (password == '\xff'): if (username == 'non-utf8') and (password == '\xff'):
return AUTH_SUCCESSFUL return AUTH_SUCCESSFUL
if username == 'bad-server':
raise Exception("Ack!")
return AUTH_FAILED return AUTH_FAILED
def check_auth_publickey(self, username, key): def check_auth_publickey(self, username, key):
@ -147,7 +150,7 @@ class AuthTest (unittest.TestCase):
self.assert_(False) self.assert_(False)
except: except:
etype, evalue, etb = sys.exc_info() etype, evalue, etb = sys.exc_info()
self.assert_(issubclass(etype, SSHException)) self.assert_(issubclass(etype, AuthenticationException))
self.tc.auth_password(username='slowdive', password='pygmalion') self.tc.auth_password(username='slowdive', password='pygmalion')
self.verify_finished() self.verify_finished()
@ -213,3 +216,16 @@ class AuthTest (unittest.TestCase):
remain = self.tc.auth_password('non-utf8', '\xff') remain = self.tc.auth_password('non-utf8', '\xff')
self.assertEquals([], remain) self.assertEquals([], remain)
self.verify_finished() self.verify_finished()
def test_8_auth_gets_disconnected(self):
"""
verify that we catch a server disconnecting during auth, and report
it as an auth failure.
"""
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
try:
remain = self.tc.auth_password('bad-server', 'hello')
except:
etype, evalue, etb = sys.exc_info()
self.assert_(issubclass(etype, AuthenticationException))