diff --git a/src/app/Context.hpp b/src/app/Context.hpp new file mode 100644 index 0000000..8af5fd2 --- /dev/null +++ b/src/app/Context.hpp @@ -0,0 +1,224 @@ +/*++ + +Module Name: + + Client.hpp + +Notices: + + Dial Module + +Author: + + Copyright (c) Prepodobny Alen + + mailto: alienufo@inbox.ru + mailto: ufocomp@gmail.com + +--*/ + +#ifndef APOSTOL_DM_CONTEXT_HPP +#define APOSTOL_DM_CONTEXT_HPP +//---------------------------------------------------------------------------------------------------------------------- + +#define BPS_DEFAULT_SYNC_PERIOD 30 +#define BPS_SERVER_PORT 4977 +#define BPS_SERVER_URL "http://localhost:4977" +#define BPS_BM_SERVER_ADDRESS "BM-2cX8y9u9yEi3fdqQfndx9F6NdR5Hv79add" +#define BPS_BM_DEBUG_ADDRESS "BM-2cXtL92m3CavBKx8qsV2LbZtAU3eQxW2rB" +//---------------------------------------------------------------------------------------------------------------------- + +#ifdef __cplusplus +extern "C++" { +#endif // __cplusplus + +namespace Apostol { + + namespace Context { + + struct CKeyContext { + + CString Name; + CString Key; + + enum CKeyStatus { + ksUnknown = -1, + ksFetching, + ksSuccess, + ksError, + } Status; + + CDateTime StatusTime; + CDateTime RunTime; + + CKeyContext(): Status(ksUnknown), StatusTime(0), RunTime(0) { + Name = "PUBLIC"; + } + + CKeyContext(const CString &Name, const CString &Key): CKeyContext() { + this->Name = Name; + this->Key = Key; + } + + CKeyContext(const CKeyContext &KeyContext): CKeyContext() { + Assign(KeyContext); + } + + void Assign(const CKeyContext &KeyContext) { + this->Name = KeyContext.Name; + this->Key = KeyContext.Key; + this->Status = KeyContext.Status; + this->StatusTime = KeyContext.StatusTime; + this->RunTime = KeyContext.RunTime; + } + + void Clear() { + Name.Clear(); + Key.Clear(); + Status = ksUnknown; + StatusTime = 0; + RunTime = 0; + } + + CKeyContext& operator= (const CKeyContext &KeyContext) { + if (this != &KeyContext) { + Assign(KeyContext); + } + return *this; + }; + + }; + + //-------------------------------------------------------------------------------------------------------------- + + //-- CContext -------------------------------------------------------------------------------------------------- + + //-------------------------------------------------------------------------------------------------------------- + + enum CContextStatus { csInitialization = 0, csInitialized, csAuthorization, csAuthorized, csInProgress, csRunning }; + //-------------------------------------------------------------------------------------------------------------- + + class CContext: public CObject { + private: + + CContextStatus m_Status = csInitialization; + + CDateTime m_CheckDate = 0; + CDateTime m_FixedDate = 0; + + CString m_Name {}; + + CLocation m_URL {}; + + CString m_Session {}; + CString m_Secret {}; + + CKeyContext m_PGP {}; + CKeyContext m_BTC {}; + + CStringList m_BTCKeys {}; + CStringList m_Tokens {}; + + CProviders m_Providers {}; + + public: + + CContext() = default; + + CContext(const CContext &Context): CContext() { + Assign(Context); + } + + explicit CContext(const CLocation &URL): CContext() { + m_URL = URL; + } + + void Assign(const CContext &Context) { + m_Status = Context.m_Status; + m_CheckDate = Context.m_CheckDate; + m_FixedDate = Context.m_FixedDate; + + m_Name = Context.m_Name; + m_URL = Context.m_URL; + m_Session = Context.m_Session; + m_Secret = Context.m_Secret; + m_PGP = Context.m_PGP; + m_BTC = Context.m_BTC; + m_BTCKeys = Context.m_BTCKeys; + m_Tokens = Context.m_Tokens; + m_Providers = Context.m_Providers; + } + + virtual void Clear() { + m_Status = csInitialization; + m_CheckDate = 0; + m_FixedDate = 0; + + m_Name.Clear(); + m_URL.Clear(); + m_Session.Clear(); + m_Secret.Clear(); + m_PGP.Clear(); + m_BTC.Clear(); + m_BTCKeys.Clear(); + m_Tokens.Clear(); + m_Providers.Clear(); + }; + + const CLocation& URL() const { return m_URL; } + + CContextStatus Status() const { return m_Status; } + void SetStatus(CContextStatus Status) { m_Status = Status; } + + CDateTime CheckDate() const { return m_CheckDate; } + void SetCheckDate(CDateTime Value) { m_CheckDate = Value; } + + CDateTime FixedDate() const { return m_FixedDate; } + void SetFixedDate(CDateTime Value) { m_FixedDate = Value; } + + CString& Name() { return m_Name; } + const CString& Name() const { return m_Name; } + + CString& Session() { return m_Session; } + const CString& Session() const { return m_Session; } + + CString& Secret() { return m_Secret; } + const CString& Secret() const { return m_Secret; } + + CKeyContext& PGP() { return m_PGP; } + const CKeyContext& PGP() const { return m_PGP; } + + CKeyContext& BTC() { return m_BTC; } + const CKeyContext& BTC() const { return m_BTC; } + + CStringList& BTCKeys() { return m_BTCKeys; } + const CStringList& BTCKeys() const { return m_BTCKeys; } + + CStringList& Tokens() { return m_Tokens; } + const CStringList& Tokens() const { return m_Tokens; } + + CProviders& Providers() { return m_Providers; } + const CProviders& Providers() const { return m_Providers; } + + CContext& operator= (const CContext &Context) { + if (this != &Context) { + Assign(Context); + } + return *this; + }; + + }; + //-------------------------------------------------------------------------------------------------------------- + + typedef TPair CContextPair; + typedef TPairs CContextList; + + } +} + +using namespace Apostol::Context; +#ifdef __cplusplus +} +#endif // __cplusplus + +#endif //APOSTOL_DM_CONTEXT_HPP diff --git a/src/modules/Workers/WebSocket/WebSocket.cpp b/src/modules/Workers/WebSocket/WebSocket.cpp new file mode 100644 index 0000000..b9c94d8 --- /dev/null +++ b/src/modules/Workers/WebSocket/WebSocket.cpp @@ -0,0 +1,727 @@ +/*++ + +Program name: + + dm + +Module Name: + + WebSocket.cpp + +Notices: + + Module: WebSocket + +Author: + + Copyright (c) Prepodobny Alen + + mailto: alienufo@inbox.ru + mailto: ufocomp@gmail.com + +--*/ + +//---------------------------------------------------------------------------------------------------------------------- + +#include "Core.hpp" +#include "WebSocket.hpp" +//---------------------------------------------------------------------------------------------------------------------- + +#include +//---------------------------------------------------------------------------------------------------------------------- + +#define BPS_PGP_HASH "SHA512" +#define BM_PREFIX "BM-" +//---------------------------------------------------------------------------------------------------------------------- + +#define SYSTEM_PROVIDER_NAME "system" +#define SERVICE_APPLICATION_NAME "service" +#define CONFIG_SECTION_NAME "module" +//---------------------------------------------------------------------------------------------------------------------- + +extern "C++" { + +namespace Apostol { + + namespace Module { + + //-------------------------------------------------------------------------------------------------------------- + + //-- CWebSocketModule ------------------------------------------------------------------------------------------ + + //-------------------------------------------------------------------------------------------------------------- + + CWebSocketModule::CWebSocketModule(CModuleProcess *AProcess) : CApostolModule(AProcess, "web socket", "module/WebSocket") { + m_SyncPeriod = BPS_DEFAULT_SYNC_PERIOD; + m_Headers.Add("Authorization"); + + CWebSocketModule::InitMethods(); + + InitServerList(); + Reload(); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::InitMethods() { +#if defined(_GLIBCXX_RELEASE) && (_GLIBCXX_RELEASE >= 9) + m_pMethods->AddObject(_T("GET") , (CObject *) new CMethodHandler(true , [this](auto && Connection) { DoGet(Connection); })); + m_pMethods->AddObject(_T("POST") , (CObject *) new CMethodHandler(true , [this](auto && Connection) { DoPost(Connection); })); + m_pMethods->AddObject(_T("HEAD") , (CObject *) new CMethodHandler(true , [this](auto && Connection) { DoHead(Connection); })); + m_pMethods->AddObject(_T("OPTIONS"), (CObject *) new CMethodHandler(true , [this](auto && Connection) { DoOptions(Connection); })); + m_pMethods->AddObject(_T("PUT") , (CObject *) new CMethodHandler(false, [this](auto && Connection) { MethodNotAllowed(Connection); })); + m_pMethods->AddObject(_T("DELETE") , (CObject *) new CMethodHandler(false, [this](auto && Connection) { MethodNotAllowed(Connection); })); + m_pMethods->AddObject(_T("TRACE") , (CObject *) new CMethodHandler(false, [this](auto && Connection) { MethodNotAllowed(Connection); })); + m_pMethods->AddObject(_T("PATCH") , (CObject *) new CMethodHandler(false, [this](auto && Connection) { MethodNotAllowed(Connection); })); + m_pMethods->AddObject(_T("CONNECT"), (CObject *) new CMethodHandler(false, [this](auto && Connection) { MethodNotAllowed(Connection); })); +#else + m_pMethods->AddObject(_T("GET") , (CObject *) new CMethodHandler(true, std::bind(&CWebSocketModule::DoGet, this, _1))); + m_pMethods->AddObject(_T("POST") , (CObject *) new CMethodHandler(true, std::bind(&CWebSocketModule::DoPost, this, _1))); + m_pMethods->AddObject(_T("HEAD") , (CObject *) new CMethodHandler(true, std::bind(&CWebSocketModule::DoHead, this, _1))); + m_pMethods->AddObject(_T("OPTIONS"), (CObject *) new CMethodHandler(true, std::bind(&CWebSocketModule::DoOptions, this, _1))); + m_pMethods->AddObject(_T("PUT") , (CObject *) new CMethodHandler(false, std::bind(&CWebSocketModule::MethodNotAllowed, this, _1))); + m_pMethods->AddObject(_T("DELETE") , (CObject *) new CMethodHandler(false, std::bind(&CWebSocketModule::MethodNotAllowed, this, _1))); + m_pMethods->AddObject(_T("TRACE") , (CObject *) new CMethodHandler(false, std::bind(&CWebSocketModule::MethodNotAllowed, this, _1))); + m_pMethods->AddObject(_T("PATCH") , (CObject *) new CMethodHandler(false, std::bind(&CWebSocketModule::MethodNotAllowed, this, _1))); + m_pMethods->AddObject(_T("CONNECT"), (CObject *) new CMethodHandler(false, std::bind(&CWebSocketModule::MethodNotAllowed, this, _1))); +#endif + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::InitServerList() { + m_DefaultServer.Value().Name() = m_DefaultServer.Name(); + m_DefaultServer.Value().PGP().Name = "PUBLIC"; + + if (m_Servers.Count() == 0) { +#ifdef _DEBUG + int index = m_Servers.AddPair(BPS_BM_DEBUG_ADDRESS, CClientContext(CLocation(BPS_SERVER_URL))); + m_Servers[index].Value().Name() = m_Servers[index].Name(); + m_Servers[index].Value().PGP().Name = "PUBLIC"; +#else + m_Servers.Add(m_DefaultServer); +#endif + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::UpdateServerList(const CString &Key) { + CStringPairs ServerList; + CStringList Keys; + + ParsePGPKey(Key, ServerList, Keys); + + if (ServerList.Count() != 0) { + CStringPairs::ConstEnumerator em(ServerList); + while (em.MoveNext()) { + const auto ¤t = em.Current(); + if (m_Servers.IndexOfName(current.Name()) == -1) { + m_Servers.AddPair(current.Name(), CClientContext(CLocation(current.Value()))); + auto &Context = m_Servers.Last().Value(); + Context.Name() = current.Name(); + Context.PGP().Name = "PUBLIC"; + Context.PGP().Key = Key; + Context.BTCKeys() = Keys; + } + } + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::UpdateOAuth2() { + const auto &oauth2 = Config()->IniFile().ReadString(CONFIG_SECTION_NAME, "oauth2", "oauth2/service.json"); + const auto &provider = CString(SYSTEM_PROVIDER_NAME); + const auto &application = CString(SERVICE_APPLICATION_NAME); + + for (int i = 0; i < m_Servers.Count(); i++) { + auto &Context = m_Servers[i].Value(); + if (!oauth2.empty() && Context.Status() == Context::csInitialization) { + LoadOAuth2(oauth2, provider, application, Context.Providers()); + Context.SetStatus(Context::csInitialized); + } + Context.SetCheckDate(0); + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::LoadOAuth2(const CString &FileName, const CString &ProviderName, const CString &ApplicationName, CProviders &Providers) { + CString ConfigFile(FileName); + + if (!path_separator(ConfigFile.front())) { + ConfigFile = Config()->Prefix() + ConfigFile; + } + + if (FileExists(ConfigFile.c_str())) { + CJSONObject Json; + Json.LoadFromFile(ConfigFile.c_str()); + + int index = Providers.IndexOfName(ProviderName); + if (index == -1) + index = Providers.AddPair(ProviderName, CProvider(ProviderName)); + auto& Provider = Providers[index].Value(); + Provider.Applications().AddPair(ApplicationName, Json); + } else { + Log()->Error(APP_LOG_WARN, 0, APP_FILE_NOT_FOUND, ConfigFile.c_str()); + } + } + //-------------------------------------------------------------------------------------------------------------- + + bool CWebSocketModule::FindURLInLine(const CString &Line, CStringList &List) { + CString URL; + + TCHAR ch; + int length = 0; + size_t startPos, pos; + + pos = 0; + while ((startPos = Line.Find(HTTP_PREFIX, pos)) != CString::npos) { + + URL.Clear(); + + pos = startPos + HTTP_PREFIX_SIZE; + if (Line.Length() < 5) + return false; + + URL.Append(HTTP_PREFIX); + ch = Line.at(pos); + if (ch == 's') { + URL.Append(ch); + pos++; + } + + if (Line.Length() < 7 || Line.at(pos++) != ':' || Line.at(pos++) != '/' || Line.at(pos++) != '/') + return false; + + URL.Append("://"); + + length = 0; + ch = Line.at(pos); + while (ch != 0 && (IsChar(ch) || IsNumeral(ch) || ch == ':' || ch == '.' || ch == '-')) { + URL.Append(ch); + length++; + ch = Line.at(++pos); + } + + if (length < 3) { + return false; + } + + if (startPos == 0) { + List.Add(URL); + } else { + ch = Line.at(startPos - 1); + switch (ch) { + case ' ': + case ',': + case ';': + List.Add(URL); + break; + default: + return false; + } + } + } + + return true; + } + //-------------------------------------------------------------------------------------------------------------- + + CWebSocketClient *CWebSocketModule::GetWebSocketClient(CClientContext &Context) { + auto pClient = Context.ClientManager().Add(&Context, CLocation(Context.URL().Origin() + "/session/" + Context.Session())); + + pClient->Session() = Context.Session(); + + pClient->ClientName() = GApplication->Title(); + pClient->AutoConnect(false); + pClient->AllocateEventHandlers(Server()); + +#if defined(_GLIBCXX_RELEASE) && (_GLIBCXX_RELEASE >= 9) + pClient->OnVerbose([this](auto && Sender, auto && AConnection, auto && AFormat, auto && args) { DoVerbose(Sender, AConnection, AFormat, args); }); + pClient->OnException([this](auto && AConnection, auto && AException) { DoException(AConnection, AException); }); + pClient->OnEventHandlerException([this](auto && AHandler, auto && AException) { DoEventHandlerException(AHandler, AException); }); + pClient->OnNoCommandHandler([this](auto && Sender, auto && AData, auto && AConnection) { DoNoCommandHandler(Sender, AData, AConnection); }); + pClient->OnWebSocketError([this](auto && AConnection) { DoWebSocketError(AConnection); }); + pClient->OnConnected([this](auto && Sender) { DoClientConnected(Sender); }); + pClient->OnDisconnected([this](auto && Sender) { DoClientDisconnected(Sender); }); + pClient->OnMessage([this](auto && Sender, auto && Message) { DoClientMessage(Sender, Message); }); + pClient->OnError([this](auto && Sender, int Code, auto && Message) { DoClientError(Sender, Code, Message); }); + pClient->OnHeartbeat([this](auto && Sender) { DoClientHeartbeat(Sender); }); + pClient->OnTimeOut([this](auto && Sender) { DoClientTimeOut(Sender); }); + pClient->OnAuthorize([this](auto && Sender, auto && Request, auto && Response) { DoAuthorizeEvent(Sender, Request, Response); }); + pClient->OnSubscribe([this](auto && Sender, auto && Request, auto && Response) { DoSubscribeEvent(Sender, Request, Response); }); + pClient->OnKey([this](auto && Sender, auto && Request, auto && Response) { DoKeyEvent(Sender, Request, Response); }); + pClient->OnWebSocketEvent([this](auto && Sender, auto && Request, auto && Response) { DoWebSocketClientEvent(Sender, Request, Response); }); +#else + pClient->OnVerbose(std::bind(&CWebSocketModule::DoVerbose, this, _1, _2, _3, _4)); + pClient->OnException(std::bind(&CWebSocketModule::DoException, this, _1, _2)); + pClient->OnEventHandlerException(std::bind(&CWebSocketModule::DoEventHandlerException, this, _1, _2)); + pClient->OnNoCommandHandler(std::bind(&CWebSocketModule::DoNoCommandHandler, this, _1, _2, _3)); + pClient->OnWebSocketError(std::bind(&CWebSocketModule::DoWebSocketError, this, _1)); + pClient->OnConnected(std::bind(&CWebSocketModule::DoClientConnected, this, _1)); + pClient->OnDisconnected(std::bind(&CWebSocketModule::DoClientDisconnected, this, _1)); + pClient->OnMessage(std::bind(&CWebSocketModule::DoClientMessage, this, _1, _2)); + pClient->OnError(std::bind(&CWebSocketModule::DoClientError, this, _1, _2, _3)); + pClient->OnHeartbeat(std::bind(&CWebSocketModule::DoClientHeartbeat, this, _1)); + pClient->OnTimeOut(std::bind(&CWebSocketModule::DoClientTimeOut, this, _1)); + pClient->OnAuthorize(std::bind(&CWebSocketModule::DoAuthorizeEvent, this, _1, _2, _3)); + pClient->OnSubscribe(std::bind(&CWebSocketModule::DoSubscribeEvent, this, _1, _2, _3)); + pClient->OnKey(std::bind(&CWebSocketModule::DoKeyEvent, this, _1, _2, _3)); + pClient->OnWebSocketEvent(std::bind(&CWebSocketModule::DoWebSocketClientEvent, this, _1, _2, _3)); +#endif + return pClient; + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::CreateWebSocketClient(CClientContext &Context) { + auto pClient = GetWebSocketClient(Context); + try { + pClient->Active(true); + + Context.SetFixedDate(0); + Context.SetStatus(csRunning); + } catch (std::exception &e) { + Log()->Error(APP_LOG_ERR, 0, e.what()); + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::ParsePGPKey(const CString &Key, CStringPairs &ServerList, CStringList &BTCKeys) { + if (Key.IsEmpty()) + return; + + const Apostol::PGP::Key pgp(Key.c_str()); + if (!pgp.meaningful()) + return; + + CStringList Data; + CPGPUserIdList List; + CStringList KeyList; + + pgp.ExportUID(List); + + for (int i = 0; i < List.Count(); i++) { + + const auto& uid = List[i]; + + DebugMessage("%s (%s)\n", uid.Name.c_str(), uid.Desc.c_str()); + + const auto& name = uid.Name.Lower(); + const auto& data = uid.Desc.Lower(); + + if (name == "technical_data") { + SplitColumns(data, Data, ';'); + + if (Data.Count() == 3) { + m_SyncPeriod = StrToIntDef(Data[1].Trim().c_str(), BPS_DEFAULT_SYNC_PERIOD); + } else if (Data.Count() == 2) { + m_SyncPeriod = StrToIntDef(Data[0].Trim().c_str(), BPS_DEFAULT_SYNC_PERIOD); + } else if (Data.Count() == 1) { + m_SyncPeriod = StrToIntDef(Data[0].Trim().c_str(), BPS_DEFAULT_SYNC_PERIOD); + } + } if (uid.Name.Length() >= 35 && uid.Name.SubString(0, 3) == BM_PREFIX) { + CStringList urlList; + if (FindURLInLine(uid.Desc, urlList)) { + for (int l = 0; l < urlList.Count(); l++) { + ServerList.AddPair(uid.Name, urlList[l]); + } + } + } else if (name.Find("bitcoin_key") != CString::npos) { + const auto& key = wallet::ec_public(data.c_str()); + if (verify(key)) + KeyList.AddPair(name, key.encoded()); + } + } + + CString Name; + for (int i = 1; i <= KeyList.Count(); i++) { + Name = "bitcoin_key"; + Name << i; + const auto& key = KeyList[Name]; + if (!key.IsEmpty()) { + BTCKeys.Add(key); + } + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::CreateAccessToken(const CProvider &Provider, const CString &Application, CClientContext &Context) { + + auto OnDone = [&Context](CTCPConnection *Sender) { + + auto pConnection = dynamic_cast (Sender); + auto pReply = pConnection->Reply(); + + DebugReply(pReply); + + if (pReply->Status == CHTTPReply::ok) { + const CJSON Json(pReply->Content); + + Context.SetStatus(csAuthorized); + + Context.Session() = Json["session"].AsString(); + Context.Secret() = Json["secret"].AsString(); + + Context.SetFixedDate(0); + Context.SetCheckDate(Now() + (CDateTime) 55 / MinsPerDay); // 55 min + } + + return true; + }; + + auto OnHTTPClient = [this](const CLocation &URI) { + return GetClient(URI.hostname, URI.port); + }; + + CString server_uri(Context.URL().Origin()); + + const auto &token_uri = Provider.TokenURI(Application); + const auto &service_token = CToken::CreateToken(Provider, Application); + + Context.Tokens().Values("service_token", service_token); + + if (!token_uri.IsEmpty()) { + CToken::FetchAccessToken(token_uri.front() == '/' ? server_uri + token_uri : token_uri, service_token, OnHTTPClient, OnDone); + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::FetchProviders(CDateTime Now, CClientContext &Context) { + for (int i = 0; i < Context.Providers().Count(); i++) { + auto &Provider = Context.Providers()[i].Value(); + for (int j = 0; j < Provider.Applications().Count(); ++j) { + const auto &app = Provider.Applications().Members(j); + if (app["type"].AsString() == "service_account") { + if (Provider.KeyStatus() == ksUnknown) { + CreateAccessToken(Provider, app.String(), Context); + Provider.KeyStatusTime(Now); + Provider.KeyStatus(ksSuccess); + } + } + } + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::CheckProviders(CDateTime Now, CClientContext &Context) { + for (int i = 0; i < Context.Providers().Count(); i++) { + auto& Provider = Context.Providers()[i].Value(); + if (Provider.KeyStatus() != ksUnknown) { + Provider.KeyStatusTime(Now); + Provider.KeyStatus(ksUnknown); + } + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoClientConnected(CObject *Sender) { + auto pConnection = dynamic_cast(Sender); + if (pConnection != nullptr) { + auto pBinding = pConnection->Socket()->Binding(); + if (pBinding != nullptr) { + Log()->Notice(_T("[%s:%d] [%s] WebSocket client connected."), + pConnection->Socket()->Binding()->IP(), + pConnection->Socket()->Binding()->Port(), + pConnection->Session().c_str()); + } else { + Log()->Notice(_T("[%s] WebSocket client connected."), + pConnection->Session().c_str()); + } + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoClientDisconnected(CObject *Sender) { + auto pConnection = dynamic_cast(Sender); + if (pConnection != nullptr) { + auto pBinding = pConnection->Socket()->Binding(); + if (pBinding != nullptr) { + Log()->Notice(_T("[%s:%d] [%s] WebSocket client disconnected."), + pConnection->Socket()->Binding()->IP(), + pConnection->Socket()->Binding()->Port(), + pConnection->Session().c_str()); + } else { + Log()->Notice(_T("[%s] WebSocket client disconnected."), + pConnection->Session().c_str()); + } + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoClientHeartbeat(CObject *Sender) { + auto pClient = dynamic_cast (Sender); + chASSERT(pClient); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoClientTimeOut(CObject *Sender) { + auto pClient = dynamic_cast (Sender); + chASSERT(pClient); + auto pContext = pClient->Context(); + chASSERT(pContext); + pClient->SwitchConnection(nullptr); + pClient->Reload(); + pContext->SetFixedDate(0); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoClientMessage(CObject *Sender, const CWSMessage &Message) { + auto pClient = dynamic_cast (Sender); + chASSERT(pClient); + Log()->Message("[%s] [%s] [%s] [%s] %s", pClient->Session().c_str(), + Message.UniqueId.c_str(), + Message.Action.IsEmpty() ? "Unknown" : Message.Action.c_str(), + CWSMessage::MessageTypeIdToString(Message.MessageTypeId).c_str(), + Message.Payload.IsNull() ? "{}" : Message.Payload.ToString().c_str()); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoClientError(CObject *Sender, int Code, const CString &Message) { + Log()->Error(APP_LOG_ERR, 0, "[%d] %s", Code, Message.c_str()); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoAuthorizeEvent(CObject *Sender, const CWSMessage &Request, const CWSMessage &Response) { + auto pClient = dynamic_cast (Sender); + chASSERT(pClient); + auto pContext = pClient->Context(); + chASSERT(pContext); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoSubscribeEvent(CObject *Sender, const CWSMessage &Request, const CWSMessage &Response) { + auto pClient = dynamic_cast (Sender); + chASSERT(pClient); + auto pContext = pClient->Context(); + chASSERT(pContext); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoKeyEvent(CObject *Sender, const CWSMessage &Request, const CWSMessage &Response) { + auto pClient = dynamic_cast (Sender); + chASSERT(pClient); + auto pContext = pClient->Context(); + chASSERT(pContext); + + pContext->PGP().StatusTime = Now(); + pContext->PGP().Status = CKeyContext::ksSuccess; + pContext->PGP().RunTime = GetRandomDate(10 * 60, m_SyncPeriod * 60, pContext->PGP().StatusTime); // 10..m_SyncPeriod min + + if (Response.Payload.HasOwnProperty("data")) { + UpdateServerList(Response.Payload["data"].AsString()); + UpdateOAuth2(); + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoWebSocketClientEvent(CObject *Sender, const CWSMessage &Request, const CWSMessage &Response) { + auto pClient = dynamic_cast (Sender); + chASSERT(pClient); + auto pContext = pClient->Context(); + chASSERT(pContext); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoWebSocketError(CTCPConnection *AConnection) { + auto pConnection = dynamic_cast (AConnection); + auto pClient = dynamic_cast (pConnection->Client()); + chASSERT(pClient); + auto pContext = pClient->Context(); + chASSERT(pContext); + auto pReply = pConnection->Reply(); + + if (pReply->Status == CHTTPReply::moved_permanently || pReply->Status == CHTTPReply::moved_temporarily) { + const auto &caLocation = pReply->Headers["Location"]; + if (!caLocation.IsEmpty()) { + pClient->SetURI(CLocation(caLocation)); + Log()->Notice(_T("[%s] Redirect to %s."), pClient->Session().c_str(), pClient->URI().href().c_str()); + } + pContext->SetFixedDate(0); + } else { + auto pBinding = pConnection->Socket()->Binding(); + if (pBinding != nullptr) { + Log()->Warning(_T("[%s:%d] [%s] WebSocket client failed to establish connection"), + pConnection->Socket()->Binding()->IP(), + pConnection->Socket()->Binding()->Port(), + pConnection->Session().c_str()); + } else { + Log()->Warning(_T("[%s] WebSocket client failed to establish connection."), + pConnection->Session().c_str()); + } + pContext->SetFixedDate(Now() + (CDateTime) 1 / MinsPerDay); // 1 min + } + + pConnection->CloseConnection(true); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoUser(CHTTPServerConnection *AConnection, const CString &Method, const CString &URI) { + + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoDeal(CHTTPServerConnection *AConnection, const CString &Method, const CString &URI, const CString &Action) { + + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoSignature(CHTTPServerConnection *AConnection) { + + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoAPI(CHTTPServerConnection *AConnection) { + + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoGet(CHTTPServerConnection *AConnection) { + + auto pRequest = AConnection->Request(); + + CString sPath(pRequest->Location.pathname); + + // Request sPath must be absolute and not contain "..". + if (sPath.empty() || sPath.front() != '/' || sPath.find(_T("..")) != CString::npos) { + AConnection->SendStockReply(CHTTPReply::bad_request); + return; + } + + if (sPath.SubString(0, 5) == "/api/") { + DoAPI(AConnection); + return; + } + + SendResource(AConnection, sPath); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::DoPost(CHTTPServerConnection *AConnection) { + + auto pRequest = AConnection->Request(); + auto pReply = AConnection->Reply(); + + pReply->ContentType = CHTTPReply::json; + + CStringList slRouts; + SplitColumns(pRequest->Location.pathname, slRouts, '/'); + + if (slRouts.Count() < 3) { + AConnection->SendStockReply(CHTTPReply::not_found); + return; + } + + const auto& caService = slRouts[0].Lower(); + const auto& caVersion = slRouts[1].Lower(); + const auto& caCommand = slRouts[2].Lower(); + const auto& caAction = slRouts.Count() == 4 ? slRouts[3].Lower() : ""; + + if (caService != "api") { + AConnection->SendStockReply(CHTTPReply::not_found); + return; + } + + if (caVersion != "v1") { + AConnection->SendStockReply(CHTTPReply::not_found); + return; + } + + CString sRoute; + for (int i = 0; i < slRouts.Count(); ++i) { + sRoute.Append('/'); + sRoute.Append(slRouts[i]); + } + + try { + + if (caCommand == "account") { + + DoUser(AConnection, "POST", sRoute); + + } else if (caCommand == "deal") { + + DoDeal(AConnection, "POST", sRoute, caAction); + + } else if (caCommand == "signature") { + + DoSignature(AConnection); + + } else { + + AConnection->SendStockReply(CHTTPReply::not_found); + + } + + } catch (std::exception &e) { + ExceptionToJson(0, e, pReply->Content); + + AConnection->SendReply(CHTTPReply::internal_server_error); + Log()->Error(APP_LOG_EMERG, 0, e.what()); + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::Heartbeat(CDateTime Now) { + for (int i = 0; i < m_Servers.Count(); i++) { + auto &Context = m_Servers[i].Value(); + + if ((Now >= Context.CheckDate())) { + Context.SetCheckDate(Now + (CDateTime) 30 / SecsPerDay); // 30 sec + if (Context.Status() == Context::csInitialized) { + Context.SetStatus(Context::csAuthorization); + + CheckProviders(Now, Context); + FetchProviders(Now, Context); + } + } + + if (Context.Status() == Context::csAuthorized) { + if ((Now >= Context.FixedDate())) { + Context.SetFixedDate(Now + (CDateTime) 30 / SecsPerDay); // 30 sec + Context.SetStatus(Context::csInProgress); + CreateWebSocketClient(Context); + } + } + + if (Context.Status() == Context::csRunning) { + if ((Now >= Context.FixedDate())) { + Context.SetFixedDate(Now + (CDateTime) 30 / SecsPerDay); // 30 sec + + for (int j = 0; j < Context.ClientManager().Count(); ++j) { + auto pClient = Context.ClientManager()[j]; + + if (!pClient->Active()) + pClient->Active(true); + + if (!pClient->Connected() && !pClient->Session().IsEmpty()) { + Log()->Notice(_T("[%s] Trying connect to %s."), pClient->Session().c_str(), pClient->URI().href().c_str()); + pClient->ConnectStart(); + } + } + } + } + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketModule::Reload() { + const auto& caPublicKey = Config()->IniFile().ReadString("pgp", "public", "dm.pub"); + + if (FileExists(caPublicKey.c_str())) { + CString Key; + Key.LoadFromFile(caPublicKey.c_str()); + + UpdateServerList(Key); + UpdateOAuth2(); + } else { + Log()->Error(APP_LOG_WARN, 0, APP_FILE_NOT_FOUND, caPublicKey.c_str()); + } + } + //-------------------------------------------------------------------------------------------------------------- + + bool CWebSocketModule::Enabled() { + if (m_ModuleStatus == msUnknown) + m_ModuleStatus = Config()->IniFile().ReadBool(SectionName().c_str(), "enable", false) ? msEnabled: msDisabled; + return m_ModuleStatus == msEnabled; + } + //-------------------------------------------------------------------------------------------------------------- + } +} +} \ No newline at end of file diff --git a/src/modules/Workers/WebSocket/WebSocket.hpp b/src/modules/Workers/WebSocket/WebSocket.hpp new file mode 100644 index 0000000..eb491c9 --- /dev/null +++ b/src/modules/Workers/WebSocket/WebSocket.hpp @@ -0,0 +1,143 @@ +/*++ + +Program name: + + dm + +Module Name: + + WebSocketModule.hpp + +Notices: + + Module: WebSocket + +Author: + + Copyright (c) Prepodobny Alen + + mailto: alienufo@inbox.ru + mailto: ufocomp@gmail.com + +--*/ + +#ifndef DM_WEB_SOCKET_MODULE_HPP +#define DM_WEB_SOCKET_MODULE_HPP +//---------------------------------------------------------------------------------------------------------------------- + +extern "C++" { + +namespace Apostol { + + namespace Module { + + class CClientContext: public CContext { + private: + + CWebSocketClientManager m_ClientManager {}; + + public: + + CClientContext() = default; + + CClientContext(const CClientContext &Context): CContext(Context) { + Assign(Context); + } + + explicit CClientContext(const CLocation &URI): CContext(URI) { + + } + + CWebSocketClientManager& ClientManager() { return m_ClientManager; } + + const CWebSocketClientManager& ClientManager() const { return m_ClientManager; } + + }; + //-------------------------------------------------------------------------------------------------------------- + + typedef TPair CClientContextPair; + typedef TPairs CClientContextList; + + //-------------------------------------------------------------------------------------------------------------- + + //-- CWebSocketModule ------------------------------------------------------------------------------------------ + + //-------------------------------------------------------------------------------------------------------------- + + class CWebSocketModule: public CApostolModule { + private: + + int m_SyncPeriod; + + CClientContextPair m_DefaultServer { BPS_BM_SERVER_ADDRESS, CClientContext(CLocation(BPS_SERVER_URL)) }; + + CClientContextList m_Servers {}; + + void InitMethods() override; + + void InitServerList(); + void UpdateServerList(const CString &Key); + void UpdateOAuth2(); + + void FetchProviders(CDateTime Now, CClientContext &Context); + void CheckProviders(CDateTime Now, CClientContext &Context); + + void CreateAccessToken(const CProvider &Provider, const CString &Application, CClientContext &Context); + void ParsePGPKey(const CString& Key, CStringPairs& ServerList, CStringList& BTCKeys); + + CWebSocketClient *GetWebSocketClient(CClientContext &Context); + void CreateWebSocketClient(CClientContext &Context); + + protected: + + void Heartbeat(CDateTime Now) override; + + void DoUser(CHTTPServerConnection *AConnection, const CString &Method, const CString &URI); + void DoDeal(CHTTPServerConnection *AConnection, const CString &Method, const CString &URI, const CString &Action); + + void DoSignature(CHTTPServerConnection *AConnection); + + void DoAPI(CHTTPServerConnection *AConnection); + + void DoGet(CHTTPServerConnection *AConnection) override; + void DoPost(CHTTPServerConnection *AConnection); + + void DoWebSocketError(CTCPConnection *AConnection); + void DoClientConnected(CObject *Sender); + void DoClientDisconnected(CObject *Sender); + + void DoClientHeartbeat(CObject *Sender); + void DoClientTimeOut(CObject *Sender); + + void DoClientMessage(CObject *Sender, const CWSMessage &Message); + void DoClientError(CObject *Sender, int Code, const CString &Message); + + void DoAuthorizeEvent(CObject *Sender, const CWSMessage &Request, const CWSMessage &Response); + void DoSubscribeEvent(CObject *Sender, const CWSMessage &Request, const CWSMessage &Response); + void DoKeyEvent(CObject *Sender, const CWSMessage &Request, const CWSMessage &Response); + + void DoWebSocketClientEvent(CObject *Sender, const CWSMessage &Request, const CWSMessage &Response); + + public: + + explicit CWebSocketModule(CModuleProcess *AProcess); + + ~CWebSocketModule() override = default; + + static class CWebSocketModule *CreateModule(CModuleProcess *AProcess) { + return new CWebSocketModule(AProcess); + } + + void Reload(); + + bool Enabled() override; + + static bool FindURLInLine(const CString &Line, CStringList &List); + static void LoadOAuth2(const CString &FileName, const CString &ProviderName, const CString &ApplicationName, CProviders &Providers); + }; + } +} + +using namespace Apostol::Module; +} +#endif //DM_WEB_SOCKET_MODULE_HPP diff --git a/src/modules/Workers/WebSocket/WebSocketClient.cpp b/src/modules/Workers/WebSocket/WebSocketClient.cpp new file mode 100644 index 0000000..da2b809 --- /dev/null +++ b/src/modules/Workers/WebSocket/WebSocketClient.cpp @@ -0,0 +1,303 @@ +/*++ + +Program name: + + dm + +Module Name: + + WebSocketClient.cpp + +Notices: + + WebSocket Client + +Author: + + Copyright (c) Prepodobny Alen + + mailto: alienufo@inbox.ru + mailto: ufocomp@gmail.com + +--*/ + +#include "Core.hpp" +#include "WebSocketClient.hpp" +//---------------------------------------------------------------------------------------------------------------------- + +extern "C++" { + +namespace Apostol { + + namespace Client { + + CString GenUniqueId() { + return GetUID(32).Lower(); + } + + //-------------------------------------------------------------------------------------------------------------- + + //-- CWebSocketClient ------------------------------------------------------------------------------------------ + + //-------------------------------------------------------------------------------------------------------------- + + CWebSocketClient::CWebSocketClient(): CCustomWebSocketClient() { + m_pContext = nullptr; + m_SendCount = 0; + m_PingDateTime = 0; + m_PongDateTime = 0; + m_HeartbeatDateTime = 0; + m_RegistrationDateTime = 0; + m_HeartbeatInterval = 600; + m_Authorized = false; + m_OnAuthorizeEvent = nullptr; + m_OnSubscribeEvent = nullptr; + m_OnKeyEvent = nullptr; + m_OnWebSocketEvent = nullptr; + } + //-------------------------------------------------------------------------------------------------------------- + + CWebSocketClient::CWebSocketClient(CContext *Context, const CLocation &URI): CCustomWebSocketClient(URI) { + m_pContext = Context; + m_SendCount = 0; + m_PingDateTime = 0; + m_PongDateTime = 0; + m_HeartbeatDateTime = 0; + m_RegistrationDateTime = 0; + m_HeartbeatInterval = 600; + m_Authorized = false; + m_OnAuthorizeEvent = nullptr; + m_OnSubscribeEvent = nullptr; + m_OnKeyEvent = nullptr; + m_OnWebSocketEvent = nullptr; + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::Reload() { + m_Authorized = false; + m_SendCount = 0; + m_PongDateTime = 0; + m_HeartbeatDateTime = 0; + m_RegistrationDateTime = 0; + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::CheckCallError(const CWSMessage &Error, const CWSMessage &Message) { + if (Error.ErrorCode == 401) { + m_Authorized = false; + m_RegistrationDateTime = 0; + m_MessageList.Add(Message); + } + } + //-------------------------------------------------------------------------------------------------------------- + + bool CWebSocketClient::Authorized() const { + return m_Authorized; + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::Authorize() { + + auto OnRequest = [this](CWebSocketMessageHandler *AHandler, CWebSocketConnection *AWSConnection) { + const auto &wsMessage = RequestToMessage(AWSConnection); + if (wsMessage.MessageTypeId == mtCallResult) { + if (wsMessage.Payload.HasOwnProperty("authorized")) { + m_Authorized = wsMessage.Payload["authorized"].AsBoolean(); + DoAuthorizeEvent(AHandler->Message(), wsMessage); + } + + if (m_Authorized) { + Subscribe(); + } + } else if (wsMessage.MessageTypeId == mtCallError) { + DoError(wsMessage.ErrorCode, wsMessage.ErrorMessage); + } + }; + + CWSMessage Message; + + Message.MessageTypeId = WSProtocol::mtOpen; + Message.UniqueId = GenUniqueId(); + Message.Action = "/authorize"; + Message.Payload = CString().Format(R"({"secret": "%s"})", m_pContext->Secret().c_str()); + + SendMessage(Message, OnRequest); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::Subscribe() { + + auto OnRequest = [this](CWebSocketMessageHandler *AHandler, CWebSocketConnection *AWSConnection) { + const auto &wsMessage = RequestToMessage(AWSConnection); + if (wsMessage.MessageTypeId == mtCallResult) { + DoSubscribeEvent(AHandler->Message(), wsMessage); + UpdateKey(); + } else if (wsMessage.MessageTypeId == mtCallError) { + DoError(wsMessage.ErrorCode, wsMessage.ErrorMessage); + } + }; + + CWSMessage Message; + + Message.MessageTypeId = WSProtocol::mtCall; + Message.UniqueId = GenUniqueId(); + Message.Action = "/observer/subscribe"; + Message.Payload = CString().Format(R"({"publisher":"notify","filter": {"classes": ["key"]},"params":{"type":"object"}})"); + + SendMessage(Message, OnRequest); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::UpdateKey() { + + auto OnRequest = [this](CWebSocketMessageHandler *AHandler, CWebSocketConnection *AWSConnection) { + const auto &wsMessage = RequestToMessage(AWSConnection); + if (wsMessage.MessageTypeId == mtCallResult) { + DoKeyEvent(AHandler->Message(), wsMessage); + } else if (wsMessage.MessageTypeId == mtCallError) { + DoError(wsMessage.ErrorCode, wsMessage.ErrorMessage); + } + }; + + CWSMessage Message; + + Message.MessageTypeId = WSProtocol::mtCall; + Message.UniqueId = GenUniqueId(); + Message.Action = "/key/public"; + Message.Payload = CString().Format(R"({"type": "pgp", "account": "%s", "code": "%s", "fields": ["data"]})", m_pContext->Name().c_str(), m_pContext->PGP().Name.c_str()); + + SendMessage(Message, OnRequest); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::Send(const CString &Action, const CString &Payload, COnMessageHandlerEvent &&Handler) { + + auto OnRequest = [this](CWebSocketMessageHandler *AHandler, CWebSocketConnection *AWSConnection) { + const auto &wsMessage = RequestToMessage(AWSConnection); + if (wsMessage.MessageTypeId == mtCallResult) { + DoWebSocketEvent(AHandler->Message(), wsMessage); + } else if (wsMessage.MessageTypeId == mtCallError) { + DoError(wsMessage.ErrorCode, wsMessage.ErrorMessage); + } + }; + + CWSMessage Message; + + Message.MessageTypeId = WSProtocol::mtCall; + Message.UniqueId = GenUniqueId(); + Message.Action = Action; + Message.Payload = Payload; + + m_SendCount++; + SendMessage(Message, Handler == nullptr ? OnRequest : std::move(Handler)); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::PushMessageList() { + for (int i = 0; i < m_MessageList.Count(); ++i) { + SendMessage(m_MessageList[i], true); + } + m_MessageList.Clear(); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::Heartbeat(CDateTime Now) { + if (Active() && Connected() && !Connection()->ClosedGracefully() && Connection()->Protocol() == pWebSocket) { + if (m_PongDateTime == 0) + m_PongDateTime = Now; + + if (Now - m_PongDateTime >= (CDateTime) 90 / SecsPerDay) { + DoTimeOut(); + return; + } + + if (Now >= m_PingDateTime) { + m_PingDateTime = Now + (CDateTime) 60 / SecsPerDay; // 60 sec + Ping(); + } else if (!m_Authorized) { + if (Now >= m_RegistrationDateTime) { + m_RegistrationDateTime = Now + (CDateTime) 30 / SecsPerDay; // 30 sec + Authorize(); + } + } else { + if (Now >= m_HeartbeatDateTime) { + m_HeartbeatDateTime = Now + (CDateTime) m_HeartbeatInterval / SecsPerDay; + DoHeartbeat(); + } + } + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::DoPing(CObject *Sender) { + + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::DoPong(CObject *Sender) { + m_PongDateTime = Now(); + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::DoAuthorizeEvent(const CWSMessage &Request, const CWSMessage &Response) { + if (m_OnAuthorizeEvent != nullptr) { + m_OnAuthorizeEvent(this, Request, Response); + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::DoSubscribeEvent(const CWSMessage &Request, const CWSMessage &Response) { + if (m_OnSubscribeEvent != nullptr) { + m_OnSubscribeEvent(this, Request, Response); + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::DoKeyEvent(const CWSMessage &Request, const CWSMessage &Response) { + if (m_OnKeyEvent != nullptr) { + m_OnKeyEvent(this, Request, Response); + } + } + //-------------------------------------------------------------------------------------------------------------- + + void CWebSocketClient::DoWebSocketEvent(const CWSMessage &Request, const CWSMessage &Response) { + if (m_OnWebSocketEvent != nullptr) { + m_OnWebSocketEvent(this, Request, Response); + } + } + + //-------------------------------------------------------------------------------------------------------------- + + //-- CWebSocketClientItem -------------------------------------------------------------------------------------- + + //-------------------------------------------------------------------------------------------------------------- + + CWebSocketClientItem::CWebSocketClientItem(CWebSocketClientManager *AManager): CCollectionItem(AManager), CWebSocketClient() { + + } + //-------------------------------------------------------------------------------------------------------------- + + CWebSocketClientItem::CWebSocketClientItem(CWebSocketClientManager *AManager, CContext *Context, const CLocation &URI): + CCollectionItem(AManager), CWebSocketClient(Context, URI) { + + } + + //-------------------------------------------------------------------------------------------------------------- + + //-- CWebSocketClientManager ----------------------------------------------------------------------------------- + + //-------------------------------------------------------------------------------------------------------------- + + CWebSocketClientItem *CWebSocketClientManager::GetItem(int Index) const { + return (CWebSocketClientItem *) inherited::GetItem(Index); + } + //-------------------------------------------------------------------------------------------------------------- + + CWebSocketClientItem *CWebSocketClientManager::Add(CContext *Context, const CLocation &URI) { + return new CWebSocketClientItem(this, Context, URI); + } + + } +} + +} diff --git a/src/modules/Workers/WebSocket/WebSocketClient.hpp b/src/modules/Workers/WebSocket/WebSocketClient.hpp new file mode 100644 index 0000000..77ff7dc --- /dev/null +++ b/src/modules/Workers/WebSocket/WebSocketClient.hpp @@ -0,0 +1,166 @@ +/*++ + +Program name: + + dm + +Module Name: + + WebSocketClient.hpp + +Notices: + + WebSocket Client + +Author: + + Copyright (c) Prepodobny Alen + + mailto: alienufo@inbox.ru + mailto: ufocomp@gmail.com + +--*/ + +#ifndef APOSTOL_WEB_SOCKET_CLIENT_HPP +#define APOSTOL_WEB_SOCKET_CLIENT_HPP +//---------------------------------------------------------------------------------------------------------------------- + +extern "C++" { + +namespace Apostol { + + namespace Client { + + //-------------------------------------------------------------------------------------------------------------- + + //-- CWebSocketClient ------------------------------------------------------------------------------------------ + + //-------------------------------------------------------------------------------------------------------------- + + typedef std::function COnWebSocketClientEvent; + //-------------------------------------------------------------------------------------------------------------- + + class CWebSocketClient: public CCustomWebSocketClient { + private: + + CContext *m_pContext; + + int m_SendCount; + + bool m_Authorized; + + int m_HeartbeatInterval; + + CDateTime m_PingDateTime; + CDateTime m_PongDateTime; + CDateTime m_HeartbeatDateTime; + CDateTime m_RegistrationDateTime; + + TList m_MessageList; + + COnWebSocketClientEvent m_OnAuthorizeEvent; + COnWebSocketClientEvent m_OnSubscribeEvent; + COnWebSocketClientEvent m_OnKeyEvent; + COnWebSocketClientEvent m_OnWebSocketEvent; + + void CheckCallError(const CWSMessage &Error, const CWSMessage &Message); + + void PushMessageList(); + + protected: + + void Heartbeat(CDateTime Now) override; + + void DoPing(CObject *Sender) override; + void DoPong(CObject *Sender) override; + + void DoAuthorizeEvent(const CWSMessage &Request, const CWSMessage &Response); + void DoSubscribeEvent(const CWSMessage &Request, const CWSMessage &Response); + void DoKeyEvent(const CWSMessage &Request, const CWSMessage &Response); + void DoWebSocketEvent(const CWSMessage &Request, const CWSMessage &Response); + + public: + + CWebSocketClient(); + explicit CWebSocketClient(CContext *Context, const CLocation &URI); + + bool Authorized() const; + + void Authorize(); + void Subscribe(); + void UpdateKey(); + + void Send(const CString &Action, const CString &Payload, COnMessageHandlerEvent &&Handler = nullptr); + + void Reload(); + + CContext *Context() const { return m_pContext; } + + const COnWebSocketClientEvent &OnAuthorize() const { return m_OnAuthorizeEvent; } + void OnAuthorize(COnWebSocketClientEvent && Value) { m_OnAuthorizeEvent = Value; } + + const COnWebSocketClientEvent &OnSubscribe() const { return m_OnSubscribeEvent; } + void OnSubscribe(COnWebSocketClientEvent && Value) { m_OnSubscribeEvent = Value; } + + const COnWebSocketClientEvent &OnKey() const { return m_OnKeyEvent; } + void OnKey(COnWebSocketClientEvent && Value) { m_OnKeyEvent = Value; } + + const COnWebSocketClientEvent &OnWebSocketEvent() const { return m_OnWebSocketEvent; } + void OnWebSocketEvent(COnWebSocketClientEvent && Value) { m_OnWebSocketEvent = Value; } + + }; + + //-------------------------------------------------------------------------------------------------------------- + + //-- CWebSocketClientItem -------------------------------------------------------------------------------------- + + //-------------------------------------------------------------------------------------------------------------- + + class CWebSocketClientManager; + //-------------------------------------------------------------------------------------------------------------- + + class CWebSocketClientItem: public CCollectionItem, public CWebSocketClient { + public: + + explicit CWebSocketClientItem(CWebSocketClientManager *AManager); + + explicit CWebSocketClientItem(CWebSocketClientManager *AManager, CContext *Context, const CLocation &URI); + + }; + + //-------------------------------------------------------------------------------------------------------------- + + //-- CWebSocketClientManager ----------------------------------------------------------------------------------- + + //-------------------------------------------------------------------------------------------------------------- + + class CWebSocketClientManager: public CCollection { + typedef CCollection inherited; + + protected: + + CWebSocketClientItem *GetItem(int Index) const override; + + public: + + CWebSocketClientManager(): CCollection(this) { + + }; + + ~CWebSocketClientManager() override = default; + + CWebSocketClientItem *Add(CContext *Context, const CLocation &URI); + + CWebSocketClientItem *Items(int Index) const override { return GetItem(Index); }; + + CWebSocketClientItem *operator[] (int Index) const override { return Items(Index); }; + + }; + + } +} + +using namespace Apostol::Client; +} + +#endif //APOSTOL_WEB_SOCKET_CLIENT_HPP