FindNodeを実装
    MainLine DHT は、UDPで通信
通信ライブラリとして、hetimanetを使用しています。APIのインターフェイスは他の通信ライブラリと大きな違いはありません。なので、お使いのものと読み替えて読み進めてください。

RootingTableに、FindeNodeの機能を追加する

所持しているPeerInfoを指定されたKIDでソートして探す。
1
class KRootingTable {
2
...
3
...
4
...
5
List<KPeerInfo> findNode(KId id) {
6
List<KPeerInfo> ids = [];
7
for (KBucket b in _kBuckets) {
8
for (KPeerInfo i in b.iterable) {
9
ids.add(i);
10
}
11
}
12
ids.sort((KPeerInfo a, KPeerInfo b) {
13
return a.id.xor(id).compareTo(b.id.xor(id));
14
});
15
List<KPeerInfo> ret = [];
16
for (KPeerInfo p in ids) {
17
ret.add(p);
18
if (ret.length >= _kBucketSize) {
19
return ret;
20
}
21
}
22
return ret;
23
}
24
...
25
...
26
}
Copied!

(1) KNodeはUDPサーバー機能を持つ。

まずは、UDPを用いて、通信部分を書いてみましょう。Torrentの仕様では、DHTとして動作するPeerをNodeと読んでいます。 DHTの通信を行う主体として、KNode class を定義することにします。
UDP serverは、メッセージを受け取るメッセージはbencodeなのでした。パースしてそのMessageを使うclassに渡します。
1
class KNode {
2
bool _isStart = false;
3
bool get isStart => _isStart;
4
HetiSocketBuilder _socketBuilder = null;
5
HetiUdpSocket _udpSocket = null;
6
7
KNode(HetiSocketBuilder socketBuilder) {
8
this._socketBuilder = socketBuilder;
9
}
10
11
Future start({String ip: "0.0.0.0", int port: 28080}) async {
12
(_isStart != false ? throw "already started" : 0);
13
_udpSocket = this._socketBuilder.createUdpClient();
14
return _udpSocket.bind(ip, port, multicast: true).then((int v) {
15
_udpSocket.onReceive().listen((HetiReceiveUdpInfo info) {
16
KrpcMessage.decode(info.data, this).then((KrpcMessage message) {
17
onReceiveMessage(info, message);
18
});
19
});
20
_isStart = true;
21
});
22
}
23
24
Future stop() async {
25
if (_isStart == false || _udpSocket == null) {
26
return null;
27
}
28
return _udpSocket.close().whenComplete(() {
29
_isStart = false;
30
_ai.stop(this);
31
});
32
}
33
}
Copied!

(2) Krpc Messageをパースする機能を持つ

MainLine DHT では、PeerとPeerの通信には、Bencodeが利用されます。 すでにBencodeのパーサーは作成ずみなので、難しいことはないはずです。
1
class KrpcMessage {
2
KrpcMessage.fromMap(Map map) {
3
_messageAsMap = map;
4
}
5
6
static Future<KrpcMessage> decode(List<int> data) async {
7
Map<String, Object> messageAsMap = null;
8
try {
9
Object v = Bencode.decode(data);
10
messageAsMap = v;
11
} catch (e) {
12
throw {};
13
}
14
return new KrpcMessage.fromMap(messageAsMap);
15
}
16
}
Copied!
こんな感じです。あとは、必要に応じて、パースした結果を読み取るだけです。
1
class KrpcMessage {
2
...
3
...
4
List<int> get transactionId => _messageAsMap["t"]);
5
String get transactionIdAsString => UTF8.decode(transactionId);
6
7
//
8
List<int> get messageType => _messageAsMap["y"];
9
String get messageTypeAsString => UTF8.decode(messageType);
10
11
//
12
List<int> get query => _messageAsMap["q"];
13
...
14
...
15
}
Copied!
こんな感じです。BEP5のスペックをみると結構ありますがも気長にコーディングしていけば、半日くらいで終わると思います。

(3) メッセージを送信する機能を持つ

1
class KNode {
2
..
3
..
4
sendMessage(KrpcMessage message, String ip, int port) {
5
return _udpSocket.send(message.messageAsBencode, ip, port);
6
}
7
..
8
}
9
10
class FindNode {
11
12
static int queryID = 0;
13
14
static KrpcMessage createQuery(List<int> queryingNodesId, List<int> targetNodeId) {
15
List<int> transactionId = UTF8.encode("fi${queryID++}");
16
return new KrpcMessage.fromMap({"a": {"id": queryingNodesId, "target": targetNodeId}, "q": "find_node", "t": transactionId, "y": "q"});
17
}
18
19
static KrpcMessage createResponse(List<int> compactNodeInfo, List<int> queryingNodesId, List<int> transactionId) {
20
return new KrpcMessage.fromMap({"r": {"id": queryingNodesId, "nodes": compactNodeInfo}, "t": transactionId, "y": "r"});
21
}
22
}
Copied!
メッセージの送信は、受信用に作成したUDPSocketを利用します。こうすることで、受信相手に、ポート番号を伝えることができます。

(4) ネットワークへの参加用のコードを書く

RootingTableに所持しているデータの中から、自分自信ともっとも近いKIDをもつPeerへFindNodeクエリを送信する
1
class KNodeWorkFindNode {
2
...
3
...
4
updateP2PNetworkWithoutClear(KNode node) {
5
node.rootingtable.findNode(node.nodeId).then((List<KPeerInfo> infos) {
6
for (KPeerInfo info in infos) {
7
if (!_findNodesInfo.rawsequential.contains(info)) {
8
_findNodesInfo.addLast(info);
9
node.sendFindNodeQuery(info.ipAsString, info.port, node.nodeId.value).catchError((_) {});
10
}
11
}
12
});
13
}
14
...
15
...
16
}
Copied!
レスポンスを受けとったら、もう一度繰り返す
1
class KNodeWorkFindNode {
2
...
3
...
4
onReceiveQuery(KNode node, HetiReceiveUdpInfo info, KrpcMessage query) {
5
if (query.queryAsString == KrpcMessage.QUERY_FIND_NODE) {
6
KrpcFindNode findNode = query.toFindNode();
7
return node.rootingtable.findNode(findNode.targetAsKId).then((List<KPeerInfo> infos) {
8
return node.sendFindNodeResponse(info.remoteAddress, info.remotePort, query.transactionId, KPeerInfo.toCompactNodeInfos(infos)).catchError((_) {});
9
});
10
}
11
node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, query.nodeIdAsKId));
12
updateP2PNetworkWithoutClear(node);
13
}
14
...
15
..
16
}
Copied!
一定時間だったら、もう一度アクセスする
1
class KNodeWorkFindNode {
2
...
3
...
4
5
onTicket(KNode node) {
6
_findNodesInfo.clear();
7
updateP2PNetworkWithoutClear(node);
8
}
Copied!

(5) FindeNodeクエリに対応したレスポンスを返せるようにしよう

1
class KNodeWorkFindNode {
2
...
3
...
4
onReceiveResponse(KNode node, HetiReceiveUdpInfo info, KrpcMessage response) {
5
if (response.queryFromTransactionId == KrpcMessage.QUERY_FIND_NODE) {
6
KrpcFindNode findNode = response.toFindNode();
7
for (KPeerInfo info in findNode.compactNodeInfoAsKPeerInfo) {
8
node.rootingtable.update(info);
9
}
10
}
11
node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, response.nodeIdAsKId));
12
updateP2PNetworkWithoutClear(node);
13
}
14
..
15
}
Copied!
Last modified 3yr ago