zabbix 项目组 | 合并检测

5.总结

 

实在我们透过把流量数据转发给Lua,让Lua处理又高级的数据检索需求,在实际的做事吃,有些应用的访问者会给有畸形的杂质信息。如果某些应用输入脏数据,直接会造成程序崩溃,程序同时不出口日志,这种体制的流量监听虽会见发出以场景了,比方说,我们进行大量底扫描行为了,会生一些或多或少程序之前预想之外的多寡,为了还原是现实性那长长的扫描把程序为挂了,我们就算好活的勾一个lua插件,捕获脏数据。

随即程序只是一个抛砖引玉,我们直接通过C加Lua的章程,灵感来源于至Nginx + lua
,
 就是今盛行的openresty服务器,又有何不可就此到C的胜性能,又使Lua提高了继往开来处理的八面玲珑。如果要是拍卖又老流量的单机流量监听,应该继承在环形buffer
缓存数据,如果一直以日志数据syslog到远端口底syslog服务达标,我们就算可以假设lua开发一个插件,做syslog转发就是吓,这就是是当下设想下lua管道设计开这个试验工具的目地。

今昔我们开的干活便象是是,让tcpdump支持lua插件扩展。

文中提到的代码,放到了Github上:https://github.com/shengnoah/riff

信用社级网站服务器单机运行的已经大少了,最低配置为是一个档组2华主机进行互备。那么对品种组的各种监督就待联合才能见到本质。
吓吧我承认zabbix已经同我们想到了–zabbix整合(zabbix Aggregate)

4.采用实例 

当您拿走了流量数据后,理论及我们想干什么,由咱们的想象力决定,在实际上的用场景被,我们比如说无深刻一个下的一些,就想取这以之输入数据,比如是应用是一个HTTP
SEVER,Openrety服务,我们能无克经过启动这个监听程序,来获得对有nginx服务之用户要的agent数据为,其实这演示程序可以好,我们组织一个粗略的要。

 

curl --user-agent "pcap testcase" www.lua.ren

 

下一场我们,启动之本子程序:

 

./watch

图片 1

咱俩只是于
flter-plugin这个lua插件中,对action()回调函数,添加了一个简单易行的拍卖,就破获到了User-Agent的音讯含有”pcap”的多少。

 

function filter_plugin.action(self, stream) 
    io.write(stream.data, "\n")
    local flg = string.find(stream.data, "pcap")
    if flg then 
        print("###########[ OK ]#############")
    end 
end
groupfunc["Host group","Item key",itemfunc,timeperiod]

2.PCap底C语言实现

 

#include <pcap.h>  
#include <time.h>  
#include <stdlib.h>  
#include <stdio.h>  
#include <string.h>
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
lua_State* L = NULL;
void getPacket(u_char * arg, const struct pcap_pkthdr * pkthdr, const u_char * packet)  
{ 
  L = lua_open();
      luaL_openlibs(L);
  if (luaL_loadfile(L, "buffer.lua") || lua_pcall(L, 0,0,0))
      printf("Cannot run configuration file:%s", lua_tostring(L, -1));
  lua_getglobal(L, "buffer");
  char *buffer = NULL;
  buffer = (u_char*)malloc(pkthdr->len);
  memcpy(buffer, packet, pkthdr->len);
  lua_newtable(L); 
  int idx = 0;
  for (idx=1; idx < pkthdr->len; idx++) {
      lua_pushnumber(L, idx);  
      lua_pushnumber(L, packet[idx]);  
      lua_settable(L, -3);  
  }
  lua_pcall(L, 1,0,0);
  int * id = (int *)arg;  
  printf("id: %d\n", ++(*id));  
  printf("Packet length: %d\n", pkthdr->len);  
  printf("Number of bytes: %d\n", pkthdr->caplen);  
  printf("Recieved time: %s", ctime((const time_t *)&pkthdr->ts.tv_sec));   
    
  int i;  
  for(i=0; i<pkthdr->len; ++i) {  
    //printf(" %02x", packet[i]);  
    if( (i + 1) % 16 == 0 ) {  
//printf("\n");  
    }  
  }  
  printf("\n\n");  
  free(buffer);
  buffer = NULL;
}  
  
