Решил написать чат на вебсокетах на языке С++ . Безальтернативным вариантом оказалась библиотека Boost . Используя техноВозможно среди нас есть программисты С++ подскажут ,что-то толковое.В общем установил я библиотеку Boost 1.69 в Visual Stutio 2017 . Скопировал с официального сайта Boost листинг websocket_server_fast.cpp и модифицировал его. Код на сайте работал как эхо -сервер. То есть отсылал сообщение обратно. Мне нужно ,чтоб сервер отправлял сообщение всем участникам чата.
В общем для того чтоб хранить соединение я создал контейнер set который хранит умные указатели на соединения
typedef websocket::stream<tcp::socket> websocket_connect;
set< shared_ptr<websocket_connect>>wesocket_connects;
Все методы получения и отправки сообщений работают асинхронно ,соответсвенно необходимости в блокировке wesocket_connects ,мютексами нет. Хотя пробовал и блокировать это не решило проблем. В общем создаем shared_ptr<websocket_connect>ws(make_shared<websocket_connect>((move(socket)))); на который принимаем соединение , сразу же добавляем этот указатель в websocket_connect; а потом просто проходимся по нему циклом и отправляем всем полученное сообщение методом webconnect->async_write(buffer.data(), yield[ec]); Каждая операция ,как мне кажется перехватывает ошибку в случае ошибки удаляем соединение из контейнера auto result = wesocket_connects.erase(ws);
и прерываем прослушивание сокета return; Браузер перехватывает собитие Close или Error и моментально создает новое соединение по вебсокету.
В общем я все откомпилировал в Release загрузил на VPS сначала все работает ,но через часов 5 -7 программа-сервер перестает отвечать на запросы ,когда браузер пытается переключить соединение выдает ошибку -1006.
При запуске в Debug выдало такую ошибку .
// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);
Что -то связано с асинхронными операциями ,но как исправить не понимаю
Ниже привожу полный листинг кода. Буду рад любой консультации.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#include "pch.h"
#include <iostream>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include<set>
#include <boost/config.hpp>
#include <boost/coroutine/standard_stack_allocator.hpp>
#include <boost/coroutine/detail/symmetric_coroutine_call.hpp>
#include <boost/coroutine/detail/symmetric_coroutine_yield.hpp>
#include <boost/coroutine2/all.hpp>
#include<mutex>
#define BOOST_COROUTINES_SYMMETRIC_COROUTINE_H
using namespace boost::coroutines2;
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------
using namespace std;
typedef websocket::stream<tcp::socket> websocket_connect;
set< shared_ptr<websocket_connect>>wesocket_connects;
mutex mtx;
// Report a failure
void
fail(beast::error_code ec, char const* what)
{
std::cerr << (std::string(what) + ": " + ec.message() + "\n");
}
// Adjust settings on the stream
void
setup_stream(shared_ptr<websocket_connect> ws)
{
// These values are tuned for Autobahn|Testsuite, and
// should also be generally helpful for increased performance.
websocket::permessage_deflate pmd;
pmd.client_enable = true;
pmd.server_enable = true;
pmd.compLevel = 3;
ws->set_option(pmd);
ws->auto_fragment(false);
// Autobahn|Testsuite needs this
ws->read_message_max(64 * 1024 * 1024);
}
//------------------------------------------------------------------------------
void
do_coro_session(tcp::socket& socket, net::yield_context yield)
{
beast::error_code ec;
shared_ptr<websocket_connect>ws(make_shared<websocket_connect>((move(socket))));
setup_stream(ws);
ws->async_accept(yield[ec]);
if (ec)
{
return fail(ec, "accept");
}
/*mtx.lock();*/
wesocket_connects.insert(ws);
///*mtx.unlock();*/
for (;;)
{
beast::multi_buffer buffer;
unsigned long long result_read = ws->async_read(buffer, yield[ec]);
if (ec == websocket::error::closed)
{
cout << "websocket is closed - " << std::this_thread::get_id() << endl;
/*mtx.lock();*/
auto result = wesocket_connects.erase(ws);
cout << "erase async_read - " << result << endl;
///*mtx.unlock();*/
return;
}
if (ec)
{
///*mtx.lock();*/
auto result = wesocket_connects.erase(ws);
cout << "erase async_read alternative - " << result << endl;
///*mtx.unlock();*/
return fail(ec, "read");
}
ws->text(ws->got_text());
std::cout << boost::beast::buffers_to_string(buffer.data()) << " buffer_size - " << boost::asio::buffer_size(buffer.data()) << " thread id - " << std::this_thread::get_id() << endl;
cout<<"result_read - " << result_read << endl;
for (auto webconnect : wesocket_connects)
{
if (webconnect->is_open())
{
webconnect->async_write(buffer.data(), yield[ec]);
if (ec)
{
///*mtx.lock();*/
auto result = wesocket_connects.erase(webconnect);
cout << "player->async_write - " << result << endl;
///*mtx.unlock();*/
std::cerr << ": " << ec.message() << "\n";
}
}
else
{
/*mtx.lock();*/
auto result = wesocket_connects.erase(webconnect);
cout << "player->closed - " << result << endl;
/*mtx.unlock();*/
}
}
}
}
void
do_coro_listen(
net::io_context& ioc,
tcp::endpoint endpoint,
net::yield_context yield)
{
beast::error_code ec;
tcp::acceptor acceptor(ioc);
acceptor.open(endpoint.protocol(), ec);
if (ec)
return fail(ec, "open");
acceptor.set_option(net::socket_base::reuse_address(true), ec);
if (ec)
return fail(ec, "set_option");
acceptor.bind(endpoint, ec);
if (ec)
return fail(ec, "bind");
acceptor.listen(net::socket_base::max_listen_connections, ec);
if (ec)
return fail(ec, "listen");
for (;;)
{
tcp::socket socket(ioc);
acceptor.async_accept(socket, yield[ec]);
if (ec)
{
fail(ec, "accept");
/*continue;*/
return;
}
tcp::endpoint endpoint = socket.remote_endpoint();
boost::asio::ip::address ip_address = endpoint.address();
cout << "ip_address - " << ip_address << " - " << std::this_thread::get_id() << endl;
net::spawn(
acceptor.get_executor().context(),
std::bind(
&do_coro_session,
std::move(socket),
std::placeholders::_1));
}
}
//------------------------------------------------------------------------------
int main(int argc, char* argv[])
{
setlocale(LC_ALL, "Russian");
auto const address = net::ip::make_address("0.0.0.0");
auto const port = 1234;
auto const threads = std::max<int>(1, 8);
// The io_context is required for all I/O
net::io_context ioc{ threads };
// Create coro port
net::spawn(ioc,
std::bind(
&do_coro_listen,
std::ref(ioc),
tcp::endpoint{
address,
static_cast<unsigned short>(port) },
std::placeholders::_1));
// Run the I/O service on the requested number of threads
std::vector<std::thread> v;
v.reserve(threads - 1);
for (auto i = threads - 1; i > 0; --i)
v.emplace_back(
[&ioc]
{
ioc.run();
});
ioc.run();
return EXIT_SUCCESS;
}
Нет комментариев