#!/usr/bin/env python # Copyright (C) 2003-2007 Robey Pointer # # This file is part of paramiko. # # Paramiko is free software; you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. import base64 from binascii import hexlify import os import socket import sys import threading import traceback import paramiko from paramiko.py3compat import b, u, decodebytes # setup logging paramiko.util.log_to_file('demo_server.log') host_key = paramiko.RSAKey(filename='test_rsa.key') #host_key = paramiko.DSSKey(filename='test_dss.key') print('Read key: ' + u(hexlify(host_key.get_fingerprint()))) class Server (paramiko.ServerInterface): # 'data' is the output of base64.encodestring(str(key)) # (using the "user_rsa_key" files) data = b('AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp' + \ 'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC' + \ 'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT' + \ 'UWT10hcuO4Ks8=') good_pub_key = paramiko.RSAKey(data=decodebytes(data)) def __init__(self): self.event = threading.Event() def check_channel_request(self, kind, chanid): if kind == 'session': return paramiko.OPEN_SUCCEEDED return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED def check_auth_password(self, username, password): if (username == 'robey') and (password == 'foo'): return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED def check_auth_publickey(self, username, key): print('Auth attempt with key: ' + u(hexlify(key.get_fingerprint()))) if (username == 'robey') and (key == self.good_pub_key): return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED def get_allowed_auths(self, username): return 'password,publickey' def check_channel_shell_request(self, channel): self.event.set() return True def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes): return True # now connect try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('', 2200)) except Exception: e = sys.exc_info()[1] print('*** Bind failed: ' + str(e)) traceback.print_exc() sys.exit(1) try: sock.listen(100) print('Listening for connection ...') client, addr = sock.accept() except Exception: e = sys.exc_info()[1] print('*** Listen/accept failed: ' + str(e)) traceback.print_exc() sys.exit(1) print('Got a connection!') try: t = paramiko.Transport(client) try: t.load_server_moduli() except: print('(Failed to load moduli -- gex will be unsupported.)') raise t.add_server_key(host_key) server = Server() try: t.start_server(server=server) except paramiko.SSHException: print('*** SSH negotiation failed.') sys.exit(1) # wait for auth chan = t.accept(20) if chan is None: print('*** No channel.') sys.exit(1) print('Authenticated!') server.event.wait(10) if not server.event.isSet(): print('*** Client never asked for a shell.') sys.exit(1) chan.send('\r\n\r\nWelcome to my dorky little BBS!\r\n\r\n') chan.send('We are on fire all the time! Hooray! Candy corn for everyone!\r\n') chan.send('Happy birthday to Robot Dave!\r\n\r\n') chan.send('Username: ') f = chan.makefile('rU') username = f.readline().strip('\r\n') chan.send('\r\nI don\'t like you, ' + username + '.\r\n') chan.close() except Exception: e = sys.exc_info()[1] print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e)) traceback.print_exc() try: t.close() except: pass sys.exit(1)