天天看點

wazuh叢集通信

協定定義

通信協定使用在所有通信中(包括叢集和api)被定在​

​wazuh.core.cluster.common.Handler(class Handler)​

class Handler(asyncio.Protocol):
    """
    Define common methods for echo clients and servers.
    """

    def __init__(self, fernet_key: str, cluster_items: Dict, logger: logging.Logger = None, tag: str = "Handler"):
        """Class constructor.

        Parameters
        ----------
        fernet_key : str
            32 length string used as key to initialize cryptography's Fernet.
        cluster_items : dict
            Cluster.json object containing cluster internal variables.
        logger : Logger object
            Logger object to use.
        tag : str
            Log tag.
        """
        super().__init__()
        # The counter is used to identify each message. If an incoming request has a known ID,
        # it is processed as a response.
        self.counter = random.SystemRandom().randint(0, 2 ** 32 - 1)
        # The box stores all sent messages IDs.
        self.box = {}
        # The div_msg_box stores all divided messages under its IDs.
        self.div_msg_box = {}
        # Defines command length.
        self.cmd_len = 12
        # Defines header length.
        self.header_len = self.cmd_len + 8  # 4 bytes of counter and 4 bytes of message size
        # Defines header format.
        self.header_format = f'!2I{self.cmd_len}s'
        # Stores received data.
        self.in_buffer = b''
        # Stores last received message.
        self.in_msg = InBuffer()
        # Stores incoming file information from file commands.
        self.in_file = {}
        # Stores incoming string information from string commands.
        self.in_str = {}
        # Maximum message length to send in a single request.
        self.request_chunk = 5242880
        # Object use to encrypt and decrypt requests.
        self.my_fernet = cryptography.fernet.Fernet(base64.b64encode(fernet_key.encode())) if fernet_key else None
        # Logging.Logger object used to write logs.
        self.logger = logging.getLogger('wazuh') if not logger else logger
        # Logging tag.
        self.tag = tag
        # Modify filter tags with context vars.
        wazuh.core.cluster.utils.context_tag.set(self.tag)
        self.cluster_items = cluster_items
        # Transports in asyncio are an abstraction of sockets.
        self.transport = None


        ......      

協定中消息結構如下: 

wazuh叢集通信

counter 是用于辨別消息的消息ID,對于這個消息的響應消息需要帶回這個消息ID

payload length 消息載荷長度,用于知道要接收的載荷長度

command 長度是11個字元,描述這個指令類型,不足11個字元的通過-補齊11個位元組

divide_flag 分割辨別,因為最大載荷長度是5242880個位元組,如果載荷被分割了則divide_flag辨別為b,如果這是被分割的消息的最後一部分或者是一個單獨的消息則為-

Wazuh 叢集協定

所有叢集節點使用該通信協定同步必要的資訊,以接收來自agents的報告。所有通信協定通過tcp。這些指令定義在 ​

​wazuh.core.cluster.master.MasterHandler.process_request​

​​ 和 ​

​wazuh.core.cluster.worker.WorkerHandler.process_request。​

 ​

​wazuh.core.cluster.master.MasterHandler.process_request​

def process_request(self, command: bytes, data: bytes) -> Tuple[bytes, bytes]:
        """Define all available commands that can be received from a worker node.

        Parameters
        ----------
        command : bytes
            Received command.
        data : bytes
            Received payload.

        Returns
        -------
        bytes
            Result.
        bytes
            Response message.
        """
        self.logger.debug(f"Command received: {command}")
        if command == b'syn_i_w_m_p' or command == b'syn_e_w_m_p' or command == b'syn_a_w_m_p':
            return self.get_permission(command)
        elif command == b'syn_i_w_m' or command == b'syn_e_w_m' or command == b'syn_a_w_m':
            return self.setup_sync_integrity(command, data)
        elif command == b'syn_i_w_m_e' or command == b'syn_e_w_m_e':
            return self.end_receiving_integrity_checksums(data.decode())
        elif command == b'syn_i_w_m_r' or command == b'syn_e_w_m_r':
            return self.process_sync_error_from_worker(command, data)
        elif command == b'dapi':
            self.server.dapi.add_request(self.name.encode() + b'*' + data)
            return b'ok', b'Added request to API requests queue'
        elif command == b'dapi_res':
            return self.process_dapi_res(data)
        elif command == b'dapi_err':
            dapi_client, error_msg = data.split(b' ', 1)
            asyncio.create_task(self.server.local_server.clients[dapi_client.decode()].send_request(command, error_msg))
            return b'ok', b'DAPI error forwarded to worker'
        elif command == b'get_nodes':
            cmd, res = self.get_nodes(json.loads(data))
            return cmd, json.dumps(res).encode()
        elif command == b'get_health':
            cmd, res = self.get_health(json.loads(data))
            return cmd, json.dumps(res,
                                   default=lambda o: "n/a" if isinstance(o, datetime) and o == datetime.fromtimestamp(0)
                                   else (o.__str__() if isinstance(o, datetime) else None)
                                   ).encode()
        elif command == b'sendsync':
            self.server.sendsync.add_request(self.name.encode() + b'*' + data)
            return b'ok', b'Added request to SendSync requests queue'
        else:
            return super().process_request(command, data)      

