P2P Programming Framework – Python

The P2P Framework Implementation (Python version)

This page walks through a Python implementation of the P2P
framework library itself. I will assume that you are familiar with
Python. I will also assume you are familiar with the general
concepts of socket programming, though you may not necessarily have
used the Python networking or threading libraries. The complete
source code may be downloaded here: btpeer.py.

^ TOP

Initializing a peer

Let us first examine how a peer node is initialized. As discussed
above, the overall operation of a node is managed by the Peer
class. The constructor of the class stores the canonical identifier
(name) of the peer, the port on which it listens for connections,
and the maximum size of the list of known peers that the node will
maintain (this can be set to 0 to allow an unlimited number of peers
in the list). The constructor also initializes several fields whose
use is described in the following subsections
shutdown, handlers,
and router. The following definition illustrates the
Python code for accomplishing these tasks:

    def __init__( self, maxpeers, serverport, myid=None, serverhost = None ):
	self.debug = 0

	self.maxpeers = int(maxpeers)
	self.serverport = int(serverport)

        # If not supplied, the host name/IP address will be determined
	# by attempting to connect to an Internet host like Google.
	if serverhost: self.serverhost = serverhost
	else: self.__initserverhost()

        # If not supplied, the peer id will be composed of the host address
        # and port number
	if myid: self.myid = myid
	else: self.myid = '%s:%d' % (self.serverhost, self.serverport)

        # list (dictionary/hash table) of known peers
	self.peers = {}  

        # used to stop the main loop
	self.shutdown = False  

	self.handlers = {}
	self.router = None
    # end constructor

Every peer node performs operations
common to traditional network client and server
applications. First, let us walk through the server-related
operations, as described in the previous section.

^ TOP

The main loop

The main loop of a peer begins by setting up a socket that
listens for incoming connections from other peers. To do this, we
must (1) create the socket object, (2) set its options, (3) bind it
to a port on which to listen for connections, and (4) actually begin
listening for connections. Here is a Python method that accomplishes
this, returning the initialized socket:

    def makeserversocket( self, port, backlog=5 ):
	s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
	s.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
	s.bind( ( '', port ) )
	s.listen( backlog )
	return s

The first statement creates a socket that will communicate using the
IPv4 (AF_INET) protocol with TCP
(SOCK_STREAM). By setting the SO_REUSEADDR
option, the port number of the socket will be immediately reusable
after the socket is closed (otherwise the operating system may prevent
the port from being reused after the server exits, until a certain
amount of time has passed). Then the socket is bound to the specified
port and is set up to receive connections. The backlog
parameter indicates how many incoming connections should be queued
up. A value of 5 is conventionally used, although with a multithreaded
server (as we are building) the parameter’s value is not very
significant.

Having created a server socket, the main loop of a peer loops
continously, accepting connections. When an incoming connection is
accepted, the server will have a new socket object used to send and
receive data on the connection. The main loop then calls a separate
method to handle communication with this connection in a new
thread. A simple for of the main loop would thus look like this:

  s = self.makeserversocket( self.serverport )
  
  while 1:
     clientsock, clientaddr = s.accept()
  
     t = threading.Thread( target = self.__handlepeer, args = [ clientsock ] )
     t.start()

In reality, we also need to handle any errors that may occur in the
process of accepting a connection, and we need to provide a mechanism
so that the loop may somehow be nicely terminated (for example, when
the user indicates that the program should exit). To do this, we set
up the server socket to time out every 2 seconds (an arbitrary choice)
and make the loop termination condition dependent on a boolean
(instance) variable, shutdown. Also, we set up an
exception handler to allow the main loop to be stopped by the user
pressing the “Ctrl”+”Break” (or “Ctrl”+”c”) keys. Here, then, is the
complete mainloop method.

    def mainloop( self ):
	s = self.makeserversocket( self.serverport )
	s.settimeout(2)
	self.__debug( 'Server started: %s (%s:%d)'
		      % ( self.myid, self.serverhost, self.serverport ) )

	while not self.shutdown:
	    try:
		self.__debug( 'Listening for connections...' )
		clientsock, clientaddr = s.accept()
		clientsock.settimeout(None)

		t = threading.Thread( target = self.__handlepeer, args = [ clientsock ] )
		t.start()
	    except KeyboardInterrupt:
		self.shutdown = True
		continue
	    except:
		if self.debug:
		    traceback.print_exc()
		    continue
	# end while loop

	self.__debug( 'Main loop exiting' )
	s.close()
    # end mainloop method

The debug method will output various messages to an appropriate
location – for example, the screen or a log
file. The myid field is the identifier of the peer, and
the serverhost field stores the peer’s IP address (or
host name). These values are initialized by the constructor for the
peer object.

^ TOP

Handling a peer connection

The handlepeer method takes a newly formed peer
connection, reads in a request from it, and dispatches the request
to an appropriate handler (function or method) for processing. The
particular handlers and types of requests will be specified by the
programmer using this framework to implement a particular
protocol. The handlepeer method simply looks for the
appropriate handler for a message, if there is one registered with
the peer object, and calls it.

handlepeer begins by encapsulating the socket
connection in a PeerConnection object, to allow easy
sending/receiving and encoding/decoding of P2P messages in the
system.

	host, port = clientsock.getpeername()
	peerconn = BTPeerConnection( None, host, port, clientsock, debug=False )

Then, handlepeer attempts to receive some data from
the connection and determine what to do with it:

	    msgtype, msgdata = peerconn.recvdata()
	    if msgtype: msgtype = msgtype.upper()
	    if msgtype not in self.handlers:
		self.__debug( 'Not handled: %s: %s' % (msgtype, msgdata) )
	    else:
		self.__debug( 'Handling peer msg: %s: %s' % (msgtype, msgdata) )
		self.handlers[ msgtype ]( peerconn, msgdata )

