Protocol definition
The communication protocol used for all communications (both cluster and API) is defined in wazuh.core.cluster.common.Handler (class Handler):
class Handler(asyncio.Protocol):
    """
    Define common methods for echo clients and servers.
    """

    def __init__(self, fernet_key: str, cluster_items: Dict, logger: logging.Logger = None, tag: str = "Handler"):
        """Class constructor.

        Parameters
        ----------
        fernet_key : str
            32 length string used as key to initialize cryptography's Fernet.
        cluster_items : dict
            Cluster.json object containing cluster internal variables.
        logger : Logger object
            Logger object to use.
        tag : str
            Log tag.
        """
        super().__init__()
        # The counter is used to identify each message. If an incoming request has a known ID,
        # it is processed as a response.
        self.counter = random.SystemRandom().randint(0, 2 ** 32 - 1)
        # The box stores all sent messages IDs.
        self.box = {}
        # The div_msg_box stores all divided messages under its IDs.
        self.div_msg_box = {}
        # Defines command length.
        self.cmd_len = 12
        # Defines header length.
        self.header_len = self.cmd_len + 8  # 4 bytes of counter and 4 bytes of message size
        # Defines header format.
        self.header_format = f'!2I{self.cmd_len}s'
        # Stores received data.
        self.in_buffer = b''
        # Stores last received message.
        self.in_msg = InBuffer()
        # Stores incoming file information from file commands.
        self.in_file = {}
        # Stores incoming string information from string commands.
        self.in_str = {}
        # Maximum message length to send in a single request.
        self.request_chunk = 5242880
        # Object use to encrypt and decrypt requests.
        self.my_fernet = cryptography.fernet.Fernet(base64.b64encode(fernet_key.encode())) if fernet_key else None
        # Logging.Logger object used to write logs.
        self.logger = logging.getLogger('wazuh') if not logger else logger
        # Logging tag.
        self.tag = tag
        # Modify filter tags with context vars.
        wazuh.core.cluster.utils.context_tag.set(self.tag)
        self.cluster_items = cluster_items
        # Transports in asyncio are an abstraction of sockets.
        self.transport = None
        ......
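To make the counter/box bookkeeping described in the comments above more concrete, here is a minimal, self-contained sketch of the idea: every outgoing message gets the current counter value as its ID and that ID is recorded, so that an incoming frame carrying a known ID is treated as a response rather than as a new request. This is an illustration only, not the actual Wazuh implementation; the class and method names below are made up.

import random

class PairingSketch:
    """Illustrative only: simplified request/response pairing by message ID."""

    def __init__(self):
        # Start from a random ID, as Handler.__init__ does.
        self.counter = random.SystemRandom().randint(0, 2 ** 32 - 1)
        self.box = {}  # message ID -> response payload (None while pending)

    def next_counter(self) -> int:
        # Wrap around like a 4-byte unsigned counter.
        self.counter = (self.counter + 1) % (2 ** 32)
        return self.counter

    def send(self, command: bytes, payload: bytes) -> int:
        msg_id = self.next_counter()
        self.box[msg_id] = None  # remember the ID so the reply can be matched
        # ... the framed message would be written to the transport here ...
        return msg_id

    def on_frame(self, msg_id: int, payload: bytes) -> None:
        if msg_id in self.box:   # known ID: this frame is a response
            self.box[msg_id] = payload
        else:                    # unknown ID: this frame is a new request
            print(f"new request {msg_id}: {payload!r}")

sketch = PairingSketch()
rid = sketch.send(b'echo-c', b'keepalive')
sketch.on_frame(rid, b'ok')
print(sketch.box[rid])  # b'ok' -- matched as the response to the request above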
The message structure in the protocol is as follows:

counter: the message ID used to identify each message; the response to a message must carry the same ID back.
payload length: the length of the message payload, so the receiver knows how many bytes to read.
command: 11 characters describing the command type; commands shorter than 11 characters are padded with '-' up to 11 bytes.
divide_flag: division marker. Since the maximum payload length is 5242880 bytes, a payload may be split into several messages; the flag is 'b' for a divided part and '-' when this is the last part of a divided message or a standalone message.
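Based on the header format defined in the Handler above (header_format = '!2I12s': a 4-byte counter, a 4-byte payload length and a 12-byte command field), a frame can be packed and parsed roughly as shown below. This is a hedged sketch, not the actual Wazuh framing code; the '-' padding and the 'b'/'-' divide flag follow the description above.

import struct

CMD_LEN = 12                      # matches self.cmd_len in Handler
HEADER_FORMAT = f'!2I{CMD_LEN}s'  # counter, payload length, command (with divide flag)

def build_frame(counter: int, command: bytes, payload: bytes, divided: bool = False) -> bytes:
    # Pad the command with '-' up to 11 bytes; the 12th byte is the divide flag:
    # 'b' for a divided part, '-' for the last part or for an undivided message.
    flag = b'b' if divided else b'-'
    cmd_field = command.ljust(CMD_LEN - 1, b'-') + flag
    return struct.pack(HEADER_FORMAT, counter, len(payload), cmd_field) + payload

def parse_header(frame: bytes):
    header_size = struct.calcsize(HEADER_FORMAT)
    counter, payload_len, cmd_field = struct.unpack(HEADER_FORMAT, frame[:header_size])
    command = cmd_field[:-1].rstrip(b'-')
    divide_flag = cmd_field[-1:]
    return counter, payload_len, command, divide_flag

frame = build_frame(1234, b'echo-c', b'keepalive')
print(parse_header(frame))  # (1234, 9, b'echo-c', b'-')

A payload larger than request_chunk (5242880 bytes) would be sent as several such frames, all but the last one carrying the 'b' flag, as described above.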
Wazuh cluster protocol
All cluster nodes use this communication protocol to synchronize the information needed to receive reports from agents. All communication goes over TCP. These commands are defined in
wazuh.core.cluster.master.MasterHandler.process_request
and
wazuh.core.cluster.worker.WorkerHandler.process_request.
wazuh.core.cluster.master.MasterHandler.process_request
    def process_request(self, command: bytes, data: bytes) -> Tuple[bytes, bytes]:
        """Define all available commands that can be received from a worker node.

        Parameters
        ----------
        command : bytes
            Received command.
        data : bytes
            Received payload.

        Returns
        -------
        bytes
            Result.
        bytes
            Response message.
        """
        self.logger.debug(f"Command received: {command}")
        if command == b'syn_i_w_m_p' or command == b'syn_e_w_m_p' or command == b'syn_a_w_m_p':
            return self.get_permission(command)
        elif command == b'syn_i_w_m' or command == b'syn_e_w_m' or command == b'syn_a_w_m':
            return self.setup_sync_integrity(command, data)
        elif command == b'syn_i_w_m_e' or command == b'syn_e_w_m_e':
            return self.end_receiving_integrity_checksums(data.decode())
        elif command == b'syn_i_w_m_r' or command == b'syn_e_w_m_r':
            return self.process_sync_error_from_worker(command, data)
        elif command == b'dapi':
            self.server.dapi.add_request(self.name.encode() + b'*' + data)
            return b'ok', b'Added request to API requests queue'
        elif command == b'dapi_res':
            return self.process_dapi_res(data)
        elif command == b'dapi_err':
            dapi_client, error_msg = data.split(b' ', 1)
            asyncio.create_task(self.server.local_server.clients[dapi_client.decode()].send_request(command, error_msg))
            return b'ok', b'DAPI error forwarded to worker'
        elif command == b'get_nodes':
            cmd, res = self.get_nodes(json.loads(data))
            return cmd, json.dumps(res).encode()
        elif command == b'get_health':
            cmd, res = self.get_health(json.loads(data))
            return cmd, json.dumps(res,
                                   default=lambda o: "n/a" if isinstance(o, datetime) and o == datetime.fromtimestamp(0)
                                   else (o.__str__() if isinstance(o, datetime) else None)
                                   ).encode()
        elif command == b'sendsync':
            self.server.sendsync.add_request(self.name.encode() + b'*' + data)
            return b'ok', b'Added request to SendSync requests queue'
        else:
            return super().process_request(command, data)
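Note how the dapi and sendsync branches above prepend the worker's name and a '*' separator before queueing the request (self.name.encode() + b'*' + data), so the receiving side knows which node sent or forwarded the request. The following sketch only illustrates that chaining idea; the helper names are not part of Wazuh, and it assumes the payload itself contains no '*':

def forward(sender_name: bytes, payload: bytes) -> bytes:
    # Prepend the forwarding node's name, as the master does for 'dapi'/'sendsync'.
    return sender_name + b'*' + payload

def split_route(data: bytes):
    # Recover the chain of sender names and the original payload.
    *senders, payload = data.split(b'*')
    return senders, payload

msg = forward(b'master', forward(b'worker1', b'{"f": "agents"}'))
print(split_route(msg))  # ([b'master', b'worker1'], b'{"f": "agents"}')

This matches the dapi description in the command reference below: when an API call has been forwarded several times, the sender field accumulates one name per hop.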
wazuh.core.cluster.worker.WorkerHandler.process_request
    def process_request(self, command: bytes, data: bytes) -> Union[bytes, Tuple[bytes, bytes]]:
        """Define all commands that a worker can receive from the master.

        Parameters
        ----------
        command : bytes
            Received command.
        data : bytes
            Received payload.

        Returns
        -------
        bytes
            Result.
        bytes
            Response message.
        """
        self.logger.debug(f"Command received: '{command}'")
        if command == b'syn_m_c_ok':
            return self.sync_integrity_ok_from_master()
        elif command == b'syn_m_c':
            return self.setup_receive_files_from_master()
        elif command == b'syn_m_c_e':
            return self.end_receiving_integrity(data.decode())
        elif command == b'syn_m_c_r':
            return self.error_receiving_integrity(data.decode())
        elif command == b'syn_m_a_e':
            return self.sync_agent_info_from_master(data.decode())
        elif command == b'syn_m_a_err':
            return self.error_receiving_agent_info(data.decode())
        elif command == b'dapi_res':
            asyncio.create_task(self.forward_dapi_response(data))
            return b'ok', b'Response forwarded to worker'
        elif command == b'sendsyn_res':
            asyncio.create_task(self.forward_sendsync_response(data))
            return b'ok', b'Response forwarded to worker'
        elif command == b'dapi_err':
            dapi_client, error_msg = data.split(b' ', 1)
            try:
                asyncio.create_task(
                    self.manager.local_server.clients[dapi_client.decode()].send_request(command, error_msg))
            except WazuhClusterError as e:
                raise WazuhClusterError(3025)
            return b'ok', b'DAPI error forwarded to worker'
        elif command == b'sendsyn_err':
            sendsync_client, error_msg = data.split(b' ', 1)
            try:
                asyncio.create_task(
                    self.manager.local_server.clients[sendsync_client.decode()].send_request(b'err', error_msg))
            except WazuhClusterError as e:
                raise WazuhClusterError(3025)
            return b'ok', b'SendSync error forwarded to worker'
        elif command == b'dapi':
            self.manager.dapi.add_request(b'master*' + data)
            return b'ok', b'Added request to API requests queue'
        else:
            return super().process_request(command, data)
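The dapi_err and sendsyn_err branches above expect the payload to start with the name of the local client that must receive the error, separated from the error message by a single space (data.split(b' ', 1)). A minimal sketch of that framing, with a hypothetical client name and message (not taken from Wazuh):

def build_error_payload(client_name: str, error_msg: bytes) -> bytes:
    # Client name first, one space, then the error message (which may contain spaces).
    return client_name.encode() + b' ' + error_msg

def parse_error_payload(data: bytes):
    client, error_msg = data.split(b' ', 1)
    return client.decode(), error_msg

payload = build_error_payload('client-1', b'example error message from the API')
print(parse_error_payload(payload))  # ('client-1', b'example error message from the API')

The forwarding itself is scheduled with asyncio.create_task, so process_request can return b'ok' immediately instead of waiting for the local client to receive the error.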
Command reference

| Message | Received in | Arguments | Description |
|---------|-------------|-----------|-------------|
| hello | Master | | First message sent by a worker to the master on its first connection. |
| syn_i_w_m_p, syn_e_w_m_p, syn_a_w_m_p | Master | None | A worker asks the master for permission to start a synchronization (handled by get_permission in the code above). |
| syn_i_w_m, syn_e_w_m, syn_a_w_m | Master | | A worker starts a synchronization and the master sets up the reception task (handled by setup_sync_integrity). |
| syn_i_w_m_e, syn_e_w_m_e | Master | None | A worker notifies the master that the synchronization file has been sent (handled by end_receiving_integrity_checksums). |
| syn_i_w_m_r, syn_e_w_m_r | Master | None | A worker reports an error while sending the synchronization file (handled by process_sync_error_from_worker). |
| sendsync | Master | | Receive a message from a worker node destined for the specified daemon of the master node. |
| sendsyn_res | Worker | | Notify that the response is available. |
| | Both | | Notify errors in the communication. |
| get_nodes | Master | | Request sent from worker nodes. |
| get_health | Master | | Request sent from worker nodes. |
| | Master | | Receive an API call related to cluster information: get nodes information or healthcheck. |
| dapi | Both | | Receive a distributed API request. If the API call has been forwarded multiple times, the sender node contains multiple names separated by the '*' character. |
| dapi_res | Both | String ID | Receive a distributed API response from a previously forwarded request. Responses are sent using the send-long-strings protocol, so this request only needs the string ID. |
| dapi_err | Both | | Receive an error related to a previously requested distributed API request. |
| syn_m_c_ok | Worker | None | Master verifies that worker integrity is correct. |
| syn_m_c | Worker | None | Master will send the worker integrity files to update. |
| syn_m_c_e | Worker | Task name and filename | Master has finished sending integrity files. The files were received in the task (Task name) previously created by the worker when handling syn_m_c. If the master had issues sending/processing/receiving worker integrity, an error message is sent instead of the task name and filename. |
| syn_m_a_e | Worker | | Master has finished updating agent-info. The number of updated chunks and chunks with errors (if any) will be sent. |