int main()  
{  
  char errBuf[PCAP_ERRBUF_SIZE], * devStr;  
    
  /* get a device */  
  devStr = "eth1";
    
  if(devStr)  
  {  
    printf("success: device: %s\n", devStr);  
  }  
  else  
  {  
    printf("error: %s\n", errBuf);  
    exit(1);  
  }  
    
  /* open a device, wait until a packet arrives */  
  pcap_t * device = pcap_open_live(devStr, 65535, 1, 0, errBuf);  
    
  if(!device)  
  {  
    printf("error: pcap_open_live(): %s\n", errBuf);  
    exit(1);  
  }  
    
  /* construct a filter */  
  struct bpf_program filter;  
  pcap_compile(device, &filter, "dat port 80", 1, 0);  
  pcap_setfilter(device, &filter);  
    
  /* wait loop forever */  
  int id = 0;  
  pcap_loop(device, -1, getPacket, (u_char*)&id);  
    
  pcap_close(device);  
  
  return 0;  
}  

C部分用监听的流量buffer的数,以数组的款式为lua,在lua中array其实就是一个table,我们以lua部分构成了一晃数组数据,生成了一个字符串,代码如下:

 

buffer = function(tbl)
    local tmpstr=''
    for k,v in pairs(tbl) do
        tmpstr = tmpstr..string.char(v)
    end
    io.write(tmpstr,"\n")
end

编译C程序就算依靠下面的一声令下执行,后期我们也可以很成一个makefile简化编译流程。

 

gcc watch.c -I/usr/include/lua5.1 -ldl -lm -llua5.1 -lpcap -o watch

为方便 ,我们刻画了一个Makefile:

 

LUALIB=-I/usr/include/lua5.1 -lpcap -ldl -lm -llua5.1 

.PHONY: all win linux

all:
        @echo Please do \'make PLATFORM\' where PLATFORM is one of these:
        @echo win linux

win:

linux: watch

watch : watch.c
         gcc $^ -o$@ $(LUALIB) 
clean:
        rm -f watch 

 

  • 参数timeperiod为指定的采访时间,可以动用时间单位,例如可以动用1d替86400(单位默认为秒),5m代表300.

1.前言

咱在实际工作屡遭,遇到了一个如此的用例,在每日例行扫描活动着,发现有点应用系统非期的被扫挂,因为咱们无是劳务之制造者,没有办法在不同的网里打印日志,所以我们尽管想用一个家伙来获取特定服务之输入数据流。我们设非在IDS上看以的服务,可以一直指向服务所在劳务岗位,针对使用端口进行,有针对的监听分析。

Tshark和tcpdump、windump这些监听工具提供了比较丰富的下令执行参数来监听流量数据。wireshark、burpsuite这些家伙为提供相应的lua、python脚本的机制用来去处理监听的流量数据。但有点场景我们无会见利用体积这么好的工具,命令执行道的监听工具又未能够在更多多少处理逻辑,细化对数据的操作。实际上我们得以自制一个小型的工具,做流量监听,是除命令并shell脚本,、wireshark、suricata等插件开发的任何一样种植形式。现在成千上万之监听工具还是根据pcap的,我们根据pcap底层开发一个监听工具。

pcap支持C、python两种植开发方式,基于C和pcap库的开支效率比pyton的性大,这样以青出于蓝性能的现象python就无太符合,但是自开效率角度看,用python开发比C又如果尽快多,毕竟用C开发工具,需要进行限编译,一凡出技巧门槛,另外的确维护相对较费心。

出于以上的故,既要性能相对够高一些,又能够便民维护,在命令执行是粒度上,又足以坐自己数处理逻辑,定制化自己之运行时序,因些,我们选了C和LUA配合的这种方法。实施点,就是之所以C来拍卖pcap的主件循环,接受pcap监听的buffer数据。然后,将监听的数目通过C与LUA之间的通信,将数据推送给LUA。

