天天看點

wazuh叢集代碼分析(一)

wazuh叢集請求權限啟動同步協定

| Message | Received in | Arguments | Description |
|---|---|---|---|
| `syn_i_w_m_p`, `syn_a_w_m_p` | Master | None | Ask permission to start synchronization protocol. Message characters define the action to do: I (integrity), A (agent-info); W (worker), M (master), P (permission). |

framework\wazuh\core\cluster\master.py

master處理進來的請求并且同步worker程序

class MasterHandler(server.AbstractServerHandler, c_common.WazuhCommon):
    """
    Server-side handler that answers the requests sent by a worker node and drives its sync processes.
    """
    def process_request(self, command: bytes, data: bytes) -> Tuple[bytes, bytes]:
        """Dispatch a command received from a worker node to the routine that handles it.

        Any command that is not recognized here is delegated to the parent class.

        Parameters
        ----------
        command : bytes
            Received command.
        data : bytes
            Received payload.

        Returns
        -------
        bytes
            Result.
        bytes
            Response message.
        """
        self.logger.debug(f"Command received: {command}")

        # Sync protocol commands: permission request, start, end and error notification.
        if command in (b'syn_i_w_m_p', b'syn_e_w_m_p', b'syn_a_w_m_p'):
            return self.get_permission(command)
        if command in (b'syn_i_w_m', b'syn_e_w_m', b'syn_a_w_m'):
            return self.setup_sync_integrity(command, data)
        if command in (b'syn_i_w_m_e', b'syn_e_w_m_e'):
            return self.end_receiving_integrity_checksums(data.decode())
        if command in (b'syn_i_w_m_r', b'syn_e_w_m_r'):
            return self.process_sync_error_from_worker(command, data)

        # Distributed API commands.
        if command == b'dapi':
            # The request is queued prefixed with this handler's name and a '*' separator.
            queued_request = self.name.encode() + b'*' + data
            self.server.dapi.add_request(queued_request)
            return b'ok', b'Added request to API requests queue'
        if command == b'dapi_res':
            return self.process_dapi_res(data)
        if command == b'dapi_err':
            # Payload layout: "<client name> <error message>".
            dapi_client, error_msg = data.split(b' ', 1)
            target_client = self.server.local_server.clients[dapi_client.decode()]
            asyncio.create_task(target_client.send_request(command, error_msg))
            return b'ok', b'DAPI error forwarded to worker'

        # Cluster information commands: the result is returned JSON-encoded.
        if command == b'get_nodes':
            cmd, res = self.get_nodes(json.loads(data))
            return cmd, json.dumps(res).encode()
        if command == b'get_health':
            cmd, res = self.get_health(json.loads(data))

            def serialize_fallback(o):
                # Only datetime objects are converted; anything else json cannot encode becomes null.
                if not isinstance(o, datetime):
                    return None
                # The epoch timestamp is reported as "n/a" instead of a date.
                if o == datetime.fromtimestamp(0):
                    return "n/a"
                return o.__str__()

            return cmd, json.dumps(res, default=serialize_fallback).encode()

        if command == b'sendsync':
            queued_request = self.name.encode() + b'*' + data
            self.server.sendsync.add_request(queued_request)
            return b'ok', b'Added request to SendSync requests queue'

        return super().process_request(command, data)

 get_permission(self, sync_type: bytes) -> Tuple[bytes, bytes]:

def get_permission(self, sync_type: bytes) -> Tuple[bytes, bytes]:
    """Report whether the requested sync process is allowed to start.

    Parameters
    ----------
    sync_type : bytes
        Permission command identifying the sync process to check.

    Returns
    -------
    bytes
        Result.
    bytes
        Response message: the matching permission flag converted to str and encoded.
        Unknown commands yield `False`.
    """
    # Each permission command paired with the attribute that holds its flag.
    flag_by_command = (
        (b'syn_i_w_m_p', 'sync_integrity_free'),
        (b'syn_e_w_m_p', 'sync_extra_valid_free'),
        (b'syn_a_w_m_p', 'sync_agent_info_free'),
    )

    for known_command, flag_name in flag_by_command:
        if sync_type == known_command:
            # Only the attribute of the matching command is read.
            return b'ok', str(getattr(self, flag_name)).encode()

    return b'ok', str(False).encode()
class ReceiveIntegrityTask(c_common.ReceiveFileTask):
    """
    asyncio.Task wrapper that waits for the master to send its integrity information and then
    processes what was received.
    """

    def set_up_coro(self) -> Callable:
        """Return the callable used to process the integrity files received from master."""
        integrity_processor = self.wazuh_common.process_files_from_master
        return integrity_processor

