Bitcoin の P2P に Scala で通信してみる(後編)

P2P お知らせ
Table of Contents

前回までで Peer への接続とパケットの解析(パース)は出来るようになりました。これらを使ってパケットで Message をやりとりして、ハンドシェイクをしていきます。

  1. 接続してバイト列をやりとりする
  2. バイト列をパースする
  3. バージョン番号などのハンドシェイク (← 今ここ)

Message の実装

前回定義した Message はまだ trait だけで具象クラスが無い状態でした。
ここでは以下の2つの Message types の実装が必要です。

VerAck はメッセージ本文である payload が空で、実装も以下のように簡単です。

case object VerAck extends Message {
  def toByteString: ByteString = ByteString.empty
  def parse(bs: ByteString): Option[(VerAck.type, ByteString)] = Some(VerAck -> bs)
}

Version はもっと複雑でここに掲載するには長すぎるので割愛します。最低限のパースの実装方法は解説できたの思うので頑張ってみて下さい。

Message の種別名との結び付け

ヘッダの command フィールドで指定された種別名でパーサを特定できるように、「種別名」と「パーサ」の組み合わせをMessage のコンパニオンオブジェクトに実装します。

object Message {
  val parsers: Map[String, ParseByteString[_ <: Message]] = Map(
    "version" -> Version,
    "verack" -> Verack
  )
  def getParser(name: String): Option[ParseByteString[_ <: Message]] = parsers.lift(name)
}

これを見て scala.reflect を使って自動的に結び付けられるのでは?と思われた方は鋭い!
ここでは2つだけなのでこうして列挙できますが、全ての Message types をいちいち列挙するのはバグの元なので出来れば避けるべきでしょう。

Message への応答

ここまでで、送られてきたバイト列をその種別名に応じてパースできるようになったので、次はその意味に応じた処理が必要です。
ここでは Akka Actor を使っていきます。

接続を管理する Actor

まずは接続処理のところです。
この Actor が受け付けられるメッセージは Ctl として定義します。
Send で相手に送る Message を受け付けて、Disconnect で切断できるようにします。

sealed trait Ctl
object Ctl {
  case class Send(msg: Message) extends Ctl
  case object Disconnect extends Ctl
}

Actor は Behavior[Ctl] を定義することで実装されます。

object Connection {
  def apply(host: String, port: Int)(implicit magic: Magic): Behavior[Ctl] =
    Behaviors.setup { context =>
      val dispatcher = context.spawn(Dispatcher(host, port, context.self), "dispatcher")
      val (queue, ins) = connect(host, port)

      def send(msg: Message) = {
        import context.executionContext
        val packet = Packet(msg)
        context.log.info("送信する Pakcet: {}", packet)
        queue.offer(packet.toByteString).foreach { result =>
          context.log.info("Queue result: {}", result)
          // queue あふれ等の問題に対処する
        }
      }

      def startReceiving(): Unit = {
        @tailrec
        def receive(stream: Stream[Byte]): Unit =
          if (stream.isEmpty) {
            context.log.info("空 Stream です!")
            context.self ! Ctl.Disconnect
          } else {
            context.log.info("Stream から Packet を読み込みます")
            Packet.parseStream(stream) match {
              case None =>
                context.log.error("パースエラー!")
                context.self ! Ctl.Disconnect
              case Some((packet, next)) =>
                context.log.info("受信した Packet: {}", packet)
                dispatcher ! packet.body
                receive(next)
            }
          }

        val executionContextToParse = ExecutionContext.fromExecutor(
          java.util.concurrent.Executors.newCachedThreadPool()
        )

        Future(receive {
          val buf = new java.io.BufferedInputStream(ins)
          Stream.continually(buf.read()).takeWhile(_ != -1).map(_.toByte)
        })(executionContextToParse)
      }

      startReceiving()

      Behaviors.receiveMessage {
        case Ctl.Send(msg) =>
          send(msg)
          Behaviors.same
        case Ctl.Disconnect =>
          // 終了処理
          Behaviors.stopped
      }
    }
}

4行目の Dispatcher は後で定義します。
6行目の connect前回解説した Flow[ByteString].runWith の処理をする呼び出しです。

send メソッドでは queue にメッセージを放り込んでいます。この queue.offer の戻り値に「ちゃんと受け付けられた」とか「キュー溢れした」とかいう結果が返ってきますので、リトライなどの処理をここに盛り込む事になります。