数提交LUA之后,如何保管数据的繁杂就指LUA的统筹方式来缓解,因为流量数据是文本流式的,程序原型就是想开了pipeline管道的法子开展集团管制。

发了管道的章程,我们虽足以以一个监听数据流上,叠加各种插件进地监听数据的处理,可以将复杂的工作,拆解成几个小之插件处理单元,写作就任务。演示原型代码的C部分是坏少之,主要的天职是抱buffer的多寡,推送lua,代码如下:

groupfunc 描述
grpavg 平均值
grpmax 最大值
grpmin 最小值
grpsum 总和

3.Lua以及管道插件设计

胡要用管道插件的方拆分和集团模块?以什么形式传送数据变成了一个手艺,解耦最直白的道是子,先管多少及为业分开,再管事情代码和共同通代码分开。数据层对我们系的话即使是平整,系统以的共通代码都打包到了框架层,而系统机能业务共通的有,以插件为职能单位分别并建关联。数据是面象用户之,框架是面向插件开发者的,
插件的贯彻就是法力负担要做的事体,不同的插件组合相对简便易行的生成新机能,也是插件便利的补益与有的含义。

 

 

以管道被的插件是会吃顺序调用的,因此插件模板被之init和action函数也会见给正常的回调,而这些回调函数在让调用时,管道体系会把流多少push给单元插件,而收数据流的插件在接受回调push过来的数码后,进行对应的判断筛选,将编辑后底多少经过sink插槽push给末端的插件,直到管道尾端的插件报警或记日志,一涂鸦管道启动运行的时序就结束了。

图片 2

 

马上是一个稍微图型化的pipeline示意图:

俺们所以代码说明管道的实现又直观,代码如下:

 

local pipeline = require "pipeline"
local status = pipeline:new {
    require"plugin.source_plugin",
    require"plugin.filter_plugin",
}
return pipeline

字符型式的管道图示:

 

 

+——————+     +—————-+
  

| source-plugin |     |  filter-plugin  |  
   

                      src – sink            
 src 

+——————+     +—————–+
    

 

 

俺们通过LUA特有的切近组织方式构建了一个梯次的管道数据结构,管道中的插件是按照声明的先后顺序来推行的。pipeline管道程序的重点逻辑就是是治本回调函数的调用,代码如下:

 

local Pipeline = {}
local Pobj = {}
function Pipeline.output(self, list, flg)
    if flg == 0 then
        return 
    end
    for k,v in pairs(list) do
        print(k,v)
    end
end
function Pipeline.new(self, elements)
    self.element_list = elements
    self:output(elements, 0)
    return PObj
end
function Pipeline.run(self, pcapdata)
    local src = {
        metadata= { 
            data= pcapdata,
            request = {
                uri="http://www.candylab.net"
            }
        }
    }
    for k,v in pairs(self.element_list) do
        v:init()
        v:push(src)
        local src, sink = v:match(pcapdata)
        if type(sink) == "table" then
            self:output(sink, 0)
        end
        src = sink
    end
end
return Pipeline

插件抽像有了几个单纯的函数给支付用户,时序是预先计划好之,最要紧的多寡及回调也判了,主要是Pipeline.run统一回调了几单模板的函数,init、push、match函数,这样顶层的筹划几乎是稳定的,之后有的事务逻辑都以模板了,按这个时序执行,而插件之间的多少传递依靠的即使src和sink这个插件。

依据管道插件的规划特性就是是前面的插件会拿源的数码推送给后面的插件,如果管道被的多少以头里为编了,会体现在后的插件接受多少后见变化,具体的实现,代码如下:

 

local source_plugin = {}
local src = {
   args="source args"
}
local sink = {
    name = "source_plugin",
    ver = "0.1"
}
function source_plugin.output(self, list, flg)
    if flg == 0 then
        return
    end
    for k,v in pairs(list) do
        print(k,v)
    end
