Ingest CSV with Go into Cassandra
For a software project at university I recently had to implement the ingestion of a CSV dataset into an Apache Cassandra database. The task itself is not that difficult and can be handled nicely with the classic divide-and-conquer approach, so I decided to share it here.
We are going to:
- Read a CSV file line-by-line
- Process the records in each line
- Insert the values into the database
Here is a short snippet of the data we are going to use:
station_id,datapoint_id,alarm_id,event_time,value,valueThreshold,isActive
1161115055,121,308,2017-02-18 18:28:05 UTC,240,240,false
1161114056,143,312,2017-02-18 18:56:20 UTC,28.5,28,true
1161115060,141,312,2017-02-18 18:22:03 UTC,56.5,56,true
1161114088,121,308,2017-02-18 18:34:09 UTC,240,240,false
1161115090,141,312,2017-02-18 18:20:49 UTC,56,56,false
Let’s get started.
Read
The first task is to read the data from a given file.
We don’t want to read all the data at once, but rather line-by-line to keep memory usage low, so the trivial ioutil approach of slurping the whole file is not adequate.
An io.Reader is the ideal solution for this: it wraps the basic Read primitive, keeps track of the current position and only reads the data we actually need.
But instead of reading each line ourselves and then splitting it into fields, we can use the NewReader function from the encoding/csv package, which wraps any io.Reader and returns a csv.Reader. Each call to Read() on that reader then returns exactly one CSV record, already tokenized into its fields.
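A minimal sketch of this reading step might look like the following (the function name readData and the processData helper, which we fill in during the Transform step, are illustrative; it relies on the encoding/csv, io and log packages):
func readData(input io.Reader) {
	reader := csv.NewReader(input)
	for {
		// Read returns exactly one tokenized CSV record per call.
		record, err := reader.Read()
		if err == io.EOF {
			break // all lines have been consumed
		}
		if err != nil {
			log.Fatal(err)
		}
		processData(record) // parse and insert, see the Transform step
	}
}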
If we want to read the CSV data from an HTTP URL rather than a local file, we can simply swap the os.File for an http.Request.Body or http.Response.Body, since both implement the io.Reader interface.
Read CSV file from URL:
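A sketch of that, assuming the dataset is reachable via a plain GET request (the function name readFromURL is illustrative, error handling is kept minimal):
func readFromURL(url string) {
	// http.Response.Body implements io.Reader, so it can be fed
	// into the same readData function as a local file.
	resp, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	readData(resp.Body)
}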
Get CSV from HTTP POST request:
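And a sketch of the receiving side, assuming the data is POSTed to an endpoint of our own (handler name and route are illustrative):
func uploadHandler(w http.ResponseWriter, r *http.Request) {
	// http.Request.Body implements io.Reader as well.
	readData(r.Body)
	w.WriteHeader(http.StatusOK)
}

// Registered during startup, for example:
// http.HandleFunc("/upload", uploadHandler)
// log.Fatal(http.ListenAndServe(":8080", nil))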
Transform
After obtaining the CSV records, we still need to parse them into a native data format. For this task, we first create a struct that will hold our variables:
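A struct matching the columns of our dataset could look like this (the field types follow the table definition shown in the Initialization section):
type CsvLine struct {
	StationID      string
	DatapointID    int
	AlarmID        int
	EventTime      string
	Value          float64
	ValueThreshold float64
	IsActive       bool
}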
Then we parse all values in the CSV record and put them into the struct. The strconv package provides various functions to convert strings to native data types. Before we start parsing, though, we need to skip the first line of the CSV input, since it may contain the CSV header.
Please note that in the example below there is no error checking after parsing. It has been omitted because it is quite repetitive and the concrete error handling depends on the application and data (it may be acceptable to set some values to zero, for example).
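A sketch of the parsing step (comparing the first field against the header name is just one way to skip the header; the strconv errors are deliberately ignored here, as noted above):
func processData(record []string) {
	// Skip the header line if it is present.
	if record[0] == "station_id" {
		return
	}

	var line CsvLine
	line.StationID = record[0]
	line.DatapointID, _ = strconv.Atoi(record[1])
	line.AlarmID, _ = strconv.Atoi(record[2])
	line.EventTime = record[3]
	line.Value, _ = strconv.ParseFloat(record[4], 64)
	line.ValueThreshold, _ = strconv.ParseFloat(record[5], 64)
	line.IsActive, _ = strconv.ParseBool(record[6])

	insertIntoDb(line)
}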
Insert
After we have made sure our data fits the variable types, we can insert it into the database.
The insertIntoDb function takes an argument of type CsvLine (our struct), so we can be sure that all values are at least zero-initialized.
We are using the excellent gocql package to handle the database connection and queries. It also handles the automatic conversion from Go data types to Cassandra data types (and back).
For the Query function we first specify the insert statement with placeholders and then the individual values to bind (much like a printf-style function). The gocql package also supports more advanced data-binding features, but for this simple use case manually specifying the columns is sufficient.
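A sketch of the insert, assuming the exampleCSV.events table created in the Initialization section below:
func insertIntoDb(line CsvLine) {
	// The question marks are placeholders that gocql binds to the values
	// passed after the statement, converting Go types to Cassandra types.
	err := SESSION.Query(
		"INSERT INTO exampleCSV.events (stationid, datapointid, alarmid, eventtime, value, valueThreshold, active) VALUES (?, ?, ?, ?, ?, ?, ?)",
		line.StationID, line.DatapointID, line.AlarmID, line.EventTime,
		line.Value, line.ValueThreshold, line.IsActive,
	).Exec()
	if err != nil {
		log.Fatal(err)
	}
}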
But when did we connect to the database? And where is that SESSION variable coming from? The main function is doing the rest of the setup:
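A sketch of main, assuming the two command line arguments described below (the data source first, the Cassandra host second) and the os, strings and github.com/gocql/gocql packages:
var SESSION *gocql.Session

func main() {
	if len(os.Args) < 3 {
		log.Fatal("usage: ingest <csv file or URL> <cassandra host>")
	}
	source := os.Args[1]
	host := os.Args[2]

	// Connect to the cluster; a single contact point is enough,
	// the driver auto-discovers the remaining nodes.
	cluster := gocql.NewCluster(host)
	cluster.Keyspace = "examplecsv" // unquoted CQL identifiers are lower-cased
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	SESSION = session
	defer session.Close()

	// Decide whether the source is an HTTP URL or a local file.
	if strings.HasPrefix(source, "http") {
		readFromURL(source)
	} else {
		file, err := os.Open(source)
		if err != nil {
			log.Fatal(err)
		}
		defer file.Close()
		readData(file)
	}
}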
First, we get the configuration from the command line. The user first has to specify the data source (such as “./data/dataset.csv” or “http://127.0.0.1/dataset.csv”) and then the address (either IP or hostname) of the Cassandra server. Note that for a cluster of Cassandra servers only a single IP is needed: after the initial connection, the client will auto-discover the other nodes in the cluster.
Next, we connect to the database by creating a session and making this connection available globally (since we are only using a single thread here).
Then we parse the name of the input source we are given. If it starts with “http”, we assume it is an HTTP URL and query that URL for the data. If not, we simply read the file from disk.
Initialization
Note that before we can connect to the database host as shown above, we need to create a so-called keyspace (i.e. a database schema) in our Cassandra instance, along with a table that has the appropriate structure.
Connect to your Cassandra host with cqlsh and run the following statements:
create keyspace exampleCSV with replication = {
'class' : 'SimpleStrategy',
'replication_factor' : 2
};
create table exampleCSV.events(
stationid text,
datapointid int,
alarmid int,
eventtime text,
value double,
valueThreshold double,
active boolean,
PRIMARY KEY( (stationid, eventtime) )
);
create index on exampleCSV.events(stationid);
For details on what each of these configuration options means, please refer to the Cassandra documentation.
Run
Now all that’s left to do is to download the gocql package and compile the binary:
go get -u github.com/gocql/gocql
go build -o ingest .
./ingest dataset.csv 127.0.0.1
./ingest http://127.0.0.1:8080/dataset.csv cassandra1.example.com
Download the full source file here: main.go.
Feel free to use it wherever you like; suggestions for improvement are very welcome!