通信ライブラリとして、hetimanetを使用しています。APIのインターフェイスは他の通信ライブラリと大きな違いはありません。なので、お使いのものと読み替えて読み進めてください。
RootingTableに、FindeNodeの機能を追加する
所持しているPeerInfoを指定されたKIDでソートして探す。
Copy class KRootingTable {
...
...
...
List<KPeerInfo> findNode(KId id) {
List<KPeerInfo> ids = [];
for (KBucket b in _kBuckets) {
for (KPeerInfo i in b.iterable) {
ids.add(i);
}
}
ids.sort((KPeerInfo a, KPeerInfo b) {
return a.id.xor(id).compareTo(b.id.xor(id));
});
List<KPeerInfo> ret = [];
for (KPeerInfo p in ids) {
ret.add(p);
if (ret.length >= _kBucketSize) {
return ret;
}
}
return ret;
}
...
...
}
(1) KNodeはUDPサーバー機能を持つ。
まずは、UDPを用いて、通信部分を書いてみましょう。Torrentの仕様では、DHTとして動作するPeerをNodeと読んでいます。 DHTの通信を行う主体として、KNode class を定義することにします。
UDP serverは、メッセージを受け取るメッセージはbencodeなのでした。パースしてそのMessageを使うclassに渡します。
Copy class KNode {
bool _isStart = false ;
bool get isStart => _isStart;
HetiSocketBuilder _socketBuilder = null ;
HetiUdpSocket _udpSocket = null ;
KNode ( HetiSocketBuilder socketBuilder) {
this._socketBuilder = socketBuilder;
}
Future start ({ String ip : "0.0.0.0" , int port : 28080 }) async {
(_isStart != false ? throw "already started" : 0 );
_udpSocket = this._socketBuilder. createUdpClient ();
return _udpSocket. bind (ip, port, multicast : true ). then (( int v) {
_udpSocket. onReceive (). listen (( HetiReceiveUdpInfo info) {
KrpcMessage . decode (info.data, this). then (( KrpcMessage message) {
onReceiveMessage (info, message);
});
});
_isStart = true ;
});
}
Future stop () async {
if (_isStart == false || _udpSocket == null ) {
return null ;
}
return _udpSocket. close (). whenComplete (() {
_isStart = false ;
_ai. stop (this);
});
}
}
(2) Krpc Messageをパースする機能を持つ
MainLine DHT では、PeerとPeerの通信には、Bencodeが利用されます。 すでにBencodeのパーサーは作成ずみなので、難しいことはないはずです。
Copy class KrpcMessage {
KrpcMessage.fromMap(Map map) {
_messageAsMap = map;
}
static Future<KrpcMessage> decode(List<int> data) async {
Map<String, Object> messageAsMap = null;
try {
Object v = Bencode.decode(data);
messageAsMap = v;
} catch (e) {
throw {};
}
return new KrpcMessage.fromMap(messageAsMap);
}
}
こんな感じです。あとは、必要に応じて、パースした結果を読み取るだけです。
Copy class KrpcMessage {
...
...
List<int> get transactionId => _messageAsMap["t"]);
String get transactionIdAsString => UTF8.decode(transactionId);
//
List<int> get messageType => _messageAsMap["y"];
String get messageTypeAsString => UTF8.decode(messageType);
//
List<int> get query => _messageAsMap["q"];
...
...
}
こんな感じです。BEP5のスペックをみると結構ありますがも気長にコーディングしていけば、半日くらいで終わると思います。
(3) メッセージを送信する機能を持つ
Copy class KNode {
..
..
sendMessage(KrpcMessage message, String ip, int port) {
return _udpSocket.send(message.messageAsBencode, ip, port);
}
..
}
class FindNode {
static int queryID = 0;
static KrpcMessage createQuery(List<int> queryingNodesId, List<int> targetNodeId) {
List<int> transactionId = UTF8.encode("fi${queryID++}");
return new KrpcMessage.fromMap({"a": {"id": queryingNodesId, "target": targetNodeId}, "q": "find_node", "t": transactionId, "y": "q"});
}
static KrpcMessage createResponse(List<int> compactNodeInfo, List<int> queryingNodesId, List<int> transactionId) {
return new KrpcMessage.fromMap({"r": {"id": queryingNodesId, "nodes": compactNodeInfo}, "t": transactionId, "y": "r"});
}
}
メッセージの送信は、受信用に作成したUDPSocketを利用します。こうすることで、受信相手に、ポート番号を伝えることができます。
(4) ネットワークへの参加用のコードを書く
RootingTableに所持しているデータの中から、自分自信ともっとも近いKIDをもつPeerへFindNodeクエリを送信する
Copy class KNodeWorkFindNode {
...
...
updateP2PNetworkWithoutClear(KNode node) {
node.rootingtable.findNode(node.nodeId).then((List<KPeerInfo> infos) {
for (KPeerInfo info in infos) {
if (!_findNodesInfo.rawsequential.contains(info)) {
_findNodesInfo.addLast(info);
node.sendFindNodeQuery(info.ipAsString, info.port, node.nodeId.value).catchError((_) {});
}
}
});
}
...
...
}
レスポンスを受けとったら、もう一度繰り返す
Copy class KNodeWorkFindNode {
...
...
onReceiveQuery(KNode node, HetiReceiveUdpInfo info, KrpcMessage query) {
if (query.queryAsString == KrpcMessage.QUERY_FIND_NODE) {
KrpcFindNode findNode = query.toFindNode();
return node.rootingtable.findNode(findNode.targetAsKId).then((List<KPeerInfo> infos) {
return node.sendFindNodeResponse(info.remoteAddress, info.remotePort, query.transactionId, KPeerInfo.toCompactNodeInfos(infos)).catchError((_) {});
});
}
node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, query.nodeIdAsKId));
updateP2PNetworkWithoutClear(node);
}
...
..
}
一定時間だったら、もう一度アクセスする
Copy class KNodeWorkFindNode {
...
...
onTicket(KNode node) {
_findNodesInfo.clear();
updateP2PNetworkWithoutClear(node);
}
(5) FindeNodeクエリに対応したレスポンスを返せるようにしよう
Copy class KNodeWorkFindNode {
...
...
onReceiveResponse(KNode node, HetiReceiveUdpInfo info, KrpcMessage response) {
if (response.queryFromTransactionId == KrpcMessage.QUERY_FIND_NODE) {
KrpcFindNode findNode = response.toFindNode();
for (KPeerInfo info in findNode.compactNodeInfoAsKPeerInfo) {
node.rootingtable.update(info);
}
}
node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, response.nodeIdAsKId));
updateP2PNetworkWithoutClear(node);
}
..
}