​wazuh.core.cluster.worker.WorkerHandler.process_request​

def process_request(self, command: bytes, data: bytes) -> Union[bytes, Tuple[bytes, bytes]]:
        """Define all commands that a worker can receive from the master.

        Parameters
        ----------
        command : bytes
            Received command.
        data : bytes
            Received payload.

        Returns
        -------
        bytes
            Result.
        bytes
            Response message.
        """
        self.logger.debug(f"Command received: '{command}'")
        if command == b'syn_m_c_ok':
            return self.sync_integrity_ok_from_master()
        elif command == b'syn_m_c':
            return self.setup_receive_files_from_master()
        elif command == b'syn_m_c_e':
            return self.end_receiving_integrity(data.decode())
        elif command == b'syn_m_c_r':
            return self.error_receiving_integrity(data.decode())
        elif command == b'syn_m_a_e':
            return self.sync_agent_info_from_master(data.decode())
        elif command == b'syn_m_a_err':
            return self.error_receiving_agent_info(data.decode())
        elif command == b'dapi_res':
            asyncio.create_task(self.forward_dapi_response(data))
            return b'ok', b'Response forwarded to worker'
        elif command == b'sendsyn_res':
            asyncio.create_task(self.forward_sendsync_response(data))
            return b'ok', b'Response forwarded to worker'
        elif command == b'dapi_err':
            dapi_client, error_msg = data.split(b' ', 1)
            try:
                asyncio.create_task(
                    self.manager.local_server.clients[dapi_client.decode()].send_request(command, error_msg))
            except WazuhClusterError as e:
                raise WazuhClusterError(3025)
            return b'ok', b'DAPI error forwarded to worker'
        elif command == b'sendsyn_err':
            sendsync_client, error_msg = data.split(b' ', 1)
            try:
                asyncio.create_task(
                    self.manager.local_server.clients[sendsync_client.decode()].send_request(b'err', error_msg))
            except WazuhClusterError as e:
                raise WazuhClusterError(3025)
            return b'ok', b'SendSync error forwarded to worker'
        elif command == b'dapi':
            self.manager.dapi.add_request(b'master*' + data)
            return b'ok', b'Added request to API requests queue'
        else:
            return super().process_request(command, data)      

指令說明

Message Received in Arguments Description

​hello​

Master
  • Node name<str>,
  • Cluster name<str>,
  • Node type<str>,
  • Wazuh version<str>
First message sent by a worker to the master on its first connection.

​syn_i_w_m_p​

​,

​syn_a_w_m_p​

Master None
  • Ask permission to start synchronization protocol. Message characters define the action to do:
  • I (integrity), A (agent-info).
  • W (worker), M (master), P (permission).

​syn_i_w_m​

​​, ​

​syn_e_w_m​

​​, ​

​syn_a_w_m​

Master
  • None or String ID<str>
  • Start synchronization protocol. Message characters define the action to do:
  • I (integrity), E (extra valid), A (agent-info).
  • W (worker), M (master).

​syn_i_w_m_e​

​​, ​

​syn_e_w_m_e​

Master None
  • End synchronization protocol. Message characters define the action to do:
  • I (integrity), E (extra valid).
  • W (worker), M (master), E(end).

​syn_i_w_m_r​

​,
Master None
  • Notify an error during synchronization. Message characters define the action to do:
  • I (integrity).
  • W (worker), M (master), R(error).

​sendsync​

Master
  • Arguments<Dict>
Receive a message from a worker node destined for the specified daemon of the master node.

​sendsyn_res​

Worker
  • Request ID<str>
  • String ID<str>
Notify the ​

​sendsync​

​ response is available.

​sendsyn_err​

Both
  • Local client ID<str>
  • Error message<str>
Notify errors in the ​

​sendsync​

​ communication.

​get_nodes​

Master
  • Arguments<Dict>
Request sent from ​

​cluster_control -l​

​ from worker nodes.

​get_health​

Master
  • Arguments<Dict>
Request sent from ​

​cluster_control -i​

​ from worker nodes.

​dapi_clus​

Master
  • Arguments<Dict>
Receive an API call related to cluster information: Get nodes information or healthcheck.

​dapi​

Both
  • Sender node<str>
  • Arguments<Dict>
Receive a distributed API request. If the API call has been forwarded multiple times, the sender node contains multiple names separated by a ​

​*​

​ character.

​dapi_res​

Both
  • Request ID<str>
  • String ID<str>
Receive a distributed API response from a previously forwarded request. Responses are sent using send long strings protocol so this request only needs the string ID.

​dapi_err​

Both
  • Local client ID<str>
  • Error message<str>
Receive an error related to a previously requested distributed API request.

​syn_m_c_ok​

Worker None Master verifies that worker integrity is correct.

​syn_m_c​

Worker None Master will send the worker integrity files to update.

​syn_m_c_e​

Worker
  • Error msg<str> or Task name<str>
  • Filename<str>
Master has finished sending integrity files. The files were received in task Task name previously created by the worker in ​

​syn_m_c​

​. If master had issues sending/processing/receiving worker integrity an error message will be sent instead of the task name and filename.

​syn_m_a_e​

Worker
  • Arguments<Dict>
Master has finished updating agent-info. Number of updated chunks and chunks with errors (if any) will be sent.
  • Error msg<str>