ELK Stack

[Elasticsearch & Kibana&Logstash] Linux MySQL data 준 실시간 시각화 하기

땅콩새싹 2020. 11. 4. 20:23
반응형

Linux MySQL data 준 실시간 시각화 하기

 


 

1. Logstash 다운로드

지금까지 데이터를 전송한 후 csv파일을 만들어서 데이터 시각화를 해 보았는데 이번에는 데이터를 전송할 때마다 준 실시간으로 시각화를 해 보겠습니다.

linux에 Logstash를 install 해 줍니다.

~$ sudo apt-get install logstash

 

 

2. DB-connector 다운로드

DB-connector을 다운로드합니다.

~$ wget 'https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.18.tar.gz'


다운로드한 압축 파일을 풀어줍니다.

~$ tar -xvf ./mysql-connector-java-8.0.18.tar.gz


압축을 풀고 'mysql-connector-java-8.0.18' 폴더 안의 'mysql-connector-java-8.0.18.jar' 파일을 logstash 폴더 안의 tools 폴더 안으로 옮겨줍니다.

~$ cd mysql-connector-java-8.0.18

$ sudo cp mysql-connector-java-8.0.18.jar /usr/share/logstash/tools/

 

 

3. DB의 table column 수정

저번에 만들어 놓은 외부 접속 권한을 허용해 준 MySQL 계정에 접속해 줍니다.

그리고 use '데이터베이스'; 명령어로 데이터베이스를 선택해 줍니다.

저번에 만든 테이블 명이 value1이었고 index로 설정해 주었던 id값을 이제는 준 실시간으로 자동으로 올라가게 해 주기 위해 설정값을 수정하겠습니다. (auto_increment 사용)

alter table value1 modify column id int not null auto_increment;

 

그리고 데이터베이스에 값이 들어온 현재 시간과 마지막으로 값이 입력된 날짜를 추가해주는 테이블을 새로 만들어 줍니다.

alter table value1 add column modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

alter table value1 add column insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;

테이블을 추가해 주는 문입니다.

 

select * from value1; (value1은 table 이름입니다.)

을 통해서 modification_time과 insertion_time이 추가된 것을 확인할 수 있습니다.

 

 

4. logstash conf 파일 설정 방법

다음은 logstash conf 설정을 해 주어야 합니다.

우분투 콘솔 창을 새롭게 열어서 logstash라는 디렉터리를 새로 만들어 주겠습니다.

mkdir logstash

그리고 logstash라는 디렉터리 안에 confs라는 디렉터리를 만들어 주겠습니다.

cd logstash

mkdir confs

cd confs

confs 디렉터리 안에서 db.conf라는 conf 파일을 만들어 주겠습니다.

nano db.conf

그러고 나서 아래의 로그스태시 파이프라인 코드를 복사해서 db.conf 파일에 붙여 넣습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:9876/sensor_data?characterEncoding=UTF-8&serverTimezone=UTC"
    jdbc_user => "raspberryPi"
    jdbc_password => "password"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM value1 WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id""@version""unix_ts_in_secs"]
  }
}
output {
  # stdout { codec =>  "rubydebug"}
  elasticsearch {
      index => "sensor_value"
      document_id => "%{[@metadata][_id]}"
  }
}

맨 윗줄에 DB-connector를 다운로드한 버전에 맞춰서 입력합니다.

그리고 앞에 그 파일이 있는 경로를 입력해 줍니다.

그리고 localhost:9876 부분에 자신이 포트포워딩을 해준 포트 번호를 입력하고 그 뒤에 자신이 만든 데이터베이스 이름을 입력해 줍니다. 저는 9876 포트와 sensor_data라는 데이터베이스를 사용했습니다.

jdbc_user에는 외부 접속을 허용해 준 MySQL의 계정 이름을 입력해 줍니다. 그리고 그 계정에 맞는 password도 입력해 줍니다.

그리고 statement문에서 value1이라는 부분에 자신이 만들어 준 table 이름을 입력해 줍니다. 저는 value1이라고 table을 생성했습니다.

그리고 elasticsearch에서 index를 어떤 이름으로 사용할지 입력해 주고 db.conf 파일을 저장해 줍니다.

 

 

5. OpenJdk 설치

Logstash에는 java 8이 필요합니다. 또는 오픈소스 배포판(ex:OpenJdk)을 사용합니다.

그렇기 때문에 OpenJdk를 설치해 주겠습니다.

경로를 /usr/share/logstash/bin으로 이동해 줍니다.

$ sudo apt install openjdk-11-jdk

$ sudo ./logstash -f ~/logstash/confs/db.conf

 

 

6. raspberrypi query문 작성

데이터베이스로 값을 전송해 줄 query문을 작성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from random import *
import time
import pymysql
 
count = 0
sensor_value = 0
 
while(1):
    i = randint(100,999)
    sensor_value=sensor_value+i
    time.sleep(1)
    count=count+1
 
    if count==5:
        conn = pymysql.Connect(host='IPv4 주소', user='사용자 계정',
            password='사용자 계정 비밀번호', db='사용할 DB 이름', charset='utf8',
            port=설정해 준 포트 번호)
        curr = conn.cursor()
        res=sensor_value/5
        sql = "insert into value1 values(NULL"+", "+"'192.168.xx.xx', '"+str(res)+"', NULL, NULL)"
 
        print(sql)
        print(curr.execute(sql))
        conn.commit()
        count=0
        sensor_value=0
        index += 1
        curr.close()
        conn.close()
        conn = None
        curr = None
 

 

정상적으로 동작하고 있습니다.

 

 

7. 시각화 하기

elasticsearch와 kibana 서버를 열어주고 localhost:5601로 접속해 줍니다.

 

1


2


아까 conf 파일에서 입력해 주었던 elasticsearch index 이름


3


4


5


6


7


Y-axis
X-axis

Update


 

보고싶은 그래프로 설정해 줍니다.


Refresh 시간 설정

15초마다 update 됩니다.


데이터베이스로 들어간 Data값이 15초마다 update 됩니다.

반응형