bitcoind は RPC で一通りの情報が取れますが、そこをちょっと踏み込んで Scala で P2P を実装して Bitcoin の Peer のフリをしてみようと思います。
細かくやると大変なので 「とりあえず繋がる」 を目標にします。
P2P のプロトコルについては公式ドキュメントなどでちゃんと解説があるので、何となく分かった気にはなれますが、「で、実際どうすんの?」っていうところで二の足を踏んでいる人に少しでもお役に立てればと思います。
Scala やそのライブラリについての深い解説はしませんので、あしからず。
以下の手順で進めます。
- 接続してバイト列をやりとりする
- バイト列をパースする (← 前編ではここまで)
- バージョン番号などのハンドシェイク
接続してバイト列をやりとりする
Scala ではとりあえずメジャーな Akka Streams を使います。
いろいろな組み合わせが出来すぎてなんだか分かりにくいライブラリですが、使い所を押さえれば便利で頼もしいライブラリです。
ここでは現行バージョンの 2.6.0 を使います。
準備として必要なパッケージをインポートしておきます。
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.duration._
入力と出力の形態を定義
bitcoind からの送信を延々とバイト列として受け取って、こちらからも延々とバイト列を送りたいので、Sink
(入力) と Source
(出力) を以下のように定義します。
※ このネーミングが分かり難い。Source
は 「接続先に提供するソース」 って意味ですかね。
val timeout = 10 seconds // 適宜調整
val sink = StreamConverters.asInputStream(timeout)
val queueSize = 8 // 適宜調整
val source = Source.queue[ByteString](queueSize, OverflowStrategy.dropNew)
OverflowStrategy
は処理の適性で選択するのですが、dropNew
が分かり易いと思います。
接続
用意した Sink
と Souce
とを組み合わせて接続します。
この時 ActorSystem
が必要になりますので、適当(切)なヤツを作成しておきます。通常はデフォルト設定で充分です。
Akka 2.5 まではここで Materializer
が必須でしたが、2.6 からはデフォルトが用意されるようになりました。
implicit val actorSystem = akka.actor.ActorSystem("sample")
val host: String = ...
val port: Int = ...
val tcp = Tcp().outgoingConnection(host, port)
val (queue, ins) = Flow[ByteString].runWith(source via tcp, sink)
これで接続完了です。
出来上がった以下の2つを使って通信します。
queue
:SourceQueueWithComplete
[ByteString]
offer
というメソッドでByteString
を接続先に送ってくれます。ins
:java.io.InputStream
接続先から送られてきたバイト列を読み込んでいくために使います。
InputStream を Stream に変換
場合によっては java.io.InputStream
のままでもいいのですが、Scala での使い易さを考えて scala.collection.immutable.Stream
に変換します。これにより無限長のバイト配列として扱うことができます。
val bufIns = new java.io.BufferedInputStream(ins)
val infBytes = Stream.continually(bufIns.read()).takeWhile(_ != -1).map(_.toByte)
read
メソッドにより1バイトずつ読み込んでいるので一見非効率に見えますが BufferedInputStream
が内部バッファにため込んでくれているので大丈夫です。コンストラクタの第2引数を省略すればメモリサイズに応じたバッファを作ってくれます。気になる人は自分の運用に応じた値を指定して下さい。
バイト列をパースする
ここからが本番です。
受け取ったバイト列をプロトコルに従って解析し意味のある構造にします。
こちらのページが詳しいです。 https://en.bitcoin.it/wiki/Protocol_documentation
パケットの構造
サイズ | 名前 | 型 | 説明 |
---|---|---|---|
4 | magic | UInt32 | 各P2Pネットワークで定まっている値です。 Bitcoin の testnet3 なら 0x0709110B です。 |
12 | command | String | payload に入っているメッセージの種別名です。 12バイトよりも短い場合は 0x00 で埋められます。 |
4 | length | UInt32 | payload の長さです。 |
4 | checksum | UInt32 | payload の double-sha256 の先頭 4 バイトです。 |
可変 | payload | ByteString | メッセージ本体です。 command にある種別名に従って解析します。 |
Bitcoin なので(?) UInt32 は Little Endian です。magic
は "0B 11 09 07" となり、length
が300(0x12C)バイトなら "2C 01 00 00" となります。ややこしいのは checksum
で反転の反転になるのか double-sha256 のバイト列の先頭4バイトがそのままセットされます。
これによると、パケットを受け取ったら
- まず先頭の24バイト(4+12+4+4)を解析し
magic
をチェックした後でlength
の長さ分だけ続きを読み込み- その double-sha256 を
checksum
でチェックし command
の種別名で解析
すれば良いことが分かります。
パーサの作成
準備
関数型でパースと言えば、
ByteString => Option[(A, ByteString)]
つまり、ByteString を受け取って、型Aとしてパース出来れば、A と残りの ByteString の組を返す、というのが定石だと思います。
ここではこれに従ってパース用のメソッドを作っていきます。パーサは ParseByteString
を実装することにします。
trait ParseByteString[A] {
def parse(bs: ByteString): Option[(A, ByteString)]
}
次に ByteString
のユーティリティメソッドのために以下の implicit class
を任意の package object
内に作ります。
もちろんこのパッケージをインポートするのを忘れずに!
implicit class ByteStringUtil(private val bs: ByteString) extends AnyVal {
// ここにユーティリティメソッドを追加
}
まず定義するのは固定長のパーツの読み込みです。f
はlen
長の ByteString
を A
に変換します。
def parseFixed[A](len: Int)(f: ByteString => A): Option[(A, ByteString)] = {
val (a, b) = bs.splitAt(len)
if (a.size < len) None else Some(f(a) -> b)
}
double-sha256 も hash256
として定義しておきます。
def hash256 = {
def sha256(b: Array[Byte]) = java.security.MessageDigest.getInstance("SHA-256").digest(b)
ByteString(sha256(sha256(bs.toArray)))
}
case class の用意
パケットの構造を表現するための case class
を定義していきます。
各クラスはバイト列にもできるように、以下の CanBeByteString
を実装することとします。
trait CanBeByteString {
def toByteString: ByteString
protected def concat(xs: CanBeByteString*): ByteString =
xs.foldLeft(new akka.util.ByteStringBuilder)(_ append _.toByteString).result
}
まずはヘッダに使われているパーツです。ズラズラと長いですが、よく見ると単調なことに気付くと思います。
case class Magic(toByteString: ByteString) extends CanBeByteString
object Magic extends ParseByteString[Magic] {
val length = 4
def parse(bs: ByteString) = bs.parseFixed(length)(Magic(_))
val testnet3 = Magic(ByteString.fromInts(0x0B, 0x11, 0x09, 0x07))
}
case class Checksum(toByteString: ByteString) extends CanBeByteString
object Checksum extends ParseByteString[Checksum] {
val length = 4
def parse(bs: ByteString) = bs.parseFixed(length)(Checksum(_))
def calc(bs: ByteString) = Checksum(bs.hash256.take(length))
}
case class BSLength(toByteString: ByteString) extends CanBeByteString {
lazy val value = BigInt(toByteString.reverse.toArray)
}
object BSLength extends ParseByteString[BSLength] {
val length = 4
def parse(bs: ByteString) = bs.parseFixed(length)(BSLength(_))
def bySizeOf(bs: ByteString) = {
val v = BigInt(bs.size)
val array = v.toByteArray.reverse.padTo(length, 0.toByte).take(length)
BSLength(ByteString(array))
}
}
case class CommandName(toByteString: ByteString) extends CanBeByteString {
lazy val string = toByteString.utf8String.trim
override def toString = string
}
object CommandName extends ParseByteString[CommandName] {
val length = 12
def parse(bs: ByteString) = bs.parseFixed(length)(CommandName(_))
def apply(s: String): CommandName = {
val array = s.getBytes.padTo(length, 0.toByte).take(length)
CommandName(ByteString(array))
}
}
これらのパーツを組み合わせてヘッダを作ります。
case class Header(
magic: Magic,
command: CommandName,
length: BSLength,
checksum: Checksum
) extends CanBeByteString {
lazy val toByteString = concat(magic, command, length, checksum)
}
object Header extends ParseByteString[Header] {
val length = Magic.length + CommandName.length + UInt32.length + Checksum.length
def parse(bs: ByteString) = for {
(magic, next) <- Magic.parse(bs)
(command, next) <- CommandName.parse(next)
(length, next) <- BSLength.parse(next)
(checksum, next) <- Checksum.parse(next)
} yield Header(magic, command, length, checksum) -> next
}
パケットの解析
最後にヘッダをチェックしながらメッセージを読み込みます。
Packet
は Stream[Byte]
をパースするので、ParseByteString
とは違います。
Stream
は無限長なので、ここに出てくるnext
をByteString
にしようとするとずーーっと終わらないので気を付けて下さい。
case class Packet(
header: Header,
body: Message
) extends CanBeByteString {
def toByteString = concat(header, body)
}
object Packet {
private def splitStream(len: Int)(s: Stream[Byte]): Option[(ByteString, Stream[Byte])] = {
val (a, b) = s.splitAt(len)
val bs = ByteString(a.toArray)
if (bs.size < len) None else Some(bs -> b)
}
def parseStream(stream: Stream[Byte])(implicit magic: Magic): Option[(Packet, Stream[Byte])] = for {
(headerBs, next) <- splitStream(Header.length)(stream)
(header, left) <- Header.parse(headerBs)
if left.isEmpty
if header.magic == magic
(payload, next) <- splitStream(header.length.value.toInt)(next)
if header.checksum == Checksum.calc(payload)
mp <- Message.getParser(header.command.toString)
(body, left) <- mp.parse(payload)
if left.isEmpty
} yield Packet(header, body) -> next
}
left.isEmpty
のチェックは、残らず消費していることを確認しています。
Message
は以下のように定義されているものとします。
trait Message extends CanBeByteString {
val name: String
}
object Message {
def getParser(name: String): Option[ParseByteString[_ <: Message]] = ???
}
パケットの作成
パケットの受信が出来るようになりましたので、次は送信のために Packet
を作成するメソッドをコンパニオンオブジェクトに定義します。
作成された Packet
の toByteString
を、接続している queue
に offer
するわけです。
def apply(msg: Message)(implicit magic: Magic): Packet = {
val payload = msg.toByteString
val header = Header(
magic = magic,
command = CommandName(msg.name)
length = BSLength.bySizeOf(payload),
checksum = Checksum.calc(payload)
)
Packet(header, msg)
}
ここまでで、P2P のパケットを送受信する事が出来るようになりました。
後編では実際に Message
を定義してやりとりしながら、ハンドシェイクをしていきます。