本文共 4417 字,大约阅读时间需要 14 分钟。
3.4.3 创建模板文件
Logstash的工作是从MySQL中读取数据,向ES中创建索引,这里需要提前创建mapping的模板文件以便logstash 使用。在logstach的config目录创建xc_course_template.json,内容如下:本教程的xc_course_template.json目录是:D:/ElasticSearch/logstash-6.2.1/config/xc_course_template.json[mw_shl_code=applescript,true] { "mappings" : { "doc" : { "properties" : { "charge" : { "type" : "keyword" }, "description" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "end_time" : { "format" : "yyyy‐MM‐dd HH:mm:ss", "type" : "date" }, "expires" : { "format" : "yyyy‐MM‐dd HH:mm:ss", "type" : "date" }, "grade" : { "type" : "keyword" }, "id" : { "type" : "keyword" }, "mt" : { "type" : "keyword" }, "name" : {[/mw_shl_code][mw_shl_code=applescript,true] "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "pic" : { "index" : false, "type" : "keyword" }, "price" : { "type" : "float" }, "price_old" : { "type" : "float" }, "pub_time" : { "format" : "yyyy‐MM‐dd HH:mm:ss", "type" : "date" }, "qq" : { "index" : false, "type" : "keyword" }, "st" : { "type" : "keyword" }, "start_time" : { "format" : "yyyy‐MM‐dd HH:mm:ss", "type" : "date" }, "status" : { "type" : "keyword" }, "studymodel" : { "type" : "keyword" }, "teachmode" : { "type" : "keyword" }, "teachplan" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "users" : { "index" : false, "type" : "text" }, "valid" : { "type" : "keyword" } } } },[/mw_shl_code][mw_shl_code=applescript,true] "template" : "xc_course" } [/mw_shl_code]3.4.4 配置mysql.conf 在logstash的config目录下配置mysql.conf文件供logstash使用,logstash会根据mysql.conf文件的配置的地址从 MySQL中读取数据向ES中写入索引。 参考https://www.elastic.co/guide/en/ ... ns-inputs-jdbc.html配置输入数据源和输出数据源。[mw_shl_code=applescript,true] "template" : "xc_course" } input { stdin { } jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course? useUnicode=true&characterEncoding=utf‐8&useSSL=true&serverTimezone=UTC" # the user we wish to excute our statement as jdbc_user => "root" jdbc_password => mysql # the path to our downloaded jdbc driver jdbc_driver_library => "F:/develop/maven/repository3/mysql/mysql‐connector‐java/5.1.41/mysqlconnector‐java‐5.1.41.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #要执行的sql文件 #statement_filepath => "/conf/course.sql" statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)" #定时配置 schedule => "* * * * *" record_last_run => true last_run_metadata_path => "D:/ElasticSearch/logstash‐6.2.1/config/logstash_metadata" } } output { elasticsearch { #ES的ip地址和端口 hosts => "localhost:9200" #hosts => ["localhost:9200","localhost:9202","localhost:9203"] #ES索引库名称 index => "xc_course" document_id => "%{id}"[/mw_shl_code][mw_shl_code=applescript,true]document_type => "doc" template =>"D:/ElasticSearch/logstash‐6.2.1/config/xc_course_template.json" template_name =>"xc_course" template_overwrite =>"true" } stdout { #日志输出 codec => json_lines } } [/mw_shl_code]
说明:
1、ES采用UTC时区问题 ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时 where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)2、logstash每个执行完成会在D:/ElasticSearch/logstash-6.2.1/config/logstash_metadata记录执行时间下次以此 时间为基准进行增量同步数据到索引库。转载于:https://blog.51cto.com/13517854/2397751