Ingest CSV with Go into Cassandra

For a software project at university I recently had to implement an ingestion from a CSV dataset to an Apache Cassandra database. The task itself is not that difficult and can be nicely handled with the classical divide-and-conquer approach, so I decided to share it here.

We are going to:

  1. Read a CSV file line-by-line
  2. Process the records in each line
  3. Insert the values into the database

Here is a short snippet of the data we are going to use:

station_id,datapoint_id,alarm_id,event_time,value,valueThreshold,isActive
1161115055,121,308,2017-02-18 18:28:05 UTC,240,240,false
1161114056,143,312,2017-02-18 18:56:20 UTC,28.5,28,true
1161115060,141,312,2017-02-18 18:22:03 UTC,56.5,56,true
1161114088,121,308,2017-02-18 18:34:09 UTC,240,240,false
1161115090,141,312,2017-02-18 18:20:49 UTC,56,56,false

Let’s get started.

#  Read

The first task is to read the data from a given file. We don’t want to read all data at once, but rather line-by-line to save memory usage, so the trivial ioutil approach of slurping all data is not adequate. For this purpose an io.Reader is an ideal solution. It wraps the basic io.Read primitive, keeps the current state and only reads the data we need. But instead of reading each line with io.Reader and then splitting it into fields, we can use the NewReader function included in the csv package which returns an io.Reader interface. Then, a Read() on that interface will return exactly one line of CSV record (already tokenized).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 1. Read a CSV file line-by-line (from local file)
func readFromFile(filepath string) (err error) {
	file, err := os.Open(filepath)
	if err != nil {
		return err
	}
	defer file.Close()

	reader := csv.NewReader(file)
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Printf("%s\n", err)
            continue
		}

		processRecord(record)
	}

	return nil
}

If you want to read the CSV file from an HTTP URL rather than a local file, we can simply swap the os.File type against http.Request.Body or http.Response.Body, since both implement the io.Reader interface.

Read CSV file from URL:

1
2
3
4
5
6
7
8
9
func readFromUrl(url string) (err error) {
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
	reader := csv.NewReader(resp.Body)
...
}

Get CSV from HTTP POST request:

1
2
3
4
5
6
http.HandleFunc("/create/csv", readFromRequest)

func readFromRequest(w http.ResponseWriter, r *http.Request) {
	reader := csv.NewReader(r.Body)
...
}

#  Transform

After obtaining the CSV records, we still need to parse them into a native data format. For this task, we first create a struct that will hold our variables:

1
2
3
4
5
6
7
8
9
type CsvLine struct {
	StationId      string
	DatapointId    int
	AlarmId        int
	EventTime      string
	Value          float64
	ValueThreshold float64
	IsActive       bool
}

Then we parse all values in the CSV record and put them into the struct. The strconv package provides various function to convert strings to native data types. Before we start parsing though, we need to skip the first line of CSV input, since it may contain the CSV header.

Please note that in the example below there is no error checking after parsing. It has been omitted because it is quite repetitive and the concrete error handling depends on the application and data (it may be acceptable to set some values to zero, for example).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 2. Process the records in each line
func processRecord(line []string) {
	if len(line) < 7 {
		fmt.Println("Invalid length, discarding line...")
		return
	}

	if line[0] == "station_id" {
		// ignore first line
		return
	}

    // note: error checking omitted for brevity
	stationId := line[0]
	datapointId, _ := strconv.Atoi(line[1])
	alarmId, _ := strconv.Atoi(line[2])
	eventTime := line[3]
	value, _ := strconv.ParseFloat(line[4], 64)
	valueThreshold, _ := strconv.ParseFloat(line[5], 64)
	active, _ := strconv.ParseBool(line[6])

	buf := CsvLine{
		StationId:      stationId,
		DatapointId:    datapointId,
		AlarmId:        alarmId,
		EventTime:      eventTime,
		Value:          value,
		ValueThreshold: valueThreshold,
		IsActive:       active,
	}

	insertIntoDb(buf)
}

#  Insert

After we have made sure our data fits the variable types, we can insert it into the database. The insertIntoDb function takes an argument of the type CsvLine (our struct), so we can be sure that all values are at least zero-initialized.

We are using the excellent gocql package to handle the database connection and queries. It also handles the automatic conversion from the Go data types to Cassandra data types (and back).

For the Query function we first specify the format for the insertion and then the individual values (like a printf function). The gocql package also supports more advanced data-binding features, but for this simple use-case manually specifying the columns is sufficient.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 3. Insert the values into the database
func insertIntoDb(record CsvLine) {
	err := SESSION.Query(`INSERT INTO events (stationid, datapointid, alarmid, eventtime, value, valuethreshold, active) VALUES (?, ?, ?, ?, ?, ?, ?)`,
		record.StationId,
		record.DatapointId,
		record.AlarmId,
		record.EventTime,
		record.Value,
		record.ValueThreshold,
		record.IsActive,
	).Exec()
	if err != nil {
		fmt.Printf("Insert failed: %s\n", err)
	}
}

But when did we connect to the database? And where is that SESSION variable coming from?

The main function is doing the rest of the setup:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func main() {
	// 1. get configuration from command line
	if len(os.Args) < 3 {
		log.Fatalf("Usage: %s URI CASSANDRASERVER\n", os.Args[0])
	}
	datasource := os.Args[1]
    cassandraUri := os.Args[2]

	// 2. connect to database
	cluster := gocql.NewCluster(cassandraUri)
	cluster.Keyspace = "exampleCSV"
	cluster.Consistency = gocql.Quorum
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
	// make global
	SESSION = session

    // 3. read data from appropriate source
	if strings.HasPrefix(datasource, "http") {
		err = readFromUrl(datasource)
	} else {
		err = readFromFile(datasource)
	}
	if err != nil {
		log.Fatal(err)
	}
	return
}

First, we get the configuration from the command line. The user first has to specify his data source (such as “./data/dataset.csv” or “http://127.0.0.1/dataset.csv”) and then the address (either IP or hostname) of the Cassandra server. Note that for a cluster of Cassandra servers only a single IP is needed, since after the initial connection the client will auto-discover the other nodes in the cluster.

Next, we connect to the database by creating a session and making this connection available globally (since we are only using a single thread here).

Then we are parsing the name of the input file we are given. If it starts with “http”, we assume it is an HTTP URL and query that URL for the data. If not read, we simply read the file from disk.

#  Initialization

Note that before we can connect to the database host as shown above, we need to create a so called keyspace (aka database schema) in our Cassandra instance along with a table which has the appropriate data structure. Connect to your Cassandra host with cqlsh and run the following query:

create keyspace exampleCSV with replication = {
       'class' : 'SimpleStrategy',
       'replication_factor' : 2
       };
create table exampleCSV.events(
       stationid text,
       datapointid int,
       alarmid int,
       eventtime text,
       value double,
       valueThreshold double,
       active boolean,
       PRIMARY KEY( (stationid, eventtime) )
       );
create index on exampleCSV.events(stationid);

For details about what each of these configuration items mean, please refer to the Cassandra documentation.

#  Run

Now all that’s left to do is downloading the gocql package and compiling the binary:

go get -u github.com/gocql/gocql
go build -o ingest .
./ingest dataset.csv 127.0.0.1
./ingest http://127.0.0.1:8080/dataset.csv cassandra1.example.com

Download the full source file here: main.go.

Feel free to use wherever you like, suggestions for improvement are very welcome!