انتقل إلى المحتوى

إرسال واستقبال رسائل Protobuf

تختلف عملية إرسال واستقبال الرسالة بين معايير TCP و WebSocket. أدناه، نشرح هذه العملية بالتفصيل لكلا النوعين من الاتصالات.

استخدام TCP

إرسال رسالة

لإرسال رسالة عبر اتصال TCP، قم بما يلي:

  1. قم بتغيير رسالة Protobuf إلى مصفوفة من البايتات (باستخدام ترميز Protobuf) باستخدام SDK الرسمي لـ Google Protocol Buffer للغة البرمجة التي اخترتها.

  2. احصل على طول المصفوفة التي تم إنشاؤها خلال الخطوة 1. قم بإنشاء مصفوفة بايت جديدة من هذا العدد الصحيح. اعكس مصفوفة البايت الجديدة.

  3. اجمع مصفوفة البايت الجديدة ومصفوفة البايت التي تحتوي على رسالة Protobuf الأصلية.

  4. أرسل المصفوفة المجمعة إلى دفق الاتصال.

توضح الأمثلة أدناه كيف يتم تنفيذ هذه الخطوات في SDK الرسمية لـ Open API.

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

مثال Python

باستخدام Twisted، يقوم مثال Python بتنفيذ نفس العمليات تقريباً مثل C#. يمكن شرح client.send(request) كما يلي.

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
لإرسال الرسالة، تستخدم مكتبة Python SDK طريقة Protocol.transport.write().

قراءة رسالة

الكود غير المتزامن

تعتمد جميع مكتبات Open API SDK على التنفيذ غير المتزامن، مما يعني أنها لا تنتظر وصول الرسائل ولكن بدلاً من ذلك تتفاعل مع الرسائل الواردة ديناميكياً. نتيجة لذلك، عادةً ما يتم استلام الرسالة عبر معالجات الأحداث

لقراءة رسالة، سيتعين عليك تنفيذ سلسلة من الإجراءات التي تعكس الخطوات المطلوبة لإرسال رسالة.

  1. استلام أول أربعة بايت من مصفوفة البايت (تذكر، تشير إلى طول الرسالة). اعكس هذه البايتات الأربعة وقم بتحويلها إلى عدد صحيح.

  2. اقرأ X بايت من التدفق حيث X هو العدد الصحيح الذي حصلت عليه خلال الخطوة 1.

  3. استخدم مكتبة Google Protobuf SDK لفك تسلسل الرسالة إلى ProtoMessage صالحة.

  4. استخدم حقل payloadType من كائن ProtoMessage للعثور على نوعه الفعلي. عبر مكتبة Google Protobuf SDK، قم بتغيير ProtoMessage إلى كائن من نوع ProtoOA... المطلوب.

توضح مقتطفات الكود أدناه كيف تتعامل مكتبات Open API SDK الرسمية مع قراءة الرسائل.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

مثال Python

في مثال Python، تتم معالجة جميع العمليات مع البايتات عند استلام الرسائل بواسطة طريقة dataReceived() كما هو موضح أدناه.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

استخدام WebSocket

إرسال رسالة

لإرسال رسالة عبر اتصال WebSocket، قم بتنفيذ الإجراءات التالية:

  1. قم بتسلسل الرسالة إلى أي تنسيق بيانات مناسب (على سبيل المثال، سلسلة نصية).

  2. أضف الرسالة المتسلسلة إلى قائمة الإرسال الخاصة بك.

توضح الأمثلة أدناه كيف يتم تنفيذ هذه الإجراءات في مكتبات Open API SDK الرسمية.

تستخدم مكتبة C# SDK فئة WebsocketClient وهي جزء من حزمة Websocket.Client. كما هو موضح أدناه، تعمل طريقة WebsocketClient.Send() كما يلي.

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

كما ترى، يقوم العميل ببساطة بإضافة قطعة المصفوفة إلى قائمة الإرسال.

لا تدعم حزمة تطوير البرمجيات SDK لـ Python معيار WebSocket.

استلام رسالة

لاستلام رسالة عبر اتصال WebSocket، قم بما يلي:

  1. استرجع البيانات المستلمة من خادم cTrader.

  2. قم بفك تسلسل البيانات إلى رسالة Protobuf صالحة.

للتوضيح حول كيفية تنفيذ ذلك في مكتبات Open API SDK الرسمية، انظر المقتطفات أدناه.

لاستلام الرسائل، يحتاج WebsocketClient إلى الاشتراك في دالة رد النداء التي تتعامل مع ما يفعله العميل عند قبول رسالة جديدة.

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

قبل الاشتراك، تقوم مكتبة .NET SDK بتحليل الرسالة إلى رسالة Protobuf. تتم إضافة الاشتراكات اللازمة في نص دالة رد النداء ConnectWebSocket().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

في دالة رد النداء OnNext()، يتم تمرير ProtoMessage إلى دالة رد النداء MessageFactory.GetMessage().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

لا تدعم حزمة تطوير البرمجيات SDK لـ Python معيار WebSocket.