1行ごとに値の入ったjsonファイルをinfluxdbのfieldsに保存するプログラムを書いた
influxdbはgolangで書かれているようなので、golangで実行プログラムを作成してみた
環境
- go: 1.9.2
- influxdb: 1.3.7
準備
go get
influxdbのクライアントライブラリをインストール
go get "github.com/influxdata/influxdb/client/v2"
jsonファイル
1行ごとに値の入ったjsonファイルを用意
cat test.json
{"idle": 40.5, "system": 29.9, "user": 31.3}
{"idle": 56.8, "system": 7.7, "user": 47.9}
{"idle": 34.8, "system": 74.5, "user": 75.4}
{"idle": 15.7, "system": 2.3, "user": 41.9}
{"idle": 43.3, "system": 19.0, "user": 19.9}
{"idle": 9.9, "system": 96.7, "user": 86.5}
{"idle": 87.7, "system": 12.9, "user": 64.0}
{"idle": 4.3, "system": 55.1, "user": 80.3}
{"idle": 31.9, "system": 71.6, "user": 8.6}
{"idle": 48.7, "system": 49.1, "user": 86.6}
実装
main.go
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"time"
client "github.com/influxdata/influxdb/client/v2"
)
// Connection settings and the measurement name used by the rest of this program.
// NOTE(review): the credentials are hard-coded in the source; consider reading
// them from the environment before using this outside a local test setup.
const (
// db is the database name passed to queries and to the batch-point config.
db = "test"
// username is the user name given to the HTTP client config.
username = "bubba"
// password is the password given to the HTTP client config.
password = "bumblebeetuna"
// meas is the measurement name assigned to every Point built in main.
meas = "cpu_usage"
)
// Point bundles the three values that writeData hands to client.NewPoint:
// the measurement name, the tag set, and the field set.
type Point struct {
// name is the measurement name of the point.
name string
// tags are the tag key/value pairs attached to the point.
tags map[string]string
// fields are the field key/value pairs stored in the point.
fields map[string]interface{}
}
// Data is the decoded form of one JSON line of the input file.
// Each field is filled from the JSON key named in its struct tag.
type Data struct {
// Idle is read from the "idle" key.
Idle float64 `json:"idle"`
// System is read from the "system" key.
System float64 `json:"system"`
// User is read from the "user" key.
User float64 `json:"user"`
}
// runQuery executes cmd against the database db through c and logs any
// failure. It does not return a result and does not abort the program.
//
// Two kinds of failure are reported separately: a transport-level error
// returned by c.Query, and an error carried inside the response itself.
func runQuery(c client.Client, cmd string) {
	q := client.NewQuery(cmd, db, "")
	res, err := c.Query(q)
	if err != nil {
		// res must not be inspected here: it is not valid when err is non-nil.
		log.Printf("runQuery() error: %s", err)
		return
	}
	if rerr := res.Error(); rerr != nil {
		log.Printf("runQuery() error: %v: %s", res.Results, rerr)
	}
}
// writeData stores the single point p in the database db through c.
//
// It builds a one-element batch, stamps the point with the current time,
// and writes the batch. Any error along the way terminates the program
// via log.Fatal.
func writeData(c client.Client, p Point) {
	// Prepare an empty batch bound to the target database.
	cfg := client.BatchPointsConfig{
		Database:  db,
		Precision: "",
	}
	batch, err := client.NewBatchPoints(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Convert p into a client point timestamped with "now".
	point, err := client.NewPoint(p.name, p.tags, p.fields, time.Now())
	if err != nil {
		log.Fatal(err)
	}
	batch.AddPoint(point)

	// Send the batch to the server.
	err = c.Write(batch)
	if err != nil {
		log.Fatal(err)
	}
}
// fromFile reads the file at filePath and returns its contents split into
// lines (without line terminators).
//
// If the file cannot be opened, a message is written to standard error and
// an empty slice is returned. If scanning stops with an error, the error is
// written to standard error and the lines read so far are returned.
func fromFile(filePath string) []string {
	lines := []string{}

	f, err := os.Open(filePath)
	if err != nil {
		// Report the failure and stop; scanning a file that failed to open
		// cannot produce any lines.
		fmt.Fprintf(os.Stderr, "%s could not read: %v\n", filePath, err)
		return lines
	}
	defer f.Close()

	// Read the file line by line.
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	if serr := scanner.Err(); serr != nil {
		// Print the scanner's own error, not the (nil) error from os.Open.
		fmt.Fprintf(os.Stderr, "%s scanner error: %v\n", filePath, serr)
	}
	return lines
}
// main connects to the InfluxDB HTTP API at http://localhost:8086, issues a
// CREATE DATABASE statement for db, and then writes one point per JSON line
// of test.json into the measurement meas.
func main() {
	// Create a new HTTPClient
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     "http://localhost:8086",
		Username: username,
		Password: password,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Make sure the target database is there before writing to it.
	runQuery(c, fmt.Sprintf("CREATE DATABASE %s", db))

	// Read the input and handle it one JSON line at a time.
	for i, line := range fromFile("test.json") {
		if line == "" {
			// A blank line carries no data; skip it instead of aborting.
			continue
		}

		// Decode into a fresh value for every line so that a key missing
		// from this line does not silently keep the previous line's value.
		var d Data
		if err := json.Unmarshal([]byte(line), &d); err != nil {
			// Report the failure before stopping; returning (rather than
			// exiting) lets the deferred c.Close run.
			log.Printf("JSON Unmarshal error: line %d: %s", i+1, err)
			return
		}

		// insert data
		p := Point{
			name: meas,
			tags: map[string]string{
				"cpu": "cpu-total",
			},
			fields: map[string]interface{}{
				"idle":   d.Idle,
				"system": d.System,
				"user":   d.User,
			},
		}
		writeData(c, p)
	}
}
確認
influxdb APIが以下のアドレスで起動している状態で実行する
http://localhost:8086
go run main.go
curl -G -XGET 'http://localhost:8086/query?pretty=true' --data-urlencode "db=test" --data-urlencode 'q=SELECT "idle","system","user" FROM cpu_usage'
{
"results": [
{
"statement_id": 0,
"series": [
{
"name": "cpu_usage",
"columns": [
"time",
"idle",
"system",
"user"
],
"values": [
[
"2017-11-26T07:59:46.984294Z",
40.5,
29.9,
31.3
],
[
"2017-11-26T07:59:46.990029Z",
56.8,
7.7,
47.9
],
[
"2017-11-26T07:59:46.991943Z",
34.8,
74.5,
75.4
],
[
"2017-11-26T07:59:46.993493Z",
15.7,
2.3,
41.9
],
[
"2017-11-26T07:59:46.994823Z",
43.3,
19,
19.9
],
[
"2017-11-26T07:59:46.996119Z",
9.9,
96.7,
86.5
],
[
"2017-11-26T07:59:46.997135Z",
87.7,
12.9,
64
],
[
"2017-11-26T07:59:46.9983Z",
4.3,
55.1,
80.3
],
[
"2017-11-26T07:59:46.999264Z",
31.9,
71.6,
8.6
],
[
"2017-11-26T07:59:47.000413Z",
48.7,
49.1,
86.6
]
]
}
]
}
]
}
おまけ(jsonファイルの生成コマンド)
test.json は次のスクリプトで生成した
generate_json.sh
#!/bin/bash
# Prints the requested number of JSON rows to standard output, one per line,
# each with the keys "idle", "system" and "user".
# Usage: ./generate_json.sh ROWS
# Exactly one argument is required; otherwise print a usage error to stderr
# and exit with status 1.
if [ $# -ne 1 ]; then
echo "arguments are $# items" 1>&2
echo "1 arguments(how many generate json row) required to execute" 1>&2
exit 1
fi
# Generate as many rows as the argument specifies.
# Each value comes from its own awk call, seeded with the shell's $RANDOM,
# which prints rand()*100 with one decimal place. The backslash-newlines
# inside the double-quoted string join the pieces into a single output line.
for l in `seq $1`; do
echo "{\"idle\": `awk 'BEGIN{ srand('"$RANDOM"'); printf("%2.1f", rand()*100) }'`, \
\"system\": `awk 'BEGIN{ srand('"$RANDOM"'); printf("%2.1f", rand()*100) }'`, \
\"user\": `awk 'BEGIN{ srand('"$RANDOM"'); printf("%2.1f", rand()*100) }'`\
}"
done
jsonファイルの生成
chmod +x generate_json.sh
./generate_json.sh 10 > test.json
cat test.json
{"idle": 40.5, "system": 29.9, "user": 31.3}
{"idle": 56.8, "system": 7.7, "user": 47.9}
{"idle": 34.8, "system": 74.5, "user": 75.4}
{"idle": 15.7, "system": 2.3, "user": 41.9}
{"idle": 43.3, "system": 19.0, "user": 19.9}
{"idle": 9.9, "system": 96.7, "user": 86.5}
{"idle": 87.7, "system": 12.9, "user": 64.0}
{"idle": 4.3, "system": 55.1, "user": 80.3}
{"idle": 31.9, "system": 71.6, "user": 8.6}
{"idle": 48.7, "system": 49.1, "user": 86.6}