startReceiving メソッド内で Future を使ってフォークして無限ループに入っています。Akka Stream でうまいことブロックしないように出来ればいいのですが、今のところいい方法が見つかってません。誰か教えて下さい (^^;

なお、パースエラーなどの場合にいきなりブチッと切るのではなく、先方に「なんか変だから切るね」と reject メッセージ で伝えてあげるのが(いちおう)マナーですが、ここでは割愛しています。

Mesasge を処理する Actor

次は実際に Message を処理する Actor です。ここでハンドシェークの実装をします。
受け付けるメッセージは Message (ネーミングが紛らわしかったな)ですが、ハンドシェークをする Hand とハンドシェークが終わった後の Conclusion に処理の背景が別れます。これを Phase として表現します。

sealed trait Phase
object Phase {
  case class Hand(
      connection: ActorRef[Ctl],
      myVersion: Version,
      maybeVersion: Option[Version],
      isAcked: Boolean
  ) extends Phase
  case class Conclusion(
      connection: ActorRef[Ctl],
      myVersion: Version,
      version: Version
  ) extends Phase
}

ActorRef[Ctl] とあるのは、上で定義した Connection の Actor の事です。
この Phase を使った Actor を以下に定義します。
Behavior[Message]handshakingconclusion のメソッドに別れていて、それぞれが Phase.HandPhase.Conclusion に対応しています。

object Dispatcher {
  def apply(
      host: String,
      port: Int,
      connection: ActorRef[Ctl]
  ): Behavior[Message] = {
    val myVersion = mkVersion(host, port)
    connection ! Ctl.Send(myVersion)
    context.log.info("自分のバージョン: {}", myVersion)
    handshaking(Phase.Hand(connection, myVersion, None, false))
  }

  def handshaking(phase: Phase.Hand): Behavior[Message] =
    Behaviors.receive { (context, msg) =>
      (phase, msg) match {
        case (
            Phase.Hand(connection, myVersion, None, false),
            version: Version
            ) =>
          context.log.info("相手のバージョン: {}", version)
          val isOk = checkVersion(connection, myVersion)(version)
          if (isOk) {
            context.log.info("バージョンを受け入れます {}", version)
            handshaking(Phase.Hand(connection, myVersion, Some(version), false))
          } else {
            Behaviors.stopped
          }

        case (
            Phase.Hand(connection, myVersion, None, false),
            ack: Verack
            ) =>
          context.log.info("送ったバージョンが受け入れられました")
          handshaking(Phase.Hand(connection, myVersion, None, true))

        case (
            Phase.Hand(connection, myVersion, None, true),
            version: Version
            ) =>
          context.log.info("相手のバージョン: {}", version)
          val isOk = checkVersion(connection, myVersion)(version)
          if (isOk) {
            context.log.info("バージョンを受け入れます {}", version)
            conclusion(Phase.Conclusion(connection, myVersion, version))
          } else {
            Behaviors.stopped
          }

        case (
            Phase.Hand(connection, myVersion, Some(version), false),
            ack: Verack
            ) =>
          context.log.info("送ったバージョンが受け入れられました")
          conclusion(Phase.Conclusion(connection, myVersion, version))

        case (_, unknownMsg) =>
          context.log.info("ハンドシェイク中にありえないメッセージを受信しました。 {}", unknownMsg)
          phase.connection ! Ctl.Disconnect
          Behaviors.unhandled
      }
    }

  def conclusion(phase: Phase.Conclusion): Behavior[Message] =
    Behaviors.receive { (context, msg) =>
      msg match {
        case _ =>
          context.log.info("Unhandled msg: {}", msg)
          Behaviors.unhandled
      }
    }
}

apply では自分のバージョンを送信してから handshaking に処理を移行します。
handshaking 内でバージョン情報を交換して、双方で受け入れられれば conclusion に移行します。
conclusion ではハンドシェーク後の通常メッセージを処理し続ける事になりますが、ここでは割愛します。

ハンドシェイク

本筋のハンドシェーク処理について解説します。
シンプルに言い切れば、お互いに求める要件を提示して確認し合う事が目的です。

version の送信

こちらから接続していますので、こちらから version を送らないと何も始まりません。
送る情報の具体例はこんな感じで、上の mkVersion の実装になります。

      Version(
        version = 70015, // 使いたいプロトコルのバージョン番号
        services = Version.Services( // この通信で有効にするサービス
          isNetwork = true,
          isGetUTXO = true,
          isBloom = true,
          isWitness = true
        ),
        timestamp = System.currentTimeMillis.milliseconds.toSeconds,
        addressRecv = NetworkAddress(
          new InetSocketAddress(remote, port),
          Version.Services( // 相手に要求するサービス
            isNetwork = true,
            isGetUTXO = true,
            isBloom = true,
            isWitness = true
          )
        ),
        addressFrom = NetworkAddress(
          new InetSocketAddress("localhost", 0),
          Version.Services() // 自分が提供するサービス
        ),
        nonce = BigInt(8, scala.util.Random),
        userAgent = "My App 0.1",
        startHeight = 0,
        relay = true
      )

たくさんの項目があるのですが、重要なのはプロトコルのバージョン番号と Services です。
addressFrom のところは自分が提供できる Services なので空にしています。

relay = true のところはブロックに取り込まれる前のブロードキャストされた Tx も送ってくるかどうか(BIP0037)です。

startHeight は実際には getheaders で必要なブロックを指定するのであまり意味はなく、とりあえず 0 でいいと思います。

version への応答

バージョンが送られてきたら要求するバージョンかどうかをチェックします。(上の checkVersion に該当します)
通常はバージョン番号さえ確認すれば問題ないはずです。追加で Services のチェックになると思います。

OK なら verack を返します。これで相手のバージョンを承認した事になります。

handshaking の実装では「自分が承認する前に向こうから承認された場合」、「自分が承認した後に向こうが承認した場合」などをそのまま書いているので重複したところが多いですが、分かりやすいと思います。

最終的に双方で verack が受信されればハンドシェーク完了となります。

走らせる!

最後にこれらを起動するための main 関数は以下のようになります。

object Main {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    implicit val magic = Magic.testnet3
    ActorSystem(Connection(host, port), "connection")
  }
}

引数にホスト名とポート番号を与えて起動すると、ズラズラとハンドシェークのログが出るはずです。

ここでは接続先の検索は解説しませんでしたが、P2P といえばそこからイメージされる方も多いと思います。
そのあたりについては Scala ではありませんが、他のメンバが書いたこちらのブログを参考にしてみて下さい。
BitcoinプロトコルをRustでお話してみる

タイトルとURLをコピーしました