FindNodeを実装

  • MainLine DHT は、UDPで通信

通信ライブラリとして、hetimanetを使用しています。APIのインターフェイスは他の通信ライブラリと大きな違いはありません。なので、お使いのものと読み替えて読み進めてください。

RootingTableに、FindeNodeの機能を追加する

所持しているPeerInfoを指定されたKIDでソートして探す。

class KRootingTable {
...
...
...
List<KPeerInfo> findNode(KId id) {
List<KPeerInfo> ids = [];
for (KBucket b in _kBuckets) {
for (KPeerInfo i in b.iterable) {
ids.add(i);
}
}
ids.sort((KPeerInfo a, KPeerInfo b) {
return a.id.xor(id).compareTo(b.id.xor(id));
});
List<KPeerInfo> ret = [];
for (KPeerInfo p in ids) {
ret.add(p);
if (ret.length >= _kBucketSize) {
return ret;
}
}
return ret;
}
...
...
}

(1) KNodeはUDPサーバー機能を持つ。

まずは、UDPを用いて、通信部分を書いてみましょう。Torrentの仕様では、DHTとして動作するPeerをNodeと読んでいます。 DHTの通信を行う主体として、KNode class を定義することにします。

UDP serverは、メッセージを受け取るメッセージはbencodeなのでした。パースしてそのMessageを使うclassに渡します。

class KNode {
bool _isStart = false;
bool get isStart => _isStart;
HetiSocketBuilder _socketBuilder = null;
HetiUdpSocket _udpSocket = null;
KNode(HetiSocketBuilder socketBuilder) {
this._socketBuilder = socketBuilder;
}
Future start({String ip: "0.0.0.0", int port: 28080}) async {
(_isStart != false ? throw "already started" : 0);
_udpSocket = this._socketBuilder.createUdpClient();
return _udpSocket.bind(ip, port, multicast: true).then((int v) {
_udpSocket.onReceive().listen((HetiReceiveUdpInfo info) {
KrpcMessage.decode(info.data, this).then((KrpcMessage message) {
onReceiveMessage(info, message);
});
});
_isStart = true;
});
}
Future stop() async {
if (_isStart == false || _udpSocket == null) {
return null;
}
return _udpSocket.close().whenComplete(() {
_isStart = false;
_ai.stop(this);
});
}
}

(2) Krpc Messageをパースする機能を持つ

MainLine DHT では、PeerとPeerの通信には、Bencodeが利用されます。 すでにBencodeのパーサーは作成ずみなので、難しいことはないはずです。

class KrpcMessage {
KrpcMessage.fromMap(Map map) {
_messageAsMap = map;
}
static Future<KrpcMessage> decode(List<int> data) async {
Map<String, Object> messageAsMap = null;
try {
Object v = Bencode.decode(data);
messageAsMap = v;
} catch (e) {
throw {};
}
return new KrpcMessage.fromMap(messageAsMap);
}
}

こんな感じです。あとは、必要に応じて、パースした結果を読み取るだけです。

class KrpcMessage {
...
...
List<int> get transactionId => _messageAsMap["t"]);
String get transactionIdAsString => UTF8.decode(transactionId);
//
List<int> get messageType => _messageAsMap["y"];
String get messageTypeAsString => UTF8.decode(messageType);
//
List<int> get query => _messageAsMap["q"];
...
...
}

こんな感じです。BEP5のスペックをみると結構ありますがも気長にコーディングしていけば、半日くらいで終わると思います。

(3) メッセージを送信する機能を持つ

class KNode {
..
..
sendMessage(KrpcMessage message, String ip, int port) {
return _udpSocket.send(message.messageAsBencode, ip, port);
}
..
}
class FindNode {
static int queryID = 0;
static KrpcMessage createQuery(List<int> queryingNodesId, List<int> targetNodeId) {
List<int> transactionId = UTF8.encode("fi${queryID++}");
return new KrpcMessage.fromMap({"a": {"id": queryingNodesId, "target": targetNodeId}, "q": "find_node", "t": transactionId, "y": "q"});
}
static KrpcMessage createResponse(List<int> compactNodeInfo, List<int> queryingNodesId, List<int> transactionId) {
return new KrpcMessage.fromMap({"r": {"id": queryingNodesId, "nodes": compactNodeInfo}, "t": transactionId, "y": "r"});
}
}

メッセージの送信は、受信用に作成したUDPSocketを利用します。こうすることで、受信相手に、ポート番号を伝えることができます。

(4) ネットワークへの参加用のコードを書く

RootingTableに所持しているデータの中から、自分自信ともっとも近いKIDをもつPeerへFindNodeクエリを送信する

class KNodeWorkFindNode {
...
...
updateP2PNetworkWithoutClear(KNode node) {
node.rootingtable.findNode(node.nodeId).then((List<KPeerInfo> infos) {
for (KPeerInfo info in infos) {
if (!_findNodesInfo.rawsequential.contains(info)) {
_findNodesInfo.addLast(info);
node.sendFindNodeQuery(info.ipAsString, info.port, node.nodeId.value).catchError((_) {});
}
}
});
}
...
...
}

レスポンスを受けとったら、もう一度繰り返す

class KNodeWorkFindNode {
...
...
onReceiveQuery(KNode node, HetiReceiveUdpInfo info, KrpcMessage query) {
if (query.queryAsString == KrpcMessage.QUERY_FIND_NODE) {
KrpcFindNode findNode = query.toFindNode();
return node.rootingtable.findNode(findNode.targetAsKId).then((List<KPeerInfo> infos) {
return node.sendFindNodeResponse(info.remoteAddress, info.remotePort, query.transactionId, KPeerInfo.toCompactNodeInfos(infos)).catchError((_) {});
});
}
node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, query.nodeIdAsKId));
updateP2PNetworkWithoutClear(node);
}
...
..
}

一定時間だったら、もう一度アクセスする

class KNodeWorkFindNode {
...
...
onTicket(KNode node) {
_findNodesInfo.clear();
updateP2PNetworkWithoutClear(node);
}

(5) FindeNodeクエリに対応したレスポンスを返せるようにしよう

class KNodeWorkFindNode {
...
...
onReceiveResponse(KNode node, HetiReceiveUdpInfo info, KrpcMessage response) {
if (response.queryFromTransactionId == KrpcMessage.QUERY_FIND_NODE) {
KrpcFindNode findNode = response.toFindNode();
for (KPeerInfo info in findNode.compactNodeInfoAsKPeerInfo) {
node.rootingtable.update(info);
}
}
node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, response.nodeIdAsKId));
updateP2PNetworkWithoutClear(node);
}
..
}