# Twisted, the Framework of Your Internet # Copyright (C) 2001 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # TODO: # failure detection # voluntary delinking # successor list # HyperChord import random import sha import socket import struct from twisted.spread import pb NBIT = 160 # Size (in bits) of Chord identifiers vNodes = 1 # sure hope you dont want more than 256. def between(n, a, b): """Check if n in interval(a, b) on the circle.""" if a == b: return n != a # n is the only node not in (n, n) elif a < b: return n > a and n < b else: return n > a or n < b def betweenL(n, a, b): """Check if n in interval[a, b) on the circle.""" if a == b and n == a: return 1 elif a < b: return n >= a and n < b else: return n >= a or n < b def betweenR(n, a, b): """Check if n in interval(a, b] on the circle.""" if a == b and n == a: return 1 elif a < b: return n > a and n <= b else: return n > a or n <= b class Node(pb.Copyable, pb.Perspective): """A node in a Chord network.""" def __init__(self, address, perspectiveName, identityName, port=pb.portno): global vNodes pb.Perspective.__init__(self, perspectiveName, identityName) self.id = sha.new(socket.inet_aton(address) + struct.pack('!h', port) + chr(vNodes)).digest() self.address = (address, port, vNodes) vNodes = vNodes + 1 self.finger = [None] * (NBIT+1) self.lastFixed = 1 # Needed for fixFingers self.pred = None def getStateToCopyFor(self, persp): return {"id": self.id, "address": self.address} def __repr__(self): return "" def join(self, n2): """Intialize finger tables of local node. n2 is a node already on the network or None if we're the first.""" if n2: n2.findSuccessor(self.id).addCallback(lambda x, self=self: self.getNodeAt(x).addCallback(self._setFingerCallback, callbackArgs=(1,))) self.finger[1].notify(self.address, pbanswer=0) self.findPredecessor(self.id).addCallback(self._setPredCallback) for i in range(1, NBIT): if betweenL(self.id, self.start(i+1), self.finger[i].id): self.finger[i+1] = self.finger[i] else: n2.findSuccessor(self.start(i+1)).addCallback(lambda x, self=self, i=i: self.getNodeAt(x).addCallback(self._setFingerCallback, callbackArgs=(i+1,))) n2.notify(self.address, pbanswer=0) else: self.finger[1] = None self.pred = None def _setFingerCallback(self, x, i): #CRUM! DEFEATED BY DISTANCE self.finger[i] = x def _setPredCallback(self, n): #ALSO TOO FAR! self.pred = n def perspective_getSuccessor(self): return self.finger[1] def perspective_getPredecessor(self): return self.pred def perspective_findSuccessor(self, id): self.findPredecessor(id).addCallback(self.findSuccessor_1) def findSuccessor_1(self, n2): n2.getSuccessor().addCallbacks(self.findSuccessor_2, callbackArgs=n2) def findSuccessor_2(self, address, n2): if address: return address else: return n2.address def findPredecessor(self, id): if self.finger[1] is None: return self n2 = self return self.findPredecessor_1(n2, id) def findPredecessor_1(self, n2, id): n2.getSuccessor().addCallback(self.findPredecessor_2, n2, id) def findPredecessor_2(self, n3, n2, id): if not betweenR(id, n2.id, n3.id): n2.closestPrecedingFinger(id).addCallback(lambda a, id=id, self=self: self.getNodeAt(a).addCallback(lambda n, id=id, self=self: self.findPredecessor_1(n, id))) else: return n2 def perspective_closestPrecedingFinger(self, id): for i in xrange(NBIT, 0, -1): if not self.finger[i]: continue if between(self.finger[i].id, self.id, id): return self.finger[i].address return self.address def getNodeAt(self, address): return pb.connect(address[0], address[1], "chord", "", "chord").addCallback(lambda n: n.getSelf()) def getSelf(self): # necessary to produce proper Copyable behaviour return self def stabilise(self): """Verify our immediate successor and tell them about us. Called periodically.""" self.finger[1].getPredecessor().addCallback(self.stabilise_1) def stabilise_1(self, p): if p != self.address: self.perspective_notify(p) def fixFingers(self): """Refresh a random finger table entry. Called periodically.""" i = random.randrange(1, len(self.finger)+1) self.finger[i] = self.findSuccessor(self.start(i)) def perspective_notify(self, addr): """n thinks it might be our predecessor.""" self.getNodeAt(addr).addCallback(self.notify) def notify(self, n): if (self.pred is self or self.pred is None or between(n.id, self.pred.id, self.id)): self.pred = n n.notify(self.address, pbanswer=0) if self.finger[1] is None or between(n.id, self.id, self.finger[1].id): self.finger[1] = n n.notify(self.address) for i in xrange(2, len(self.finger)): if self.finger[i] is None or betweenL(n.id, self.start(i), self.finger[i].id): self.finger[i] = n def start(self, k): assert 1 <= k <= NBIT r = (self.id + 2L**(k-1)) % 2L**NBIT if r == 0: return 2L**NBIT else: return r class ChordService(pb.Service): def __init__(self, address, port, serviceName, application=None): pb.Service.__init__(self, serviceName, application) self.address = address self.portno = port def createPerspective(self, name): p = Node(self.address, name, name, self.portno) self.perspectives[name] = p p.setService(self) return p