コンテンツにスキップ

Protobufメッセージの送受信

メッセージの送受信プロセスは、TCPとWebSocketの標準で異なります。 以下では、両方のタイプの接続についてこのプロセスを詳細に説明します。

TCPを使用する

メッセージの送信

TCP接続を介してメッセージを送信するには、以下の手順を実行します:

  1. 選択したプログラミング言語の公式Google Protocol Buffer SDKを使用して、Protobufメッセージをバイトの配列(Protobufエンコーディングを使用)に変更します。

  2. ステップ1で作成された配列の長さを取得します。 この整数から新しいバイト配列を作成します。 新しいバイト配列を逆順にします。

  3. 新しいバイト配列と元のProtobufメッセージを含むバイト配列を連結します。

  4. 連結された配列を接続ストリームに送信します。

以下の例は、公式Open API SDKでこれらのステップがどのように実行されるかを示しています。

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

Pythonの例

Twistedを使用して、Pythonの例はC#の例とほぼ同じ操作を実行します。 client.send(request)は次のように説明できます。

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
メッセージを送信するために、Python SDKはProtocol.transport.write()メソッドを使用します。

メッセージを読む

非同期コード

すべてのOpen API SDKは非同期実行に依存しており、メッセージが到着するのを待つのではなく、動的に到着するメッセージに反応します。 その結果、メッセージの受信は通常、イベントハンドラーを介して行われます

メッセージを読むためには、メッセージを送信するために必要な手順を逆にする一連のアクションを実行する必要があります。

  1. バイト配列の最初の4バイトを受信します(これらはメッセージの長さを示します)。 これらの4バイトを逆順にして、整数に変換します。

  2. ステップ1で取得した整数であるXの量のバイトをストリームから読み取ります。

  3. Google Protobuf SDKを使用して、メッセージを有効なProtoMessageにデシリアライズします。

  4. ProtoMessageオブジェクトのpayloadTypeフィールドを使用して、その実際のタイプを見つけます。 Google Protobuf SDKを介して、ProtoMessageを必要なProtoOA...タイプのオブジェクトに変換します。

以下のコードスニペットは、公式のOpen API SDKがメッセージを読む方法を示しています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

Pythonの例

Pythonの例では、メッセージを受信する際のバイト操作はすべて、以下に示すようにdataReceived()メソッドによって処理されます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

WebSocketを使用する

メッセージの送信

WebSocket接続を介してメッセージを送信するには、次のアクションを実行します:

  1. メッセージを適切なデータ形式(例えば、文字列)にシリアライズします。

  2. シリアライズされたメッセージを送信キューに追加します。

以下の例は、公式のOpen API SDKでこれらのアクションがどのように実行されるかを示しています。

C# SDKは、Websocket.Clientパッケージの一部であるWebsocketClientクラスを使用します。 以下に示すように、WebsocketClient.Send()メソッドは次のように動作します。

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

ご覧の通り、クライアントは単に送信キューに配列セグメントを追加します。

Python SDKはWebSocket標準をサポートしていません。

メッセージを受信する

WebSocket接続を介してメッセージを受信するには、次の手順を実行します:

  1. cTraderバックエンドから受信したデータを取得します。

  2. データを有効なProtobufメッセージにデシリアライズします。

公式のOpen API SDKでこれがどのように行われるかの例については、以下のスニペットを参照してください。

メッセージを受信するために、WebsocketClientは新しいメッセージを受け取ったときにクライアントが何をするかを処理するコールバック関数にサブスクライブする必要があります。

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

サブスクライブする前に、.NET SDKはメッセージをProtobufメッセージに解析します。 必要なサブスクリプションは、ConnectWebSocket()コールバックの本体に追加されます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

OnNext()コールバックでは、ProtoMessageMessageFactory.GetMessage()コールバックに渡されます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

Python SDKはWebSocket標準をサポートしていません。