ข้ามไปที่เนื้อหา

ส่งและรับข้อความ Protobuf

กระบวนการส่งและรับข้อความแตกต่างกันระหว่างมาตรฐาน TCP และ WebSocket ด้านล่างเราอธิบายกระบวนการนี้โดยละเอียดสำหรับทั้งสองประเภทของการเชื่อมต่อ

ใช้ TCP

ส่งข้อความ

เพื่อส่งข้อความผ่านการเชื่อมต่อ TCP ให้ทำดังนี้:

  1. เปลี่ยนข้อความ Protobuf ให้เป็นอาร์เรย์ของไบต์ (โดยใช้การเข้ารหัส Protobuf) โดยใช้ SDK Google Protocol Buffer อย่างเป็นทางการสำหรับภาษาการเขียนโปรแกรมที่คุณเลือก

  2. รับความยาวของอาร์เรย์ที่สร้างขึ้นในขั้นตอนที่ 1 สร้างอาร์เรย์ไบต์ใหม่จากจำนวนเต็มนี้ ย้อนกลับอาร์เรย์ไบต์ใหม่

  3. เชื่อมต่ออาร์เรย์ไบต์ใหม่และอาร์เรย์ไบต์ที่มีข้อความ Protobuf เดิม

  4. ส่งอาร์เรย์ที่เชื่อมต่อไปยังสตรีมการเชื่อมต่อ

ตัวอย่างด้านล่างแสดงวิธีการดำเนินการขั้นตอนเหล่านี้ใน SDK Open API อย่างเป็นทางการ

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

เวอร์ชัน Python

เมื่อใช้ Twisted ตัวอย่าง Python จะดำเนินการเกือบเหมือนกับตัวอย่าง C# client.send(request) สามารถอธิบายได้ดังนี้

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
สำหรับการส่งข้อความ Python SDK ใช้เมธอด Protocol.transport.write()

อ่านข้อความ

โค้ดแบบอะซิงโครนัส

Open API SDK ทั้งหมดทำงานแบบ asynchronous หมายความว่าพวกมันไม่รอให้ข้อความมาถึง แต่จะตอบสนองต่อข้อความที่มาถึงแบบไดนามิก ผลที่ได้คือ การรับข้อความมักจะทำผ่าน event handlers

เพื่ออ่านข้อความ คุณจะต้องทำลำดับการกระทำที่ย้อนกลับขั้นตอนที่จำเป็นสำหรับการส่งข้อความ

  1. รับสี่ไบต์แรกของอาร์เรย์ไบต์ (จำไว้ว่า พวกมันแสดงความยาวของข้อความ) ย้อนกลับสี่ไบต์เหล่านี้และเปลี่ยนเป็นจำนวนเต็ม

  2. อ่านไบต์จำนวน X จากสตรีม โดยที่ X คือจำนวนเต็มที่คุณได้จากขั้นตอนที่ 1

  3. ใช้ Google Protobuf SDK เพื่อ deserialise ข้อความเป็น ProtoMessage ที่ถูกต้อง

  4. ใช้ฟิลด์ payloadType ของออบเจ็กต์ ProtoMessage เพื่อหาประเภทจริงของมัน ผ่าน Google Protobuf SDK เปลี่ยน ProtoMessage เป็นออบเจ็กต์ของประเภท ProtoOA... ที่ต้องการ

ตัวอย่างโค้ดด้านล่างแสดงให้เห็นว่า Open API SDKs อย่างเป็นทางการจัดการกับการอ่านข้อความอย่างไร

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

เวอร์ชัน Python

ในตัวอย่าง Python การดำเนินการทั้งหมดกับไบต์ในการรับข้อความจะถูกจัดการโดยเมธอด dataReceived() ดังที่แสดงด้านล่าง

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

ใช้ WebSocket

ส่งข้อความ

เพื่อส่งข้อความผ่านการเชื่อมต่อ WebSocket ให้ทำตามขั้นตอนต่อไปนี้:

  1. Serialise ข้อความเป็นรูปแบบข้อมูลที่เหมาะสม (เช่น สตริง)

  2. เพิ่มข้อความที่ serialise แล้วเข้าไปในคิวส่งของคุณ

ตัวอย่างด้านล่างแสดงให้เห็นว่าขั้นตอนเหล่านี้ดำเนินการอย่างไรใน Open API SDKs อย่างเป็นทางการ

C# SDK ใช้คลาส WebsocketClient ซึ่งเป็นส่วนหนึ่งของแพ็คเกจ Websocket.Client ดังที่แสดงด้านล่าง เมธอด WebsocketClient.Send() ทำงานดังนี้

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

อย่างที่คุณเห็น ไคลเอนต์เพียงเพิ่มเซ็กเมนต์อาร์เรย์เข้าไปในคิวส่ง

SDK Python ไม่รองรับมาตรฐาน WebSocket

รับข้อความ

เพื่อรับข้อความผ่านการเชื่อมต่อ WebSocket ให้ทำตามขั้นตอนต่อไปนี้:

  1. ดึงข้อมูลที่ได้รับจาก backend ของ cTrader

  2. Deserialise ข้อมูลเป็น Protobuf message ที่ถูกต้อง

สำหรับการแสดงวิธีการนี้ใน Open API SDKs อย่างเป็นทางการ ดูตัวอย่างโค้ดด้านล่าง

เพื่อรับข้อความ WebsocketClient จำเป็นต้องสมัครสมาชิกกับฟังก์ชัน callback ที่จัดการสิ่งที่ไคลเอนต์ทำเมื่อรับข้อความใหม่

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

ก่อนสมัครสมาชิก .NET SDK จะแยกข้อความเป็น Protobuf message การสมัครสมาชิกที่จำเป็นจะถูกเพิ่มในเนื้อหาของ callback ConnectWebSocket()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

ใน callback OnNext() ProtoMessage จะถูกส่งไปยัง callback MessageFactory.GetMessage()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

SDK Python ไม่รองรับมาตรฐาน WebSocket