end
function source_plugin.push(self, stream) 
    for k,v in pairs(stream.metadata) do
        self.source[k]=v
    end 
end
function  source_plugin.init(self)
    self.source = src
    self.sink = sink
end
function source_plugin.action(self, stream) 
end
function  source_plugin.match(self, param)
    self.sink['found_flg']=false
    for kn,kv in pairs(self.source) do
         self.sink[kn] = kv
    end
    self.sink['metadata'] = { data=self.source['data'] }
    self:action(self.sink)
    return self.source, self.sink
end
return  source_plugin

source_plugin是一个名列前茅的插件模板,所有给pipeline回调函数都了如指掌,但对此插件的以以来,可以完全不用关爱内部细节,只关注一个函数就实行了,就action(self,
stream)这个函数,能提供的兼具数据还早已为保留及stream这个数据结构里了,对监听的保有后期处理还起此间开始。如果创建一个初插件呢?就是复制源文件改一个称作就推行了,创建了一个filter_plugin的插件,代码如下:

 

local filter_plugin = {}
local src = {
   args="filter args"
}
local sink = {
    name = "filter_plugin",
    ver = "0.1"
}
function filter_plugin.output(self, list, flg)
    if flg == 0 then
        return
    end
    for k,v in pairs(list) do
        print(k,v)
    end
end
function filter_plugin.push(self, stream) 
    for k,v in pairs(stream.metadata) do
        self.source[k]=v
    end 
end
function  filter_plugin.init(self)
    self.source = src
    self.sink = sink
end
function filter_plugin.action(self, stream) 
    io.write(stream.data, "\n")
end
function  filter_plugin.match(self, param)
    self.sink['found_flg']=false
    for kn,kv in pairs(self.source) do
         self.sink[kn] = kv
    end
    self.sink['metadata'] = { data=self.source['data'] }
    self:action(self.sink)
    return self.source, self.sink
end
return  filter_plugin

变化了这个文件,我们于管理里进入这插件就是OK了,代码:

 

local pipeline = require "pipeline"
local status = pipeline:new {
    require"plugin.source_plugin",
    require"plugin.filter_plugin",
    require"plugin.syslog_plugin",
}
return pipeline

管道图示:

 

 

+—————+       +—————–+
    +——————+

| source-plugin |     |  filter-plugin  |  
  | syslog-plugin |  

                  src – sink                
 src – sink                ….

+—————+       +—————–+
    +——————+

 

 

咱以此管道图示的后面,看到多了一个syslog-plugin的插件,这个插件追加的作用就是拿眼前插件处理的流数据,通过syslog协议,将反复所抱到远端的syslog服务器上,集中到很数目日志中心进行剖析显示,这样这个次就算是一个大概的督察代理的模型。下面我们以先后过行起来,看一下执之成效。

示例3
组web 5分钟内之平均查询速度(次/秒)

grpavg[["Servers A","Servers B","Servers C"],system.cpu.load,last,0]

示例4
大多单组的cpu负载平均值

grpsum["web","vfs.fs.size[/,total]",last,0]
  • groupfunc 可以选取如下:

1、创办专门用来放聚合监控项的主机

创建All-host

  • aggregate item key语法如下:

2、择品种zabbix整合(zabbix Aggregate)

grpavg["web",mysql.qps,avg,5m]

itemfunc 描述
avg 平均值
count value个数
last 最新值
max 最大值
min 最小值
sum 总值

示例1
组web剩余硬盘空间大小

#汇总web 组 em2 网卡下行流量最新值之和
grpsum["web","net.if.in[em2]",last,0]
  • itemfunc 可以选择如下

示例2
组web的平均CPU负载

以下使用范例来自网络

由于监督项目是寄于主机,所以我们得以专程创建一个127.0.0.1的主机(不需要加模板),把集的监察项都合并置于这令主机及。

grpavg["web","system.cpu.load[,avg1]",last,0]

发表评论

电子邮件地址不会被公开。 必填项已用*标注