天天看點

wazuh-monitord agent連接監控

ossec-monitord程式監視agent的連接。此外,它每天或當内部日志達到一定的可配置大小時對其進行輪轉和壓縮。

1.掉線時間間隔配置

agent掉線時間間隔

        <agents_disconnection_time>1m</agents_disconnection_time>       

agent掉線告警時間間隔

        <agents_disconnection_alert_time>1h</agents_disconnection_alert_time>

2.監控agent連接

連接到消息隊列

/* Connect to the message queue
     * Exit if it fails.
     */
    if ((logr.m_queue = StartMQ(DEFAULTQUEUE, WRITE, INFINITE_OPENQ_ATTEMPTS)) < 0) {
        merror_exit(QUEUE_FATAL, DEFAULTQUEUE);
    }      

這個是通用消息隊列,一般消息都走這個消息隊列 

/* Default queue: socket path handed to StartMQ().
 * NOTE(review): the path is relative; presumably resolved against the
 * installation directory - confirm. */
#define DEFAULTQUEUE    "queue/sockets/queue"

 監控連接狀态函數

/**
 * Review every agent queued in agents_to_alert_hash and decide whether its
 * disconnection alert has to be raised now.
 *
 * For each entry the agent record is requested from Wazuh DB, then:
 *  - no record returned: a debug line is logged and the entry is removed;
 *  - connection_status is "active": the entry is removed, no alert;
 *  - last_keepalive is older than agents_disconnection_time +
 *    agents_disconnection_alert_time: an alert is sent for the agent name and
 *    register IP joined with '-', and the entry is removed;
 *  - anything else (including a missing or mistyped field): the entry stays
 *    in the table.
 */
void monitor_agents_alert(){
    unsigned int bucket = 0;
    OSHashNode *current = NULL;
    OSHashNode *upcoming = NULL;

    for (current = OSHash_Begin(agents_to_alert_hash, &bucket); current; current = upcoming) {
        cJSON *record = NULL;
        cJSON *status = NULL;
        cJSON *last_keepalive = NULL;
        cJSON *name = NULL;
        cJSON *ip = NULL;

        /* Fetch the successor first: the current entry may be deleted below. */
        upcoming = OSHash_Next(agents_to_alert_hash, &bucket, current);

        record = wdb_get_agent_info(atoi(current->key), &sock);
        if (!record) {
            mdebug1("Unable to retrieve agent's '%s' data from Wazuh DB", current->key);
            OSHash_Delete(agents_to_alert_hash, current->key);
            continue;
        }

        /* The fields are read from the first child of the returned JSON. */
        status = cJSON_GetObjectItem(record->child, "connection_status");
        last_keepalive = cJSON_GetObjectItem(record->child, "last_keepalive");
        name = cJSON_GetObjectItem(record->child, "name");
        ip = cJSON_GetObjectItem(record->child, "register_ip");

        if (cJSON_IsString(status) && status->valuestring != NULL &&
            cJSON_IsString(name) && name->valuestring != NULL &&
            cJSON_IsString(ip) && ip->valuestring != NULL &&
            cJSON_IsNumber(last_keepalive)) {

            if (strcmp(status->valuestring, "active") != 0) {
                /* Still not active: alert only once the keepalive is older
                 * than the disconnection time plus the alert delay. */
                if (last_keepalive->valueint < (time(0) -
                        (mond.global.agents_disconnection_time + mond.global.agents_disconnection_alert_time))) {
                    char *name_and_ip = NULL;

                    os_strdup(name->valuestring, name_and_ip);
                    wm_strcat(&name_and_ip, ip->valuestring, '-');
                    monitor_send_disconnection_msg(name_and_ip);
                    OSHash_Delete(agents_to_alert_hash, current->key);
                    os_free(name_and_ip);
                }
            } else {
                /* The agent reconnected in the meantime: nothing to report. */
                OSHash_Delete(agents_to_alert_hash, current->key);
            }
        }

        cJSON_Delete(record);
    }
}

掉線逾時判斷else if ( j_agent_lastkeepalive->valueint < (time(0) - (mond.global.agents_disconnection_time + mond.global.agents_disconnection_alert_time)) )

/* Excerpt from monitor_agents_alert(): per-agent decision taken after the
 * agent record has been read from Wazuh DB. */
if (strcmp(j_agent_status->valuestring, "active") == 0) {
                        /* The agent is now connected, removing from table */
                        OSHash_Delete(agents_to_alert_hash, agent_hash_node->key);
                    }

                    /* Timeout test: the last keepalive must be older than
                     * agents_disconnection_time + agents_disconnection_alert_time. */
                    else if (j_agent_lastkeepalive->valueint < (time(0) -
                            (mond.global.agents_disconnection_time + mond.global.agents_disconnection_alert_time))) {
                        /* Generating the disconnection alert */
                        char *agent_name_ip = NULL;
                        /* Build the agent name and register IP joined with '-' */
                        os_strdup(j_agent_name->valuestring, agent_name_ip);
                        wm_strcat(&agent_name_ip, j_agent_ip->valuestring, '-');
                        monitor_send_disconnection_msg(agent_name_ip);
                        /* Alert handed off: drop the agent from the pending table */
                        OSHash_Delete(agents_to_alert_hash, agent_hash_node->key);
                        os_free(agent_name_ip);
                    }

 發送掉線消息

/**
 * Format the AG_DISCON_MSG event for an agent and hand it to
 * mon_send_agent_msg().
 *
 * @param agent Agent identifier; it is used both as the format argument of
 *              AG_DISCON_MSG and as the first argument of mon_send_agent_msg().
 *
 * When mon_send_agent_msg() returns 2 a deletion message is sent instead;
 * any other non-zero result is only reported at debug level.
 */
void monitor_send_disconnection_msg(char *agent) {
    char event[OS_SIZE_1024];
    int rc;

    memset(event, '\0', OS_SIZE_1024);

    /* Send disconnected message */
    snprintf(event, OS_SIZE_1024, AG_DISCON_MSG, agent);
    rc = mon_send_agent_msg(agent, event);

    switch (rc) {
    case 0:
        break;
    case 2:
        // Agent is no longer in the database
        monitor_send_deletion_msg(agent);
        break;
    default:
        mdebug1("Could not generate disconnected agent alert for '%s'", agent);
        break;
    }
}

如果 mon_send_agent_msg函數傳回2,說明wdb_find_agent傳回了-2,即Wazuh DB中沒有找到agent_id(具體是通過agent_name和agent_ip找agent_id)

/**
 * Resolve an agent given as "<name>-<ip>" and pass a message to SendMSG()
 * on its behalf.
 *
 * @param agent Agent identifier. The text after the last '-' is taken as the
 *              IP; the text before that '-' is taken as the name.
 * @param msg   Message forwarded to SendMSG() with the SECURE_MQ location.
 * @return 0 when SendMSG() did not report an error,
 *         2 when wdb_find_agent() returned -2,
 *         1 otherwise (no '-' in agent, name part too long, any other lookup
 *         result that is not positive, or SendMSG() failure).
 */
int mon_send_agent_msg(char *agent, char *msg) {
    char header[OS_SIZE_256];
    char ag_name[OS_SIZE_128];
    char *last_dash = strrchr(agent, '-');
    char *ag_ip = NULL;
    size_t name_size;
    int ag_id;

    if (last_dash == NULL) {
        return 1;
    }
    ag_ip = last_dash + 1;

    /* name_size counts the name plus the separating '-'; snprintf() below
     * copies name_size - 1 characters, which drops that '-'. */
    name_size = strlen(agent) - strlen(ag_ip);
    if (name_size > OS_SIZE_128) {
        return 1;
    }

    snprintf(ag_name, name_size, "%s", agent);

    ag_id = wdb_find_agent(ag_name, ag_ip, NULL);
    if (ag_id == -2) {
        return 2;
    }
    if (ag_id <= 0) {
        return 1;
    }

    snprintf(header, OS_SIZE_256, "[%03d] (%s) %s", ag_id, ag_name, ag_ip);
    if (SendMSG(mond.a_queue, msg, header, SECURE_MQ) < 0) {
        mond.a_queue = -1;  // set an invalid fd so we can attempt to reconnect later on.
        merror(QUEUE_SEND);
        return 1;
    }

    return 0;
}

 将消息通過消息隊列發出去

SendMSG(mond.a_queue, msg, header, SECURE_MQ)      
/* Send a message to the queue.
 *
 * With loc == SECURE_MQ the message must look like "<c>:<body>": <c> becomes
 * the location character and the frame is "<c>:<locmsg>-><body>". A body that
 * starts with "keepalive" is not forwarded. With any other loc the frame is
 * "<loc>:<locmsg>:<message>".
 *
 * Returns 0 when the frame was passed to OS_SendUnix(), and also when the
 * message was malformed, was a keepalive, or was discarded because
 * OS_SendUnix() failed with something other than OS_SOCKTERR.
 * Returns -1 when queue is negative or OS_SendUnix() returned OS_SOCKTERR
 * (the descriptor is closed in that case).
 */
int SendMSG(int queue, const char *message, const char *locmsg, char loc)
{
    static int busy_warned = 0;
    char frame[OS_MAXSTR + 1];
    int send_rc;

    frame[OS_MAXSTR] = '\0';

    /* Check for global locks */
    os_wait();

    if (loc != SECURE_MQ) {
        snprintf(frame, OS_MAXSTR, "%c:%s:%s", loc, locmsg, message);
    } else {
        /* First byte is the real location, second must be the ':' separator */
        loc = message[0];
        if (message[1] != ':') {
            merror(FORMAT_ERROR);
            return (0);
        }
        message += 2; /* Pointing now to the location */

        if (strncmp(message, "keepalive", 9) == 0) {
            return (0);
        }

        snprintf(frame, OS_MAXSTR, "%c:%s->%s", loc, locmsg, message);
    }

    /* Queue not available */
    if (queue < 0) {
        return (-1);
    }

    send_rc = OS_SendUnix(queue, frame, 0);
    if (send_rc >= 0) {
        return (0);
    }

    /* Error on the socket */
    if (send_rc == OS_SOCKTERR) {
        merror("socketerr (not available).");
        close(queue);
        return (-1);
    }

    /* Unable to send. Socket busy */
    mdebug2("Socket busy, discarding message.");

    /* The warning-level message is emitted only the first time */
    if (!busy_warned) {
        busy_warned = 1;
        mwarn("Socket busy, discarding message.");
    }

    return (0);
}
/* Send a message using a Unix socket.
 * A size of 0 means "msg is a C string": its length plus the terminating
 * NUL is sent.
 * Returns OS_SUCCESS when send() reported at least `size` bytes,
 * OS_SOCKBUSY when it did not and errno is ENOBUFS,
 * OS_SOCKTERR otherwise.
 */
int OS_SendUnix(int socket, const char *msg, int size)
{
    const int length = (size != 0) ? size : (int)(strlen(msg) + 1);

    if (send(socket, msg, length, 0) >= length) {
        return (OS_SUCCESS);
    }

    return ((errno == ENOBUFS) ? OS_SOCKBUSY : OS_SOCKTERR);
}

 mond.a_queue 是socket對應的fd

通過SendMSG函數可知:

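  • 僅當loc為SECURE_MQ時:消息的第一個字元被當作位置字元,第二個字元必須是冒号;如果其後的消息内容以"keepalive"開頭(strncmp比較前9個字元),那麼消息不會發送,函數直接傳回0。
  • SECURE_MQ消息發出去的結構是「位置字元:header->消息内容」;其他情況的結構是「loc:header:msg」,三部分以冒号分隔。組裝好的字元串通過OS_SendUnix發送到消息隊列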
  • mond.a_queue 指向的DEFAULTQUEUE

3.ossec-analysisd 程序接收

設定消息隊列

/* Set the queue */
        if ((m_queue = StartMQ(DEFAULTQUEUE, READ, 0)) < 0) {
            merror_exit(QUEUE_ERROR, DEFAULTQUEUE, strerror(errno));
        }      

 接收處理消息隊列中的消息

/* Create message handler thread.
 * w_create_thread() is given ad_input_main as the routine and the address
 * of m_queue as its argument. */

    w_create_thread(ad_input_main, &m_queue);

 消息接收函數

/* Receive a message from a Unix socket.
 *
 * At most sizet - 1 bytes are read into ret and the data is NUL-terminated.
 * Because ret[sizet] is also written, ret must provide at least sizet + 1
 * bytes.
 * Returns the number of bytes received, or 0 when recvfrom() fails.
 */
int OS_RecvUnix(int socket, int sizet, char *ret)
{
    struct sockaddr_un peer;
    socklen_t peer_len = sizeof(peer);
    ssize_t nread;

    /* Terminate the far end of the buffer before reading anything */
    ret[sizet] = '\0';

    nread = recvfrom(socket, ret, sizet - 1, 0, (struct sockaddr *)&peer, &peer_len);
    if (nread >= 0) {
        ret[nread] = '\0';
        return ((int)nread);
    }

    return (0);
}

繼續閱讀