前回までで Peer への接続とパケットの解析(パース)は出来るようになりました。これらを使ってパケットで Message
をやりとりして、ハンドシェイクをしていきます。
- 接続してバイト列をやりとりする
- バイト列をパースする
- バージョン番号などのハンドシェイク (← 今ここ)
Message の実装
前回定義した Message
はまだ trait だけで具象クラスが無い状態でした。
ここでは以下の2つの Message types の実装が必要です。
VerAck
はメッセージ本文である payload
が空で、実装も以下のように簡単です。
case object VerAck extends Message {
def toByteString: ByteString = ByteString.empty
def parse(bs: ByteString): Option[(VerAck.type, ByteString)] = Some(VerAck -> bs)
}
Version
はもっと複雑でここに掲載するには長すぎるので割愛します。最低限のパースの実装方法は解説できたの思うので頑張ってみて下さい。
Message の種別名との結び付け
ヘッダの command
フィールドで指定された種別名でパーサを特定できるように、「種別名」と「パーサ」の組み合わせをMessage
のコンパニオンオブジェクトに実装します。
object Message {
val parsers: Map[String, ParseByteString[_ <: Message]] = Map(
"version" -> Version,
"verack" -> Verack
)
def getParser(name: String): Option[ParseByteString[_ <: Message]] = parsers.lift(name)
}
これを見て scala.reflect
を使って自動的に結び付けられるのでは?と思われた方は鋭い!
ここでは2つだけなのでこうして列挙できますが、全ての Message types
をいちいち列挙するのはバグの元なので出来れば避けるべきでしょう。
Message への応答
ここまでで、送られてきたバイト列をその種別名に応じてパースできるようになったので、次はその意味に応じた処理が必要です。
ここでは Akka Actor を使っていきます。
接続を管理する Actor
まずは接続処理のところです。
この Actor が受け付けられるメッセージは Ctl
として定義します。
Send
で相手に送る Message
を受け付けて、Disconnect
で切断できるようにします。
sealed trait Ctl
object Ctl {
case class Send(msg: Message) extends Ctl
case object Disconnect extends Ctl
}
Actor は Behavior[Ctl]
を定義することで実装されます。
object Connection {
def apply(host: String, port: Int)(implicit magic: Magic): Behavior[Ctl] =
Behaviors.setup { context =>
val dispatcher = context.spawn(Dispatcher(host, port, context.self), "dispatcher")
val (queue, ins) = connect(host, port)
def send(msg: Message) = {
import context.executionContext
val packet = Packet(msg)
context.log.info("送信する Pakcet: {}", packet)
queue.offer(packet.toByteString).foreach { result =>
context.log.info("Queue result: {}", result)
// queue あふれ等の問題に対処する
}
}
def startReceiving(): Unit = {
@tailrec
def receive(stream: Stream[Byte]): Unit =
if (stream.isEmpty) {
context.log.info("空 Stream です!")
context.self ! Ctl.Disconnect
} else {
context.log.info("Stream から Packet を読み込みます")
Packet.parseStream(stream) match {
case None =>
context.log.error("パースエラー!")
context.self ! Ctl.Disconnect
case Some((packet, next)) =>
context.log.info("受信した Packet: {}", packet)
dispatcher ! packet.body
receive(next)
}
}
val executionContextToParse = ExecutionContext.fromExecutor(
java.util.concurrent.Executors.newCachedThreadPool()
)
Future(receive {
val buf = new java.io.BufferedInputStream(ins)
Stream.continually(buf.read()).takeWhile(_ != -1).map(_.toByte)
})(executionContextToParse)
}
startReceiving()
Behaviors.receiveMessage {
case Ctl.Send(msg) =>
send(msg)
Behaviors.same
case Ctl.Disconnect =>
// 終了処理
Behaviors.stopped
}
}
}
4行目の Dispatcher
は後で定義します。
6行目の connect
は前回解説した Flow[ByteString].runWith
の処理をする呼び出しです。
send
メソッドでは queue
にメッセージを放り込んでいます。この queue.offer
の戻り値に「ちゃんと受け付けられた」とか「キュー溢れした」とかいう結果が返ってきますので、リトライなどの処理をここに盛り込む事になります。
startReceiving
メソッド内で Future
を使ってフォークして無限ループに入っています。Akka Stream でうまいことブロックしないように出来ればいいのですが、今のところいい方法が見つかってません。誰か教えて下さい (^^;
なお、パースエラーなどの場合にいきなりブチッと切るのではなく、先方に「なんか変だから切るね」と reject メッセージ で伝えてあげるのが(いちおう)マナーですが、ここでは割愛しています。
Mesasge を処理する Actor
次は実際に Message
を処理する Actor です。ここでハンドシェークの実装をします。
受け付けるメッセージは Message
(ネーミングが紛らわしかったな)ですが、ハンドシェークをする Hand
とハンドシェークが終わった後の Conclusion
に処理の背景が別れます。これを Phase
として表現します。
sealed trait Phase
object Phase {
case class Hand(
connection: ActorRef[Ctl],
myVersion: Version,
maybeVersion: Option[Version],
isAcked: Boolean
) extends Phase
case class Conclusion(
connection: ActorRef[Ctl],
myVersion: Version,
version: Version
) extends Phase
}
ActorRef[Ctl]
とあるのは、上で定義した Connection
の Actor の事です。
この Phase
を使った Actor を以下に定義します。
Behavior[Message]
が handshaking
と conclusion
のメソッドに別れていて、それぞれが Phase.Hand
と Phase.Conclusion
に対応しています。
object Dispatcher {
def apply(
host: String,
port: Int,
connection: ActorRef[Ctl]
): Behavior[Message] = {
val myVersion = mkVersion(host, port)
connection ! Ctl.Send(myVersion)
context.log.info("自分のバージョン: {}", myVersion)
handshaking(Phase.Hand(connection, myVersion, None, false))
}
def handshaking(phase: Phase.Hand): Behavior[Message] =
Behaviors.receive { (context, msg) =>
(phase, msg) match {
case (
Phase.Hand(connection, myVersion, None, false),
version: Version
) =>
context.log.info("相手のバージョン: {}", version)
val isOk = checkVersion(connection, myVersion)(version)
if (isOk) {
context.log.info("バージョンを受け入れます {}", version)
handshaking(Phase.Hand(connection, myVersion, Some(version), false))
} else {
Behaviors.stopped
}
case (
Phase.Hand(connection, myVersion, None, false),
ack: Verack
) =>
context.log.info("送ったバージョンが受け入れられました")
handshaking(Phase.Hand(connection, myVersion, None, true))
case (
Phase.Hand(connection, myVersion, None, true),
version: Version
) =>
context.log.info("相手のバージョン: {}", version)
val isOk = checkVersion(connection, myVersion)(version)
if (isOk) {
context.log.info("バージョンを受け入れます {}", version)
conclusion(Phase.Conclusion(connection, myVersion, version))
} else {
Behaviors.stopped
}
case (
Phase.Hand(connection, myVersion, Some(version), false),
ack: Verack
) =>
context.log.info("送ったバージョンが受け入れられました")
conclusion(Phase.Conclusion(connection, myVersion, version))
case (_, unknownMsg) =>
context.log.info("ハンドシェイク中にありえないメッセージを受信しました。 {}", unknownMsg)
phase.connection ! Ctl.Disconnect
Behaviors.unhandled
}
}
def conclusion(phase: Phase.Conclusion): Behavior[Message] =
Behaviors.receive { (context, msg) =>
msg match {
case _ =>
context.log.info("Unhandled msg: {}", msg)
Behaviors.unhandled
}
}
}
apply
では自分のバージョンを送信してから handshaking
に処理を移行します。
handshaking
内でバージョン情報を交換して、双方で受け入れられれば conclusion
に移行します。
conclusion
ではハンドシェーク後の通常メッセージを処理し続ける事になりますが、ここでは割愛します。
ハンドシェイク
本筋のハンドシェーク処理について解説します。
シンプルに言い切れば、お互いに求める要件を提示して確認し合う事が目的です。
version の送信
こちらから接続していますので、こちらから version
を送らないと何も始まりません。
送る情報の具体例はこんな感じで、上の mkVersion
の実装になります。
Version(
version = 70015, // 使いたいプロトコルのバージョン番号
services = Version.Services( // この通信で有効にするサービス
isNetwork = true,
isGetUTXO = true,
isBloom = true,
isWitness = true
),
timestamp = System.currentTimeMillis.milliseconds.toSeconds,
addressRecv = NetworkAddress(
new InetSocketAddress(remote, port),
Version.Services( // 相手に要求するサービス
isNetwork = true,
isGetUTXO = true,
isBloom = true,
isWitness = true
)
),
addressFrom = NetworkAddress(
new InetSocketAddress("localhost", 0),
Version.Services() // 自分が提供するサービス
),
nonce = BigInt(8, scala.util.Random),
userAgent = "My App 0.1",
startHeight = 0,
relay = true
)
たくさんの項目があるのですが、重要なのはプロトコルのバージョン番号と Services
です。
addressFrom
のところは自分が提供できる Services
なので空にしています。
relay = true
のところはブロックに取り込まれる前のブロードキャストされた Tx も送ってくるかどうか(BIP0037)です。
startHeight
は実際には getheaders
で必要なブロックを指定するのであまり意味はなく、とりあえず 0 でいいと思います。
version への応答
バージョンが送られてきたら要求するバージョンかどうかをチェックします。(上の checkVersion
に該当します)
通常はバージョン番号さえ確認すれば問題ないはずです。追加で Services
のチェックになると思います。
OK なら verack
を返します。これで相手のバージョンを承認した事になります。
handshaking
の実装では「自分が承認する前に向こうから承認された場合」、「自分が承認した後に向こうが承認した場合」などをそのまま書いているので重複したところが多いですが、分かりやすいと思います。
最終的に双方で verack
が受信されればハンドシェーク完了となります。
走らせる!
最後にこれらを起動するための main 関数は以下のようになります。
object Main {
def main(args: Array[String]): Unit = {
val host = args(0)
val port = args(1).toInt
implicit val magic = Magic.testnet3
ActorSystem(Connection(host, port), "connection")
}
}
引数にホスト名とポート番号を与えて起動すると、ズラズラとハンドシェークのログが出るはずです。
ここでは接続先の検索は解説しませんでしたが、P2P といえばそこからイメージされる方も多いと思います。
そのあたりについては Scala ではありませんが、他のメンバが書いたこちらのブログを参考にしてみて下さい。
BitcoinプロトコルをRustでお話してみる