柴少鹏的官方网站 技术在分享中进步,水平在学习中升华

ELK利用GeoIP映射用户地理位置(十二)

      IP地址位置用于确定IP地址的物理位置,因为ELK已经收集了web日志,如果通过ELK分析出来用户来源地址的比例,那么多网站机构的调整都是非常有帮助的。

一、跟着官网了解一波Geoip

       GeoIP过滤器根据来自Maxmind GeoLite2数据库的数据添加有关IP地址的地理位置的信息。

1.1 支持的数据库

       这个插件与开箱即用的GeoLite2(https://dev.maxmind.com/geoip/geoip2/geolite2/)城市数据库捆绑在一起。 从Maxmind的描述 ----GeoLite2数据库是免费的IP地理位置数据库,可与MaxMind的GeoIP2数据库相媲美,但不太准确”。 有关更多详细信息,请参阅GeoIP Lite2许可证。Maxmind的商业数据库(https://www.maxmind.com/en/geoip2-databases)也支持这个插件。简单来说就是两个数据库一个免费的一个商业的,免费的不如商业的地理位置精准。

       如果您需要使用捆绑的GeoLite2城市以外的数据库,则可以直接从Maxmind网站下载数据库,并使用数据库选项指定其位置。 GeoLite2数据库可以从这里下载(https://dev.maxmind.com/geoip/geoip2/geolite2/)。

       如果您想获得自治系统编号(ASN)信息,您可以使用geolite2 - ASN数据库。

1.2 细节

       如果GeoIP查找返回经度和纬度,则会创建[geoip][location]字段。该字段以GeoJSON(http://geojson.org/geojson-spec.html)格式存储。此外,还提供了默认的Elasticsearch模板elasticsearch输出(https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html)映射这[geoip] [location]字段添加到Elasticsearch geo_point。

       由于这个字段是一个geo_point,它仍然是有效的GeoJSON,您将获得Elasticsearch的地理空间查询,构面和过滤器功能以及为所有其他应用程序(如Kibana的地图可视化)提供GeoJSON的灵活性。

      本产品包含由MaxMind创建的GeoLite2数据,可从中获取http://www.maxmind.com. 这个数据库是根据授权知识共享署名 - 相同方式共享4.0国际许可(https://creativecommons.org/licenses/by-sa/4.0/)。

      GeoIP过滤器的版本4.0.0和更高版本使用MaxMind GeoLite2数据库并支持IPv4和IPv6查找。 4.0.0之前的版本使用传统的MaxMind GeoLite数据库,仅支持IPv4查找。 

1.3 Geoip筛选器配置选项

       该插件支持以下配置选项和稍后介绍的通用选项。

image.png

cache_size:

      值类型是数字,默认值是1000。

      GeoIP查询的成本非常高。 该过滤器使用缓存来利用IP代理通常在日志文件中彼此相邻的情况,并且很少具有随机分布。 设置的越高,项目在缓存中的可能性就越大,并且此过滤器运行得越快。 但是,如果将此设置得太高,则可能会使用比所需更多的内存。 由于Geoip API升级到v2,目前没有任何驱逐策略,如果缓存已满,则不能添加更多记录。 尝试使用此选项的不同值来查找数据集的最佳性能。

      这必须设置为大于0的值。真的没有理由不想要这种行为,开销很小,速度增益很大。

      注意这个配置值对于geoip_type是全局的,这一点很重要。 也就是说,相同geoip_type的geoip过滤器的所有实例共享相同的高速缓存。 最后声明的缓存大小将获胜。 这样做的原因是,在流水线中的不同点上为不同的实例分配多个缓存是没有好处的,这只会增加缓存未命中次数和浪费内存。

database:

       值类型是path,这个设置没有默认值。

       Logstash应该使用的Maxmind数据库文件的路径。 默认数据库是GeoLite2-City。 GeoLite2-City,GeoLite2-Country,GeoLite2-ASN是Maxmind支持的免费数据库。 GeoIP2-City,GeoIP2-ISP,GeoIP2-Country是Maxmind支持的商业数据库。

      如果未指定,则默认为Logstash附带的GeoLite2城市数据库。

default_database_type:

       这个插件现在包括GeoLite2-City和GeoLite2-ASN数据库。如果未设置数据库和default_database_type,则将选择GeoLite2-City数据库。要使用包含的GeoLite2-ASN数据库,请将default_database_type设置为ASN。

       值类型是字符串,默认值是city。唯一可接受的值是City和ASN。

fields:

       值类型是array,这个设置没有默认值。

       要包含在事件中的geoip字段数组。可能的字段取决于数据库类型。 默认情况下,所有geoip字段都包含在事件中。

对于内置的GeoLite2城市数据库,可以使用以下内容:city_name, continent_code, country_code2, country_code3, country_name, dma_code, ip, latitude, longitude, postal_code, region_name 和 timezone.

source:

       这是一个必需的设置。值类型是字符串,这个设置没有默认值。

       包含要通过geoip映射的IP地址或主机名的字段。 如果这个字段是一个数组,则只会使用第一个值。

tag_on_failure:

      值类型是数组,默认值是[“_geoip_lookup_failure”]。这个设置没有默认值。

      将事件标记为未能查找地理信息。 这可以在以后的分析中使用。

target:

      值类型是字符串,默认值是“geoip”。

      指定Logstash应该存储geoip数据的字段。 例如,如果您有src_ip和dst_ip字段,并希望两个IP的GeoIP信息,这可能是有用的。

       如果将数据保存到geoip以外的目标字段,并希望在Elasticsearch中使用与geo_point相关的函数,则需要更改Elasticsearch输出提供的模板,并将输出配置为使用新模板。

       即使您不使用geo_point映射,[target] [location]字段仍然是有效的GeoJSON。

1.4 通用选项

#这个在前面几节都翻译了,这里就简单记录下了。

所有过滤器插件都支持以下配置选项:

image.png

add_field:

     值类型是hash,默认值是{}

     如果此过滤器成功,请将任意字段添加到此事件。 字段名称可以是动态的,并使用%{field}来包含事件的一部分。

add_tag:

      值类型是数组。默认值是[]。如果此过滤器成功,请向该事件添加任意标签。 标签可以是动态的,并使用%{field}语法包含事件的一部分。

enable_metric:

       值类型是布尔值,默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。

id:

     值类型是字符串,这个设置没有默认值。为插件配置添加一个唯一的ID。 如果没有指定ID,Logstash将会生成一个。 强烈建议在您的配置中设置此ID。 当你有两个或多个相同类型的插件时,这是特别有用的,例如,如果你有两个geoip过滤器。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。

periodic_flush:

      值类型是布尔值,默认值是false。定期调用过滤器刷新方法。 可选的。

remove_field:

       值类型是数组,默认值是[]。如果此过滤器成功,请从此事件中删除任意字段。

remove_tag:

        值类型是数组,默认值是[]。如果此过滤器成功,请从该事件中移除任意标签。 标签可以是动态的,并使用%{field}语法包含事件的一部分。

博文来自:www.51niux.com

二、环境部署

2.1 Logstash日志格式匹配编写:

223.11.97.79 - - [14/Mar/2017:00:00:04 +0800] "GET /00/11/00114827.mp3 HTTP/1.1" 206 941328 "http://d.51niux.com/search/searchKsc.htm?wd=%E4%AA%8E%E5%B0%8F%E5%8D%96%E8%92%AA&fieldStr=&encode=encode" "Player3.1.0.4" "-" 0.199 - - HIT

 #类似于这样一种nginx proxy cache的日志格式,先对其进行一下日志的转变

$ cat /home/elk/logstash/patterns/51niux 

PROXY_HTTP_ACCESS %{IP:fromip} - - \[%{HTTPDATE:nginx.access.time}\] "%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}" "-" %{NUMBER:request_time}[\s-]+(?<cache_status>[a-zA-Z]+)

$ vim /home/elk/logstash/conf/nginx-cache.conf 

input {
    beats {
        host => "192.168.14.67"
        port => 5066
    }
}
filter {
       grok {
                 patterns_dir => "/home/elk/logstash/patterns"
                 match => {
                          "message" => "%{PROXY_HTTP_ACCESS}"
                 }
            }
}
output {
    stdout { codec => rubydebug }   #先输出测试一下看看格式对不对哈
}

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/nginx-cache.conf 

"cache_status" => "HIT",
nginx.access.url" => "/00/11/00114827.mp3",
"fromip" => "223.11.97.79",
"nginx.access.body_sent.bytes" => "941328"

#就粘贴了一小点输出哈,可以看出我们定义的匹配规则已经可以了,已经对日志进行了切分。

2.2 Geoip的配置

下载城市数据库:

$cd /home/elk/logstash/conf

$wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz

$tar -zxvf GeoLite2-City.tar.gz
$cd GeoLite2-City_20171205/

$ ls -lh

总用量 61M
-rw-r--r-- 1 2000 2000  55 12月  7 04:04 COPYRIGHT.txt
-rw-r--r-- 1 2000 2000 61M 12月  7 04:04 GeoLite2-City.mmdb
-rw-r--r-- 1 2000 2000 433 12月  7 04:04 LICENSE.txt
-rw-r--r-- 1 2000 2000 116 12月  7 04:04 README.txt

#GeoLite数据库的官网说自己每月的第一个星期二的MaxMind更新。 因此,如果想要始终拥有最新的数据库,则应该设置一个每月一次下载数据库的cron作业。

$ cp GeoLite2-City.mmdb  /home/elk/logstash/conf/

=====================================旧版的方式=============================================================

$gunzip GeoLiteCity.dat.gz

# ls -lh /home/elk/logstash/conf/GeoLiteCity.dat

-rw-r--r-- 1 elk elk 17M 12月  7 03:22 /home/elk/logstash/conf/GeoLiteCity.dat

#旧版本Logstash2.x系统使用的是这种二进制文件,但是在新版本使用不了会有报错的。

[2017-12-18T10:14:53,867][ERROR][logstash.pipeline        ] Error registering plugin {:pipeline_id=>"main", :plugin=>"#<LogStash::FilterDelegator:0x2db88668 @metric_events_out=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 - namespace: [stats, pipelines, main, plugins, filters, cfcf4d91f00a2e40c80b84aa6ba0d21b2f502729498fc50ad8bb3682d0c7bde3, events] key: out value:0, @metric_events_in=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 - namespace: 
......
[2017-12-18T10:14:53,963][ERROR][logstash.pipeline        ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>java.lang.IllegalArgumentException: The database provided is invalid or corrupted., :backtrace=>["org.logstash.filters.GeoIPFilter.<init>(org/lo

#大概意思就是库文件可能有损坏也就是读取不了不支持这种形式.......

===========================================================================================================

2.3 Logstash配置文件中加入Geoip过滤并启动测试

$ vim /home/elk/logstash/conf/nginx-cache.conf

input {
    beats {
        host => "192.168.14.67"
        port => 5066
    }
}
filter {
            grok {
                 patterns_dir => "/home/elk/logstash/patterns"
                 match => {
                          "message" => "%{PROXY_HTTP_ACCESS}"
                 }
            }
            geoip {
                        source => "fromip"   #指定哪个字段过滤成物理地址肯定是来源IP啊
                        database => "/home/elk/logstash/conf/GeoLite2-City.mmdb"    #指定库的位置
            }
}
output {
    stdout { codec => rubydebug }   #还是先打印到屏幕先看看效果出来没有
}

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/nginx-cache.conf   #启动服务这次没有报错了。从下面的启动过程中已经可以看到已经加载了地址库并启动了程序

Sending Logstash's logs to /home/elk/logstash/logs which is now configured via log4j2.properties
[2017-12-18T14:24:11,308][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/elk/logstash/modules/fb_apache/configuration"}
[2017-12-18T14:24:11,314][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/elk/logstash/modules/netflow/configuration"}
[2017-12-18T14:24:11,728][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2017-12-18T14:24:11,993][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-12-18T14:24:16,042][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/home/elk/logstash/conf/GeoLite2-City.mmdb"}
[2017-12-18T14:24:16,108][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x104c045a@/home/elk/logstash/logstash-core/lib/logstash/pipeline.rb:290 run>"}
[2017-12-18T14:24:16,934][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"192.168.14.67:5066"}
[2017-12-18T14:24:17,029][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
[2017-12-18T14:24:17,059][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2017-12-18T14:24:17,077][INFO ][org.logstash.beats.Server] Starting server on port: 5066

2.4 web日志写入测试并调整

# cat /opt/lzotest1.log |head -1 >>/usr/local/nginx/logs/access.log   #搞了一份web缓存的日志,先把第一行日志打到测试环境的的nginx日志文件中。要公网IP地址哦。

查看下Logstash的屏幕输出:

{
       "nginx.access.http_version" => "1.1",
                           "geoip" => {    #因为我们走的默认,默认target标签就是geoip
             "city_name" => "Hebei",        #城市地址有的时候精确到市有的时候就是省地址
              "timezone" => "Asia/Shanghai",   #时区是上海时区
                    "ip" => "123.183.135.141",   #IP地址
              "latitude" => 39.8897,           #维度
          "country_name" => "China",      #国家的名字
         "country_code2" => "CN",         #国家简写
        "continent_code" => "AS",         #哪个大洲
         "country_code3" => "CN",          
           "region_name" => "Hebei",      #地区名称 
              "location" => {             #位置
            "lon" => 115.275,           #经度
            "lat" => 39.8897            #维度
        },

#默认就是会输出好多信息,看来还是需要精简一下输出field。

图片.png

#通过IP查询,IP地址也是正确的确实是河北省。
#如果是内网IP地址呢,输入就是下面的结果:

                           "geoip" => {},  #geoip里面是空的不包含任何信息,因为内网解析不出来。
#if [fromip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {   #当然可以加下if判断吧geoip过滤包一下,这样内网的就不过滤了也就不会出现geoip空字段了,不过如果你不自己测试着玩一般也没内网IP
     geoip {
                        source => "fromip"

2.5 修改Logstash并写入ES集群

$ vim /home/elk/logstash/conf/nginx-cache.conf

input {
    beats {
        host => "192.168.14.67"
        port => 5066
    }
}
filter {
            grok {
                 patterns_dir => "/home/elk/logstash/patterns"
                 match => {
                          "message" => "%{PROXY_HTTP_ACCESS}"
                 }
            }
            geoip {
                        source => "fromip"
                        target => "geoip"   #默认就是这个target,就是过滤的内容都写到这个字段下面
                        database => "/home/elk/logstash/conf/GeoLite2-City.mmdb"
                        fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"]  #指定自己所需的字段
                        remove_field => ["[geoip][latitude]", "[geoip][longitude]"]  #删除多余的字段
                        #fields => ["city_name", "country_code2", "country_name", "region_name"]  直接这样一句话也是可以达到上面两句话的效果的。

            }
}
output {
            elasticsearch {
                hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]
                index => "nginx-cache-51niux-com-%{+YYYY.MM}"
            }
}

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/nginx-cache.conf   #再次启动

# cat /opt/lzotest1.log |head -100 >>/usr/local/nginx/logs/access.log  #然后写点测试数据进去,然后查看下Kibana界面。

图片.png

图片.png

#通过Kibana界面查看最新的一条,可以看到在Kibana上面效果已经出来了。

博文来自:www.51niux.com

三、Kibana的设置

3.1 加一个缓存命中率的图形

饼图命中率图标:

图片.png

图片.png

图片.png

图片.png

图片.png

图片.png

图片.png

查看命中率图标:

图片.png
图片.png

3.2 设置地图映射用户位置

发现问题:

图片.png

No Compatible Fields: The "nginx*" index pattern does not contain any of the following field types: geo_point

#加地图的时候出现这个问题。

#原因链接:http://www.tonitech.com/2517.html 也好多地方又说。

#http://blog.csdn.net/yanggd1987/article/details/50469113     #你可以用head插件看模板索引格式,当然有点基础可以用Kibana自带的Dev Tools来查看。

#在Elasticsearch中,所有的数据都有一个类型,什么样的类型,就可以在其上做一些对应类型的特殊操作。geo信息中的location字段是经纬度,我们需要使用经纬度来定位地理位置;在Elasticsearch中,对于经纬度来说,要想使用Elasticsearch提供的地理位置查询相关的功能,就需要构造一个结构,并且将其类型属性设置为geo_point,此错误明显是由于我们的geo的location字段类型不是geo_point。

#关键点就是如果你不想改索引的话,就logstash这个索引引用的模板里面有geo_point这个类型。

往回导解决问题:

$vim /home/elk/logstash/conf/nginx-cache.conf

input {
    beats {
        host => "192.168.14.67"
        port => 5066
    }
}
filter {
            grok {
                 patterns_dir => "/home/elk/logstash/patterns"
                 match => {
                          "message" => "%{PROXY_HTTP_ACCESS}"
                 }
            }
            geoip {
                        source => "fromip"
                        target => "geoip"
                        database => "/home/elk/logstash/conf/GeoLite2-City.mmdb"
                        #fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"]
                        #fields => ["city_name", "country_code2", "country_name", "region_name"]
                        #remove_field => ["[geoip][latitude]", "[geoip][longitude]"]
                        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] #这里就不能像上面那样把经纬度给删掉了,因为ES的map图形得依据经纬度来定位位置。
                        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
                }
        mutate {
                convert => [ "[geoip][coordinates]", "float"]  
        }
}
output {
            elasticsearch {
                hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]
                index => "logstash-nginx-cache-51niux-com-%{+YYYY.MM.dd}"   #主要是这里索引改掉,改成logstash开头的,我觉得主要是这个地方。
            }
}

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/nginx-cache.conf    #重新启动一下

# curl -XDELETE 'http://192.168.14.64:9200/nginx-*'
# curl -XDELETE 'http://192.168.14.64:9200/logstash*'      #索引都删除掉,测试环境随便搞嘿嘿。

# tail -100000 /opt/lzotest1.log >>/usr/local/nginx/logs/access.log  #再来点数据重新测试一下。

博文来自:www.51niux.com

Kibana再次来创建一下:

图片.png

图片.png

图片.png

图片.png

#好了效果出来了,怎么保存这个图像就不说了右上角有save哈。

使用高德地图将地图地名显示成中文:

$# vim /home/elk/kibana/config/kibana.yml  #在文件底部加上下面一句话

tilemap.url: 'http://webrd02.is.autonavi.com/appmaptile?lang=zh_cn&size=1&scale=1&style=7&x={x}&y={y}&z={z}'

$/home/elk/kibana/bin/kibana  #kibana重新启动一下

图片.png

#出图上面已经可以了,可以看到颜色越深说明这个地点用户越多。

图片.png

#geoip.location里面包的经纬度。

作者:忙碌的柴少 分类:ELK 浏览:16881 评论:5
留言列表
彪彪
彪彪 也可以参考 IP2Location Elastic Stack
https://www.ip2location.com/tutorials/how-to-use-ip2location-filter-plugin-with-elastic-stack  回复
忙碌的柴少
忙碌的柴少 谢谢啊 昨天太晚了没写完今天补上  回复
你大爷
你大爷 厉害了博主  回复
随风
随风 高德地图已经不能这样用了  回复
忙碌的柴少
忙碌的柴少 这就是技术的更新迭代,麻烦分享下链接  回复
发表评论
来宾的头像