The handlers field is a dictionary (hash table),
mapping message types (4-character strings) to function pointers. If
the message type has a corresponding entry in the dictionary, the
function pointer is extracted and invoked, passing it the
PeerConnection object and the actual data of the message. Upon
completion of Before returning, handlepeer closes the
connection. Here, then, is the complete definition of the method:

    def __handlepeer( self, clientsock ):
	self.__debug( 'Connected ' + str(clientsock.getpeername()) )

	host, port = clientsock.getpeername()
	peerconn = BTPeerConnection( None, host, port, clientsock, debug=False )
	
	try:
	    msgtype, msgdata = peerconn.recvdata()
	    if msgtype: msgtype = msgtype.upper()
	    if msgtype not in self.handlers:
		self.__debug( 'Not handled: %s: %s' % (msgtype, msgdata) )
	    else:
		self.__debug( 'Handling peer msg: %s: %s' % (msgtype, msgdata) )
		self.handlers[ msgtype ]( peerconn, msgdata )
	except KeyboardInterrupt:
	    raise
	except:
	    if self.debug:
		traceback.print_exc()
	
	self.__debug( 'Disconnecting ' + str(clientsock.getpeername()) )
	peerconn.close()

    # end handlepeer method

^ TOP

Routing and sending messages

Using the addrouter method, the programmer may
register a routing function (or method) with the Peer class to help
decide how messages should be forwarded, given a destination peer
id. The routing function should expect as a paremeter the name of a
peer (which may not necessarily be present in the list of known
peers of the node), and decide which of the known peer the message
should be routed to next in order to (hopefully) reach the desired
peer. The routing function should return a tuple of three values:
(next-peer-id, host, port) where the host and port are the IP
address of the peer identified by next-peer-id. If the message
cannot be routed, the next-peer-id should be None.

The sendtopeer method takes a message type and data,
along with a destination peer id, and uses the routing function to
decide where to send the message next. If no routing function has
been registered by the programmer, or if the routing function fails
for some reason, the method fails. If the routing function
successfully returns the next host/port combination to which the
message should be sent, sendtopeer calls
the connectandsend method to actually make the
connection to the peer, package up, and send the data. If the
programmer desires to receive a response from the next peer before
the communication socket is closed, it will be returned by these
methods.

    def sendtopeer( self, peerid, msgtype, msgdata, waitreply=True ):
	if self.router:
	    nextpid, host, port = self.router( peerid )
	if not self.router or not nextpid:
	    self.__debug( 'Unable to route %s to %s' % (msgtype, peerid) )
	    return None
	return self.connectandsend( host, port, msgtype, msgdata, pid=nextpid,
				    waitreply=waitreply )

    # end sendtopeer method

The connectandsend method connects and sends a message
to a peer at the specified IP address and port. The host’s reply, if
desired by the caller, will be returned as a list of pairs, where
each pair contains (1) the type and (2) the actual data of the
message.

    def connectandsend( self, host, port, msgtype, msgdata, pid=None, waitreply=True ):
	msgreply = []   # list of replies
	try:
	    peerconn = BTPeerConnection( pid, host, port, debug=self.debug )
	    peerconn.senddata( msgtype, msgdata )
	    self.__debug( 'Sent %s: %s' % (pid, msgtype) )
	    
	    if waitreply:
		onereply = peerconn.recvdata()
		while (onereply != (None,None)):
		    msgreply.append( onereply )
		    self.__debug( 'Got reply %s: %s' % ( pid, str(msgreply) ) )
		    onereply = peerconn.recvdata()
	    peerconn.close()
	except KeyboardInterrupt:
	    raise
	except:
	    if self.debug:
		traceback.print_exc()
	
	return msgreply

    # end connectsend method

^ TOP

Additional methods

The Peer class provides methods supporting other fundamental
functionalities of a peer node. Briefly, these include:

  • startstabilizer(stabilizer, delay):
    Runs the provided ‘stabilizer’ function in a separate thread,
    activating it repeatedly after every delay seconds, until
    the shutdown flag of the Peer object is set.
  • addhandler(msgtype, handler):
    Registers a handler function for the given message type with the
    Peer object. Only one handler function may be provided per message
    type. Message types do not have to be defined in advance of
    calling this method.
  • addrouter(router):
    Registers a routing function with this peer. Read the
    section on routing above for details.
  • addpeer(peerid, host, port):
    Adds a peer name and IP address/port mapping to the known list of
    peers.
  • getpeer(peerid): Returns the (host,port)
    pair for the given peer name.
  • removepeer(peerid):
    Removes the entry corresponding to the supplied peerid from the
    list of known pairs.
  • addpeerat(loc, peerid, host, port)/
    getpeerat(loc)/
    removepeerat(loc):
    Analogous to the prior three
    methods, except that they access direct (numeric) positions within
    the list of peers (as opposed to using the peerid as a hash
    key). These functions should not be used concurrently with the
    prior three.
  • getpeerids(): Return a list of all known
    peer ids.
  • numberofpeers():
  • maxpeersreached():
  • checklivepeers(): Attempt to connect and
    send a ‘PING’ message to all currently known peers to ensure that
    they are still alive. For any connections that fail, the
    corresponding entry is removed from the list. This method can be
    used as a simple ‘stabilizer’ function for a P2P protocol.

^ TOP

The PeerConnection class

The PeerConnection class encapsulates a socket connection to a peer
node. An object of this class may be initialized with an
already-connected socket, or with an IP address/port combination
that will be used to open a new socket connection.