應用層實作 TCP/IP 讀入流速控制:與在 L2/L3 層透過對某個 IP 或 MAC 位址進行丟包來控制速度不同,應用層是在讀入的位元組數達到 Peak(峰值;程式碼中拼寫為 Peek,例如 IsPeek)時掛起 TCP/IP 讀入流的動作,等到下一秒再恢復執行 TCP/IP 讀入流的動作。
QoS.h
#pragma once
#include <My/Environment.h>
#include <My/Net/IPEndPoint.h>
#include <My/Threading/Timer.h>
#include <My/asio/ip/udp/socket.h>
namespace My {
namespace Vpn {
class QoS final : public std::enable_shared_from_this<QoS> {
public:
typedef std::recursive_mutex Mutex;
typedef std::lock_guard<Mutex> MutexScope;
typedef std::function<void(int sz)> ReceiveCallback;
typedef std::function<void(int sz, boost::asio::ip::udp::endpoint&)> ReceiveFromCallback;
struct ReceiveTransaction {
public:
std::shared_ptr<boost::asio::ip::tcp::socket> socket;
unsigned char* buffer;
int offset;
int length;
bool demand;
std::shared_ptr<ReceiveCallback> callback;
public:
typedef std::shared_ptr<ReceiveTransaction> Ptr;
};
struct ReceiveFromTransaction {
public:
std::shared_ptr<My::asio::ip::udp::socket> socket;
unsigned char* buffer;
int offset;
int length;
std::shared_ptr<ReceiveFromCallback> callback;
boost::asio::ip::udp::endpoint* sourceEP;
public:
typedef std::shared_ptr<ReceiveFromTransaction> Ptr;
};
typedef std::unordered_map<boost::asio::ip::tcp::socket*, ReceiveTransaction::Ptr > ReceiveTransactionTable;
typedef std::unordered_map<My::asio::ip::udp::socket*, ReceiveFromTransaction::Ptr> ReceiveFromTransactionTable;
public:
QoS(const std::shared_ptr<Hosting>& hosting, int bandwidth) noexcept;
~QoS() noexcept;
public:
inline int GetBandwidth() noexcept { return this->_bandwidth; }
inline void SetBandwidth(int bandwidth) noexcept {
this->_bandwidth = bandwidth < 1 ? 0 : bandwidth; // ReLU
}
inline std::shared_ptr<Hosting> GetHosting() noexcept { return this->_hosting; }
inline bool IsDisposed() noexcept { return this->_disposed; }
inline std::shared_ptr<QoS> GetPtr() noexcept { return this->shared_from_this(); }
public:
bool Initialize() noexcept;
void Dispose() noexcept;
bool Receive(
const std::shared_ptr<boost::asio::ip::tcp::socket>& socket,
const void* buffer,
int offset,
int length,
bool demand,
const std::shared_ptr<ReceiveCallback>& callback) noexcept;
bool ReceiveFrom(
const std::shared_ptr<My::asio::ip::udp::socket>& socket,
const void* buffer,
int offset,
int length,
boost::asio::ip::udp::endpoint& sourceEP,
const std::shared_ptr<ReceiveFromCallback>& callback) noexcept;
private:
void Update() noexcept;
bool IsPeek() noexcept;
bool ReceiveImpl(
const std::shared_ptr<boost::asio::ip::tcp::socket>& socket,
const void* buffer,
int offset,
int length,
const std::shared_ptr<ReceiveTransaction>& receive) noexcept;
bool ReceiveFromImpl(
const std::shared_ptr<My::asio::ip::udp::socket>& socket,
const void* buffer,
int offset,
int length,
boost::asio::ip::udp::endpoint& sourceEP,
const std::shared_ptr<ReceiveFromTransaction>& receive) noexcept;
private:
Mutex _syncobj;
int _bandwidth;
UInt64 _per_seconds;
std::shared_ptr<Hosting> _hosting;
std::atomic<bool> _disposed;
std::atomic<int> _current_traffic;
ReceiveTransactionTable _recv_tcps;
ReceiveFromTransactionTable _recv_iips;
std::shared_ptr<My::Threading::Timer> _work_timer;
};
}
}
QoS.cpp
#include <My/Environment.h>
#include <My/Vpn/QoS.h>
#include <My/Net/Socket.h>
using My::Threading::Timer;
namespace My {
namespace Vpn {
// Binds the limiter to `hosting` and stores the bandwidth limit; any value
// below 1 is stored as 0. No timer is created here (see Initialize()).
QoS::QoS(const std::shared_ptr<Hosting>& hosting, int bandwidth) noexcept
    : _bandwidth(bandwidth < 1 ? 0 : bandwidth)
    , _per_seconds(0)
    , _hosting(hosting)
    , _disposed(false)
    , _current_traffic(0) {
}
// Destruction performs the same teardown as an explicit Dispose() call;
// Dispose() drains the parked requests only once, so a prior call is harmless.
QoS::~QoS() noexcept {
    Dispose();
}
void QoS::Dispose() noexcept {
std::vector<std::shared_ptr<ReceiveFromTransaction> > recv_iips;
std::vector<std::shared_ptr<ReceiveTransaction> > recv_tcps;
if (!this->_disposed.exchange(true)) {
MutexScope scope(this->_syncobj);
ReceiveTransactionTable::iterator recv_tcps_tail = this->_recv_tcps.begin();
ReceiveTransactionTable::iterator recv_tcps_endl = this->_recv_tcps.end();
for (; recv_tcps_tail != recv_tcps_endl; recv_tcps_tail++) {
recv_tcps.push_back(std::move(recv_tcps_tail->second));
}
ReceiveFromTransactionTable::iterator recv_iips_tail = this->_recv_iips.begin();
ReceiveFromTransactionTable::iterator recv_iips_endl = this->_recv_iips.end();
for (; recv_iips_tail != recv_iips_endl; recv_iips_tail++) {
recv_iips.push_back(std::move(recv_iips_tail->second));
}
this->_recv_tcps.clear();
this->_recv_iips.clear();
}
for (size_t i = 0, count = recv_tcps.size(); i < count; i++) {
const std::shared_ptr<ReceiveTransaction>& r = recv_tcps[i];
if (NULL != r) {
(*r->callback)(-1);
}
}
for (size_t i = 0, count = recv_iips.size(); i < count; i++) {
const std::shared_ptr<ReceiveFromTransaction>& r = recv_iips[i];
if (NULL != r) {
(*r->callback)(-1, *r->sourceEP);
}
}
}
// Creates and starts the timer that drives Update().
//
// Returns false if the object is disposed or a timer already exists.
// Returns true without creating a timer when the bandwidth is below 1
// (throttling disabled). Otherwise records the current tick count as the
// start of the accounting window, installs a tick handler that calls
// Update(), and starts the timer with an interval of 10
// (presumably milliseconds — TODO confirm against Timer::SetInterval).
//
// The handler captures a shared_ptr to this object, so the timer keeps the
// QoS alive for as long as the handler exists.
bool QoS::Initialize() noexcept {
    MutexScope scope(this->_syncobj);
    if (this->IsDisposed() || NULL != this->_work_timer) {
        return false;
    }

    if (this->_bandwidth < 1) {
        return true;
    }
    this->_per_seconds = Hosting::GetTickCount();

    std::shared_ptr<QoS> self = this->GetPtr();
    std::shared_ptr<Timer> timer = make_shared_object<Timer>(this->_hosting);
    timer->TickEvent = make_shared_object<Timer::TickEventHandler>(
        [self, this](Timer* sender, Timer::TickEventArgs& e) noexcept {
            this->Update();
        });
    timer->SetInterval(10);
    timer->Start();

    this->_work_timer = std::move(timer);
    return true;
}
// Reports whether the traffic counted since the last reset has reached the
// configured bandwidth. Always false when the bandwidth is below 1.
bool QoS::IsPeek() noexcept {
    const int limit = this->_bandwidth;
    return limit >= 1 && this->_current_traffic.load() >= limit;
}
void QoS::Update() noexcept {
MutexScope scope(this->_syncobj);
do {
UInt64 now = Hosting::GetTickCount();
if ((this->_per_seconds / 1000) == (now / 1000)) {
return;
}
this->_per_seconds = now;
} while (0);
this->_current_traffic = 0;
for (; ;) {
ReceiveTransactionTable::iterator tail = this->_recv_tcps.begin();
ReceiveTransactionTable::iterator endl = this->_recv_tcps.end();
if (tail == endl) {
break;
}
std::shared_ptr<ReceiveTransaction> r = tail->second;
this->_recv_tcps.erase(tail);
if (!this->ReceiveImpl(r->socket, r->buffer, r->offset, r->length, r)) {
(*r->callback)(0);
}
}
this->_recv_tcps.clear();
for (; ;) {
ReceiveFromTransactionTable::iterator tail = this->_recv_iips.begin();
ReceiveFromTransactionTable::iterator endl = this->_recv_iips.end();
if (tail == endl) {
break;
}
std::shared_ptr<ReceiveFromTransaction> r = tail->second;
this->_recv_iips.erase(tail);
if (!this->ReceiveFromImpl(r->socket, r->buffer, r->offset, r->length, *r->sourceEP, r)) {
(*r->callback)(0, *r->sourceEP);
}
}
this->_recv_iips.clear();
}
// Starts the asynchronous TCP read described by `receive`.
//
// Returns false, without invoking the callback, when the socket is not open;
// otherwise the read is started and true is returned. When `receive->demand`
// is set the read goes through boost::asio::async_read, otherwise through
// async_read_some. On completion the callback receives the byte count, or -1
// if the operation reported an error; positive counts are added to
// _current_traffic. The handler holds a shared_ptr to this object and to the
// transaction for the duration of the operation.
bool QoS::ReceiveImpl(
    const std::shared_ptr<boost::asio::ip::tcp::socket>& socket,
    const void* buffer,
    int offset,
    int length,
    const std::shared_ptr<ReceiveTransaction>& receive) noexcept {
    if (!socket->is_open()) {
        return false;
    }

    std::shared_ptr<QoS> self = this->GetPtr();
    std::shared_ptr<ReceiveTransaction> transaction = receive;
    auto on_complete = [self, this, transaction](const boost::system::error_code& ec, UInt32 sz) noexcept {
        const int by = ec ? -1 : std::max<int>(-1, static_cast<int>(sz));
        if (by > 0) {
            this->_current_traffic += by;
        }
        (*transaction->callback)(by);
    };

    char* const destination = (char*)buffer + offset;
    if (transaction->demand) {
        boost::asio::async_read(*socket, boost::asio::buffer(destination, length), std::move(on_complete));
    }
    else {
        socket->async_read_some(boost::asio::buffer(destination, length), std::move(on_complete));
    }
    return true;
}
// Requests an asynchronous TCP read into buffer[offset, offset + length).
//
// Parameters:
//   socket    open TCP socket to read from
//   buffer    caller-owned destination; may be NULL only if offset and length are both 0
//   offset    non-negative byte offset into buffer
//   length    non-negative maximum number of bytes to read
//   demand    true to read through boost::asio::async_read, false for async_read_some
//   callback  invoked once with the byte count, -1 on error/dispose, or 0 if a
//             parked request could not be re-issued
//
// Returns true when the read was started or parked for a later Update();
// false when the object is disposed, an argument is invalid, the socket is
// not open, or the socket already has a parked request. The callback is not
// invoked when false is returned.
//
// With throttling disabled (bandwidth < 1) the read is always started
// immediately. Otherwise it is parked while IsPeek() reports that the limit
// has been reached.
bool QoS::Receive(
    const std::shared_ptr<boost::asio::ip::tcp::socket>& socket,
    const void* buffer,
    int offset,
    int length,
    bool demand,
    const std::shared_ptr<ReceiveCallback>& callback) noexcept {
    if (this->IsDisposed()) {
        return false;
    }

    if (socket == NULL || callback == NULL || !socket->is_open()) {
        return false;
    }

    // Validate the destination range before ANY path issues a read: a negative
    // length or a NULL buffer must never reach boost::asio::buffer(), including
    // on the unlimited-bandwidth path.
    if (buffer == NULL && (offset != 0 || length != 0)) {
        return false;
    }

    if (offset < 0 || length < 0) {
        return false;
    }

    std::shared_ptr<ReceiveTransaction> r = make_shared_object<ReceiveTransaction>();
    r->socket = socket;
    r->buffer = static_cast<unsigned char*>(const_cast<void*>(buffer));
    r->offset = offset;
    r->length = length;
    r->callback = callback;
    r->demand = demand;

    if (this->_bandwidth >= 1) {
        MutexScope scope(this->_syncobj);

        // Dispose() sets the flag before it takes the lock and drains the
        // table; re-checking here guarantees nothing is parked after that
        // drain and left without a callback.
        if (this->IsDisposed()) {
            return false;
        }

        // Only one parked request per socket.
        if (this->_recv_tcps.find(socket.get()) != this->_recv_tcps.end()) {
            return false;
        }

        if (this->IsPeek()) {
            this->_recv_tcps[socket.get()] = r;
            return true;
        }
    }

    // Issued outside the lock.
    return this->ReceiveImpl(socket, buffer, offset, length, r);
}
// Starts the asynchronous UDP read described by `receive`.
//
// Returns false, without invoking the callback, when the socket is not open;
// otherwise async_receive_from is started with `sourceEP` as the sender
// endpoint output and true is returned. On completion the callback receives
// the byte count, or -1 if the operation reported an error, together with
// the endpoint stored in the transaction; positive counts are added to
// _current_traffic. The handler holds a shared_ptr to this object and to the
// transaction for the duration of the operation.
bool QoS::ReceiveFromImpl(
    const std::shared_ptr<My::asio::ip::udp::socket>& socket,
    const void* buffer,
    int offset,
    int length,
    boost::asio::ip::udp::endpoint& sourceEP,
    const std::shared_ptr<ReceiveFromTransaction>& receive) noexcept {
    if (!socket->is_open()) {
        return false;
    }

    std::shared_ptr<QoS> self = this->GetPtr();
    std::shared_ptr<ReceiveFromTransaction> transaction = receive;
    auto on_complete = [self, this, transaction](const boost::system::error_code& ec, UInt32 sz) noexcept {
        const int by = ec ? -1 : std::max<int>(-1, static_cast<int>(sz));
        if (by > 0) {
            this->_current_traffic += by;
        }
        (*transaction->callback)(by, *transaction->sourceEP);
    };

    char* const destination = (char*)buffer + offset;
    socket->async_receive_from(boost::asio::buffer(destination, length), sourceEP, std::move(on_complete));
    return true;
}
// Requests an asynchronous UDP read into buffer[offset, offset + length).
//
// Parameters:
//   socket    open UDP socket to read from
//   buffer    caller-owned destination; may be NULL only if offset and length are both 0
//   offset    non-negative byte offset into buffer
//   length    non-negative maximum number of bytes to read
//   sourceEP  receives the sender address; stored by address, so it must stay
//             valid until the callback has been invoked
//   callback  invoked once with the byte count, -1 on error/dispose, or 0 if a
//             parked request could not be re-issued, plus the endpoint
//
// Returns true when the read was started or parked for a later Update();
// false when the object is disposed, an argument is invalid, the socket is
// not open, or the socket already has a parked request. The callback is not
// invoked when false is returned.
//
// With throttling disabled (bandwidth < 1) the read is always started
// immediately. Otherwise it is parked while IsPeek() reports that the limit
// has been reached.
bool QoS::ReceiveFrom(
    const std::shared_ptr<My::asio::ip::udp::socket>& socket,
    const void* buffer,
    int offset,
    int length,
    boost::asio::ip::udp::endpoint& sourceEP,
    const std::shared_ptr<ReceiveFromCallback>& callback) noexcept {
    if (this->IsDisposed()) {
        return false;
    }

    if (socket == NULL || callback == NULL || !socket->is_open()) {
        return false;
    }

    // Validate the destination range before ANY path issues a read: a negative
    // length or a NULL buffer must never reach boost::asio::buffer(), including
    // on the unlimited-bandwidth path.
    if (buffer == NULL && (offset != 0 || length != 0)) {
        return false;
    }

    if (offset < 0 || length < 0) {
        return false;
    }

    std::shared_ptr<ReceiveFromTransaction> r = make_shared_object<ReceiveFromTransaction>();
    r->socket = socket;
    r->buffer = static_cast<unsigned char*>(const_cast<void*>(buffer));
    r->offset = offset;
    r->length = length;
    r->callback = callback;
    r->sourceEP = std::addressof(sourceEP);

    if (this->_bandwidth >= 1) {
        MutexScope scope(this->_syncobj);

        // Dispose() sets the flag before it takes the lock and drains the
        // table; re-checking here guarantees nothing is parked after that
        // drain and left without a callback.
        if (this->IsDisposed()) {
            return false;
        }

        // Only one parked request per socket.
        if (this->_recv_iips.find(socket.get()) != this->_recv_iips.end()) {
            return false;
        }

        if (this->IsPeek()) {
            this->_recv_iips[socket.get()] = r;
            return true;
        }
    }

    // Issued outside the lock.
    return this->ReceiveFromImpl(socket, buffer, offset, length, sourceEP, r);
}
}
}