def set_up_coro(self) -> Callable:

python中函數名之後跟-> 

1.可以表示一個函數傳回的類型

1 def test(a: int = 1) -> int:
2     print('xxx')
3 
4 m = test(1)
5 print(m)

代碼運作結果如下:
xxx
None
(函數裡沒有 return 語句,是以實際傳回的是 None;-> int 只是類型提示,Python 在運作時不會強制檢查傳回值的類型。)
如果用鍊式調用,則鍊式調用外部需要加()

1 def Word()->WordCloud:
2     c = (
3         WordCloud()
4         .add("",words,word_size_range=[20,100])
5         .set_global_opts(title_opts=opts.TitleOpts(title="标題"))
6     )
7     return c

c 裡面采用了鍊式調用,是以要用() 把它們括起來,必須用(),  不能用[ ] 和 { }
以上寫法和下面這種寫法含義是一樣的:

1 def Word()->WordCloud:
2     c =WordCloud().add("",words,word_size_range=[20,100]).set_global_opts(title_opts=opts.TitleOpts(title="标題"))
3     return c      
class WorkerHandler(client.AbstractClient, c_common.WazuhCommon):
    async def process_files_from_master(self, name: str, file_received: asyncio.Event):
        """Apply the integrity information sent by the master to this worker node.

        Waits for the file to arrive, decompresses it, updates or removes the local files that differ
        from the master and, when the master needs extra_valid files, schedules sending them.

        Parameters
        ----------
        name : str
            Task ID that was waiting for the file to be received.
        file_received : asyncio.Event
            Asyncio event that is unlocked once the file has been received.
        """
        reception = file_received.wait()
        reception_timeout = self.cluster_items['intervals']['communication']['timeout_receiving_file']
        await asyncio.wait_for(reception, timeout=reception_timeout)

        sync_task = self.sync_tasks[name]
        # The task stores an exception in place of the filename when the reception failed.
        if isinstance(sync_task.filename, Exception):
            raise sync_task.filename

        # Stays empty until decompression succeeds, so the cleanup below only removes an existing directory.
        zip_path = ""
        # Path of the zip containing a JSON with metadata and files to be updated in this worker node.
        received_filename = sync_task.filename
        try:
            logger = self.task_loggers['Integrity sync']
            self.integrity_sync_status['date_start'] = time.time()
            logger.info("Starting.")

            # - zip_path contains the path of the unzipped directory
            # - ko_files contains a Dict with this structure:
            #   {'missing': {'<file_path>': {<MD5, merged, merged_name, etc>}, ...},
            #    'shared': {...}, 'extra': {...}, 'extra_valid': {...}}
            ko_files, zip_path = await wazuh.core.cluster.cluster.decompress_files(received_filename)
            file_counts = (
                len(ko_files['missing']),
                len(ko_files['shared']),
                len(ko_files['extra']),
                len(ko_files['extra_valid']),
            )
            logger.info("Files to create: {} | Files to update: {} | Files to delete: {} | Files to send: {}".format(
                *file_counts
            ))

            # Elapsed seconds of the local update; remains 0 when no local file had to be changed.
            date_end = 0
            local_changes_needed = ko_files['shared'] or ko_files['missing'] or ko_files['extra']
            if local_changes_needed:
                # Update or remove files in this worker node according to their status (missing, extra or shared).
                logger.debug("Worker does not meet integrity checks. Actions required.")
                logger.debug("Updating local files: Start.")
                self.update_master_files_in_worker(ko_files, zip_path)
                logger.debug("Updating local files: End.")
                date_end = time.time() - self.integrity_sync_status['date_start']

            if ko_files['extra_valid']:
                # Send extra valid files to the master in a separate task.
                logger.debug("Master requires some worker files.")
                asyncio.create_task(self.sync_extra_valid(ko_files['extra_valid']))
            elif date_end:
                # The duration is only logged here when a local update actually ran.
                logger.info(f"Finished in {date_end:.3f}s.")

        finally:
            # Remove the unzipped directory, if one was created.
            if zip_path:
                shutil.rmtree(zip_path)

分布式api

  • local_any:請求能夠被任何節點處理。這些請求是通常的資訊,master節點分發到其他節點的,例如rules, decoder, CDB 清單。這些請求從不轉發或遠端解決。
  • local_master:master節點處理這種請求,這些請求通常是關于叢集全局狀态/管理,比如agent 資訊/狀态/管理,agent組,叢集資訊等。
  • distributed_master:這類請求master必須轉發到最适合的節點解決。