
Mark Needham
Thoughts on Software Development

Spark: Write to CSV file with header using saveAsFile

Sun, 11/30/2014 - 09:21

In my last blog post I showed how to write to a single CSV file using Spark and Hadoop, and the next thing I wanted to do was add a header row to the resulting file.

Hadoop’s FileUtil#copyMerge function does take a String parameter, but it appends that text to the end of each partition file, which isn’t quite what we want.
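
For reference, a call to the stock version looks something like this (a sketch using the paths from this post); the final addString argument gets appended after every partition file that is merged in, rather than written once at the top:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// stock Hadoop copyMerge: the last argument ("type,count" here) would be
// appended after *each* merged partition file, not written once at the top
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path("/tmp/primaryTypes.csv"),
                   hdfs, new Path("/tmp/singlePrimaryTypes.csv"),
                   false, hadoopConfig, "type,count")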

However, if we copy that function into our own FileUtil class we can restructure it to do what we want:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
 
public class MyFileUtil {
    public static boolean copyMergeWithHeader(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String header) throws IOException {
        dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
        if(!srcFS.getFileStatus(srcDir).isDir()) {
            return false;
        } else {
            FSDataOutputStream out = dstFS.create(dstFile);
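            // write the header once, at the top of the merged file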
            if(header != null) {
                out.write((header + "\n").getBytes("UTF-8"));
            }
 
            try {
                FileStatus[] contents = srcFS.listStatus(srcDir);
 
                for(int i = 0; i < contents.length; ++i) {
                    if(!contents[i].isDir()) {
                        FSDataInputStream in = srcFS.open(contents[i].getPath());
 
                        try {
                            IOUtils.copyBytes(in, out, conf, false);
 
                        } finally {
                            in.close();
                        }
                    }
                }
            } finally {
                out.close();
            }
 
            return deleteSource ? srcFS.delete(srcDir, true) : true;
        }
    }
 
    private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException {
        if(dstFS.exists(dst)) {
            FileStatus sdst = dstFS.getFileStatus(dst);
            if(sdst.isDir()) {
                if(null == srcName) {
                    throw new IOException("Target " + dst + " is a directory");
                }
 
                return checkDest((String)null, dstFS, new Path(dst, srcName), overwrite);
            }
 
            if(!overwrite) {
                throw new IOException("Target " + dst + " already exists");
            }
        }
        return dst;
    }
}

We can then update our merge function to call this instead:

def merge(srcPath: String, dstPath: String, header:String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
}

We call merge from our code like this:

merge(file, destinationFile, "type,count")
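
For completeness, the job that gets submitted below (WriteToCsvWithHeader) looks roughly like this. It’s a sketch reconstructed from this post and the previous one, so treat the wiring (the SparkConf/SparkContext setup) as approximate; the real code is in the gist linked at the end:

import java.io.File

import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

object WriteToCsvWithHeader {

  // strip the CSV header: the first partition contains the first line
  def dropHeader(data: RDD[String]): RDD[String] =
    data.mapPartitionsWithIndex((idx, lines) => {
      if (idx == 0) {
        lines.drop(1)
      }
      lines
    })

  // merge the part files into a single CSV file, writing the header first
  def merge(srcPath: String, dstPath: String, header: String): Unit = {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WriteToCsvWithHeader"))

    val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
    val withoutHeader = dropHeader(sc.textFile(crimeFile).cache())

    val file = "/tmp/primaryTypes.csv"
    val destinationFile = "/tmp/singlePrimaryTypes.csv"
    FileUtil.fullyDelete(new File(file))
    FileUtil.fullyDelete(new File(destinationFile))

    // count crimes by 'Primary Type' (column 6) and emit one "type,count" line per type
    val counts = withoutHeader.mapPartitions(lines => {
      val parser = new CSVParser(',')
      lines.map(line => (parser.parseLine(line)(5), 1))
    }).reduceByKey { case (x, y) => x + y }
      .sortBy { case (key, value) => -value }
      .map { case (key, value) => Array(key, value).mkString(",") }

    counts.saveAsTextFile(file)
    merge(file, destinationFile, "type,count")
  }
}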

I wasn’t sure how to import my Java-based class into the Spark shell, so I compiled the code into a JAR and submitted it as a job instead:

$ sbt package
[info] Loading global plugins from /Users/markneedham/.sbt/0.13/plugins
[info] Loading project definition from /Users/markneedham/projects/spark-play/playground/project
[info] Set current project to playground (in build file:/Users/markneedham/projects/spark-play/playground/)
[info] Compiling 3 Scala sources to /Users/markneedham/projects/spark-play/playground/target/scala-2.10/classes...
[info] Packaging /Users/markneedham/projects/spark-play/playground/target/scala-2.10/playground_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed 30-Nov-2014 08:12:26
 
$ time ./bin/spark-submit --class "WriteToCsvWithHeader" --master local[4] /path/to/playground/target/scala-2.10/playground_2.10-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
...
14/11/30 08:16:15 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/11/30 08:16:15 INFO SparkContext: Job finished: saveAsTextFile at WriteToCsvWithHeader.scala:49, took 0.589036 s
 
real	0m13.061s
user	0m38.977s
sys	0m3.393s

And if we look at our destination file:

$ cat /tmp/singlePrimaryTypes.csv
type,count
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

Happy days!

The code is available as a gist if you want to see all the details.

Categories: Programming

Spark: Write to CSV file

Sun, 11/30/2014 - 08:40

A couple of weeks ago I wrote about how I’d been using Spark to explore a City of Chicago crime data set, and having worked out how many of each crime had been committed, I wanted to write that result to a CSV file.

Spark provides a saveAsTextFile function which allows us to save RDDs, so I refactored my code into the following format so that I could use it:

import java.io.File
import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.fs.FileUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
 
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}
 
// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
 
val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)
 
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
 
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    (columns(5), 1)
  })
})
 
val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
 
counts.saveAsTextFile(file)

If we run that code from the Spark shell we end up with a folder called /tmp/primaryTypes.csv containing multiple part files:

$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x  66 markneedham  wheel   2.2K 30 Nov 07:17 .
drwxrwxrwt  80 root         wheel   2.7K 30 Nov 07:16 ..
-rw-r--r--   1 markneedham  wheel     8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00000.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00001.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00002.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx   1 markneedham  wheel     0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx   1 markneedham  wheel    28B 30 Nov 07:16 part-00000
-rwxrwxrwx   1 markneedham  wheel    17B 30 Nov 07:16 part-00001
-rwxrwxrwx   1 markneedham  wheel    23B 30 Nov 07:16 part-00002
-rwxrwxrwx   1 markneedham  wheel    16B 30 Nov 07:16 part-00003
...

If we look at some of those part files we can see that it’s written the crime types and counts as expected:

$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
 
$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310

This is fine if we’re going to pass those CSV files into another Hadoop-based job, but I actually want a single CSV file so it’s not quite what I’m after.

One way to achieve this is to force everything to be calculated on one partition which will mean we only get one part file generated:

val counts = partitions.repartition(1).
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
 
 
counts.saveAsTextFile(file)

part-00000 now looks like this:

$ cat !$
cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

This works but it’s quite a bit slower than when we were doing the aggregation across partitions so it’s not ideal.
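
As an aside, and not the approach this post goes on to use, another option would be to keep the aggregation running across all the partitions and only collapse to a single partition for the final write, e.g. with coalesce. A minimal sketch, reusing the partitions and file values from above:

val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }

// shrink to one partition only for the write so the aggregation stays parallel
counts.coalesce(1).saveAsTextFile(file)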

Instead, what we can do is make use of one of Hadoop’s merge functions which squashes part files together into a single file.

First we add the Hadoop dependency to our SBT build file:

libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"

Now let’s bring our merge function into the Spark shell:

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
 
def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

And now let’s make use of it:

val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
 
val destinationFile= "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))
 
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
 
counts.saveAsTextFile(file)
 
merge(file, destinationFile)

And now we’ve got the best of both worlds:

$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

The full code is available as a gist if you want to play around with it.

Categories: Programming

Docker/Neo4j: Port forwarding on Mac OS X not working

Thu, 11/27/2014 - 13:28

Prompted by Ognjen Bubalo’s excellent blog post I thought it was about time I tried running Neo4j in a Docker container on my MacBook Pro to make it easier to play around with different data sets.

I got the container up and running by following Ognjen’s instructions and had the following ports forwarded to my host machine:

$ docker ps
CONTAINER ID        IMAGE                 COMMAND                CREATED             STATUS              PORTS                                              NAMES
c62f8601e557        tpires/neo4j:latest   "/bin/bash -c /launc   About an hour ago   Up About an hour    0.0.0.0:49153->1337/tcp, 0.0.0.0:49154->7474/tcp   neo4j

This should allow me to access Neo4j on port 49154 but when I tried to access that host:port pair I got a connection refused message:

$ curl -v http://localhost:49154
* Adding handle: conn: 0x7ff369803a00
* Adding handle: send: 0
* Adding handle: recv: 0
* Curl_addHandleToPipeline: length: 1
* - Conn 0 (0x7ff369803a00) send_pipe: 1, recv_pipe: 0
* About to connect() to localhost port 49154 (#0)
*   Trying ::1...
*   Trying 127.0.0.1...
*   Trying fe80::1...
* Failed connect to localhost:49154; Connection refused
* Closing connection 0
curl: (7) Failed connect to localhost:49154; Connection refused

My first thought was that maybe Neo4j hadn’t started up correctly inside the container, so I checked the logs:

$ docker logs --tail=10 c62f8601e557
10:59:12.994 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.w.WebAppContext@2edfbe28{/webadmin,jar:file:/usr/share/neo4j/system/lib/neo4j-server-2.1.5-static-web.jar!/webadmin-html,AVAILABLE}
10:59:13.449 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@192efb4e{/db/manage,null,AVAILABLE}
10:59:13.699 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@7e94c035{/db/data,null,AVAILABLE}
10:59:13.714 [main] INFO  o.e.j.w.StandardDescriptorProcessor - NO JSP Support for /browser, did not find org.apache.jasper.servlet.JspServlet
10:59:13.715 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.w.WebAppContext@3e84ae71{/browser,jar:file:/usr/share/neo4j/system/lib/neo4j-browser-2.1.5.jar!/browser,AVAILABLE}
10:59:13.807 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@4b6690b1{/,null,AVAILABLE}
10:59:13.819 [main] INFO  o.e.jetty.server.ServerConnector - Started ServerConnector@495350f0{HTTP/1.1}{c62f8601e557:7474}
10:59:13.900 [main] INFO  o.e.jetty.server.ServerConnector - Started ServerConnector@23ad0c5a{SSL-HTTP/1.1}{c62f8601e557:7473}
2014-11-27 10:59:13.901+0000 INFO  [API] Server started on: http://c62f8601e557:7474/
2014-11-27 10:59:13.902+0000 INFO  [API] Remote interface ready and available at [http://c62f8601e557:7474/]

Nope! It’s up and running perfectly fine, which suggested the problem was with port forwarding.

I eventually found my way to Chris Jones’ ‘How to use Docker on OS X: The Missing Guide‘ which explained the problem:

The Problem: Docker forwards ports from the container to the host, which is boot2docker, not OS X.

The Solution: Use the VM’s IP address.

So to access Neo4j on my machine I need to use the VM’s IP address rather than localhost. We can get the VM’s IP address like so:

$ boot2docker ip
 
The VM's Host only interface IP address is: 192.168.59.103

Let’s strip out that surrounding text though:

$ boot2docker ip 2> /dev/null
192.168.59.103

Now if we cURL using that IP instead:

$ curl -v http://192.168.59.103:49154
* About to connect() to 192.168.59.103 port 49154 (#0)
*   Trying 192.168.59.103...
* Adding handle: conn: 0x7fd794003a00
* Adding handle: send: 0
* Adding handle: recv: 0
* Curl_addHandleToPipeline: length: 1
* - Conn 0 (0x7fd794003a00) send_pipe: 1, recv_pipe: 0
* Connected to 192.168.59.103 (192.168.59.103) port 49154 (#0)
> GET / HTTP/1.1
> User-Agent: curl/7.30.0
> Host: 192.168.59.103:49154
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: application/json; charset=UTF-8
< Access-Control-Allow-Origin: *
< Content-Length: 112
* Server Jetty(9.0.5.v20130815) is not blacklisted
< Server: Jetty(9.0.5.v20130815)
<
{
  "management" : "http://192.168.59.103:49154/db/manage/",
  "data" : "http://192.168.59.103:49154/db/data/"
* Connection #0 to host 192.168.59.103 left intact

Happy days!

Chris has solutions to lots of other common problems people come across when using Docker with Mac OS X so it’s worth having a flick through his post.

Categories: Programming

R: dplyr – Select ‘random’ rows from a data frame

Wed, 11/26/2014 - 01:01

Frequently I find myself wanting to take a sample of the rows in a data frame where just taking the head isn’t enough.

Let’s say we start with the following data frame:

data = data.frame(
    letter = sample(LETTERS, 50000, replace = TRUE),
    number = sample (1:10, 50000, replace = TRUE)
    )

And we’d like to sample 10 rows to see what it contains. We’ll start by generating 10 random row numbers using the sample function:

> randomRows = sample(1:length(data[,1]), 10, replace=T)
> randomRows
 [1]  8723 18772  4964 36134 27467 31890 16313 12841 49214 15621

We can then pass that list of row numbers into dplyr’s slice function like so:

> data %>% slice(randomRows)
   letter number
1       Z      4
2       F      1
3       Y      6
4       R      6
5       Y      4
6       V     10
7       R      6
8       D      6
9       J      7
10      E      2

If we’re using that throughout our code then we might want to pull it out into a function like so:

pickRandomRows = function(df, numberOfRows = 10) {
  df %>% slice(runif(numberOfRows,0, length(df[,1])))
}

And then call it like so:

> data %>% pickRandomRows()
   letter number
1       W      5
2       Y      3
3       E      6
4       Q      8
5       M      9
6       H      9
7       E     10
8       T      2
9       I      5
10      V      4
 
> data %>% pickRandomRows(7)
  letter number
1      V      7
2      N      4
3      W      1
4      N      8
5      G      7
6      V      1
7      N      7

Update

Antonios pointed out via email that we could just make use of the in-built sample_n function which I didn’t know about until now:

> data %>% sample_n(10)
      letter number
29771      U      1
48666      T     10
30635      A      1
34865      X      7
20140      A      3
41715      T     10
43786      E     10
18284      A      7
21406      S      8
35542      J      8
Categories: Programming

R: dplyr – “Variables not shown”

Sun, 11/23/2014 - 02:02

I recently ran into a problem where the result of applying some operations to a data frame wasn’t being output the way I wanted.

I started with this data frame:

words = function(numberOfWords, lengthOfWord) {
  w = c(1:numberOfWords)  
  for(i in 1:numberOfWords) {
    w[i] = paste(sample(letters, lengthOfWord, replace=TRUE), collapse = "")
  }
  w
}
 
numberOfRows = 100
df = data.frame(a = sample (1:numberOfRows, 10, replace = TRUE),
                b = sample (1:numberOfRows, 10, replace = TRUE),
                name = words(numberOfRows, 10))

I wanted to group the data frame by a and b and output a comma separated list of the associated names. I started with this:

> df %>% 
    group_by(a,b) %>%
    summarise(n = n(), words = paste(name, collapse = ",")) %>%
    arrange(desc(n)) %>%
    head(5)
 
Source: local data frame [5 x 4]
Groups: a
 
   a  b  n
1 19 90 10
2 24 36 10
3 29 20 10
4 29 80 10
5 62 54 10
Variables not shown: words (chr)

Unfortunately the words column has been excluded. I came across this Stack Overflow post which suggested that the print.tbl_df function was the one responsible for filtering the columns.

Browsing the docs I found a couple of ways to override this behaviour:

> df %>% 
    group_by(a,b) %>%
    summarise(n = n(), words = paste(name, collapse = ",")) %>%
    arrange(desc(n)) %>%
    head(5) %>%
    print(width = Inf)

or

> options(dplyr.width = Inf)
> df %>% 
    group_by(a,b) %>%
    summarise(n = n(), words = paste(name, collapse = ",")) %>%
    arrange(desc(n)) %>%
    head(5)

And now we see this output instead:

Source: local data frame [5 x 4]
Groups: a
 
   a  b  n                                                                                                         words
1 19 90 10 dfhtcgymxt,zpemxbpnri,rfmkksuavp,jxaarxzdzd,peydpxjizc,trdzchaxiy,arthnxbaeg,kjbpdvvghm,kpvsddlsua,xmysfcynxw
2 24 36 10 wtokzdfecx,eprsvpsdcp,kzgxtwnqli,jbyuicevrn,klriuenjzu,qzgtmkljoy,bonbhmqfaz,uauoybprrl,rzummfbkbx,icyeorwzxl
3 29 20 10 ebubytlosp,vtligdgvqw,ejlqonhuit,jwidjvtark,kmdzcalblg,qzrlewxcsr,eckfgjnkys,vfdaeqbfqi,rumblliqmn,fvezcdfiaz
4 29 80 10 wputpwgayx,lpawiyhzuh,ufykwguynu,nyqnwjallh,abaxicpixl,uirudflazn,wyynsikwcl,usescualww,bkvsowfaab,gfhyifzepx
5 62 54 10 beuegfzssp,gfmegjtrys,wkubhvnkkk,rkhgprxttb,cwsrzulnpo,hzkvjbiywc,gbmiupnlbw,gffovxwtok,uxadfrjvdn,aojjfhxygs

Much better!

Categories: Programming

R: ggmap – Overlay shapefile with filled polygon of regions

Mon, 11/17/2014 - 01:53

I’ve been playing around with plotting maps in R over the last week and got to the point where I wanted to have a Google map in the background with a filled polygon, built from a shapefile, in the foreground.

The first bit is reasonably simple – we can just import the ggmap library and make a call to get_map:

> library(ggmap)
> sfMap = map = get_map(location = 'San Francisco', zoom = 12)
[Plot: Google map of San Francisco returned by get_map]

Next I wanted to show the outlines of the different San Francisco zip codes and came across a blog post by Paul Bidanset on Baltimore neighbourhoods which I was able to adapt.

I downloaded a shapefile of San Francisco’s zip codes from the DataSF website and then loaded it into R using the readOGR and spTransform functions from the rgdal package:

> library(rgdal)
> library(ggplot2)
> sfn = readOGR(".","sfzipcodes") %>% spTransform(CRS("+proj=longlat +datum=WGS84"))
> ggplot(data = sfn, aes(x = long, y = lat, group = group)) + geom_path()
[Plot: outlines of the San Francisco zip code boundaries]

sfn is a spatial type of data frame…

> class(sfn)
[1] "SpatialPolygonsDataFrame"
attr(,"package")
[1] "sp"

…but we need a normal data frame to be able to easily merge other data onto the map and then plot it. We can use ggplot2’s fortify command to do this:

> names(sfn)
[1] "OBJECTID" "ZIP_CODE" "ID"   
 
> sfn.f = sfn %>% fortify(region = 'ZIP_CODE')
 
SFNeighbourhoods  = merge(sfn.f, sfn@data, by.x = 'id', by.y = 'ZIP_CODE')

I then made up some fake values for each zip code so that we could have different colour shadings on the visualisation:

> library(dplyr) 
 
> postcodes = SFNeighbourhoods %>% select(id) %>% distinct()
 
> values = data.frame(id = c(postcodes),
                      value = c(runif(postcodes %>% count() %>% unlist(),5.0, 25.0)))

I then merged those values onto SFNeighbourhoods:

> sf = merge(SFNeighbourhoods, values, by.x='id')
 
> sf %>% group_by(id) %>% do(head(., 1)) %>% head(10)
Source: local data frame [10 x 10]
Groups: id
 
      id      long      lat order  hole piece   group OBJECTID    ID     value
1  94102 -122.4193 37.77515     1 FALSE     1 94102.1       14 94102  6.184814
2  94103 -122.4039 37.77006   106 FALSE     1 94103.1       12 94103 21.659752
3  94104 -122.4001 37.79030   255 FALSE     1 94104.1       10 94104  5.173199
4  94105 -122.3925 37.79377   293 FALSE     1 94105.1        2 94105 15.723456
5  94107 -122.4012 37.78202   504 FALSE     1 94107.1        1 94107  8.402726
6  94108 -122.4042 37.79169  2232 FALSE     1 94108.1       11 94108  8.632652
7  94109 -122.4139 37.79046  2304 FALSE     1 94109.1        8 94109 20.129402
8  94110 -122.4217 37.73181  2794 FALSE     1 94110.1       16 94110 12.410610
9  94111 -122.4001 37.79369  3067 FALSE     1 94111.1        9 94111 10.185054
10 94112 -122.4278 37.73469  3334 FALSE     1 94112.1       18 94112 24.297588

Now we can easily plot those colours onto our shapefile by calling geom_polygon instead of geom_path:

> ggplot(sf, aes(long, lat, group = group)) + 
    geom_polygon(aes(fill = value))

[Plot: zip code polygons shaded by the fake values]

And finally let’s wire it up to our Google map:

> ggmap(sfMap) + 
    geom_polygon(aes(fill = value, x = long, y = lat, group = group), 
                 data = sf,
                 alpha = 0.8, 
                 color = "black",
                 size = 0.2)
[Plot: shaded zip code polygons overlaid on the Google map]

I spent way too long with the alpha value set to ‘0’ on this last plot wondering why I wasn’t seeing any shading so don’t make that mistake!

Categories: Programming

Spark: Parse CSV file and group by column value

Sun, 11/16/2014 - 23:53

I’ve found myself working with large CSV files quite frequently and, realising that my existing toolset didn’t let me explore them quickly, I thought I’d spend a bit of time looking at Spark to see if it could help.

I’m working with a crime data set released by the City of Chicago: it’s 1GB in size and contains details of 4 million crimes:

$ ls -alh ~/Downloads/Crimes_-_2001_to_present.csv
-rw-r--r--@ 1 markneedham  staff   1.0G 16 Nov 12:14 /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
$ wc -l ~/Downloads/Crimes_-_2001_to_present.csv
 4193441 /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv

We can get a rough idea of the contents of the file by looking at the first row along with the header:

$ head -n 2 ~/Downloads/Crimes_-_2001_to_present.csv
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,"(41.75017626412204, -87.55494559131228)"

I wanted to do a count of the ‘Primary Type’ column to see how many of each crime we have. Using just Unix command line tools this is how we’d do that:

$ time tail +2 ~/Downloads/Crimes_-_2001_to_present.csv | cut -d, -f6  | sort | uniq -c | sort -rn
859197 THEFT
757530 BATTERY
489528 NARCOTICS
488209 CRIMINAL DAMAGE
257310 BURGLARY
253964 OTHER OFFENSE
247386 ASSAULT
197404 MOTOR VEHICLE THEFT
157706 ROBBERY
137538 DECEPTIVE PRACTICE
124974 CRIMINAL TRESPASS
47245 PROSTITUTION
40361 WEAPONS VIOLATION
31585 PUBLIC PEACE VIOLATION
26524 OFFENSE INVOLVING CHILDREN
14788 CRIM SEXUAL ASSAULT
14283 SEX OFFENSE
10632 GAMBLING
8847 LIQUOR LAW VIOLATION
6443 ARSON
5178 INTERFERE WITH PUBLIC OFFICER
4846 HOMICIDE
3585 KIDNAPPING
3147 INTERFERENCE WITH PUBLIC OFFICER
2471 INTIMIDATION
1985 STALKING
 355 OFFENSES INVOLVING CHILDREN
 219 OBSCENITY
  86 PUBLIC INDECENCY
  80 OTHER NARCOTIC VIOLATION
  12 RITUALISM
  12 NON-CRIMINAL
   6 OTHER OFFENSE
   2 NON-CRIMINAL (SUBJECT SPECIFIED)
   2 NON - CRIMINAL
 
real	2m37.495s
user	3m0.337s
sys	0m1.471s

This isn’t too bad but it seems like the type of calculation that Spark is made for so I had a look at how I could go about doing that. To start with I created an SBT project with the following build file:

name := "playground"
 
version := "1.0"
 
scalaVersion := "2.10.4"
 
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
 
libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
 
ideaExcludeFolders += ".idea"
 
ideaExcludeFolders += ".idea_modules"

I downloaded Spark and after unpacking it launched the Spark shell:

$ pwd
/Users/markneedham/projects/spark-play/spark-1.1.0/spark-1.1.0-bin-hadoop1
 
$ ./bin/spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
 
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51)
...
Spark context available as sc.
 
scala>

I first import some classes I’m going to need:

scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser
 
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

Now, following the quick start example, we’ll create a Resilient Distributed Dataset (RDD) from our Crime CSV file:

scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
crimeFile: String = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
scala> val crimeData = sc.textFile(crimeFile).cache()
14/11/16 22:31:16 INFO MemoryStore: ensureFreeSpace(32768) called with curMem=0, maxMem=278302556
14/11/16 22:31:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.4 MB)
crimeData: org.apache.spark.rdd.RDD[String] = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv MappedRDD[1] at textFile at <console>:17

Our next step is to process each line of the file using our CSV Parser. A simple way to do this would be to create a new CSVParser for each line:

scala> crimeData.map(line => {
         val parser = new CSVParser(',')
         parser.parseLine(line).mkString(",")
       }).take(5).foreach(println)
14/11/16 22:35:49 INFO SparkContext: Starting job: take at <console>:23
...
14/11/16 22:35:49 INFO SparkContext: Job finished: take at <console>:23, took 0.013904 s
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)

That works but it’s a bit wasteful to create a new CSVParser each time so instead let’s just create one for each partition that Spark splits our file up into:

scala> crimeData.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)
14/11/16 22:38:44 INFO SparkContext: Starting job: take at <console>:25
...
14/11/16 22:38:44 INFO SparkContext: Job finished: take at <console>:25, took 0.015216 s
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)

You’ll notice that we’ve still got the header being printed which isn’t ideal – let’s get rid of it!

I expected there to be a ‘drop’ function which would allow me to do that but in fact there isn’t. Instead we can make use of our knowledge that the first partition will contain the first line and strip it out that way:

scala> def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) {
             lines.drop(1)
           }
           lines
         })
       }
dropHeader: (data: org.apache.spark.rdd.RDD[String])org.apache.spark.rdd.RDD[String]

Now let’s grab the first 5 lines again and print them out:

scala> val withoutHeader: RDD[String] = dropHeader(crimeData)
withoutHeader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:14
 
scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)
14/11/16 22:43:27 INFO SparkContext: Starting job: take at <console>:29
...
14/11/16 22:43:27 INFO SparkContext: Job finished: take at <console>:29, took 0.018557 s
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)
9460355,HX113738,01/14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET,true,false,0733,007,17,68,06,1171480,1858195,2014,01/16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671,(41.766348042591375, -87.64702037047671)

We’re finally in good shape to extract the values from the ‘Primary Type’ column and count how many times each of those appears in our data set:

scala> withoutHeader.mapPartitions(lines => {
         val parser=new CSVParser(',')
         lines.map(line => {
           val columns = parser.parseLine(line)
           Array(columns(5)).mkString(",")
         })
       }).countByValue().toList.sortBy(-_._2).foreach(println)
14/11/16 22:45:20 INFO SparkContext: Starting job: countByValue at <console>:30
14/11/16 22:45:20 INFO DAGScheduler: Got job 7 (countByValue at <console>:30) with 32 output partitions (allowLocal=false)
...
14/11/16 22:45:30 INFO SparkContext: Job finished: countByValue at <console>:30, took 9.796565 s
(THEFT,859197)
(BATTERY,757530)
(NARCOTICS,489528)
(CRIMINAL DAMAGE,488209)
(BURGLARY,257310)
(OTHER OFFENSE,253964)
(ASSAULT,247386)
(MOTOR VEHICLE THEFT,197404)
(ROBBERY,157706)
(DECEPTIVE PRACTICE,137538)
(CRIMINAL TRESPASS,124974)
(PROSTITUTION,47245)
(WEAPONS VIOLATION,40361)
(PUBLIC PEACE VIOLATION,31585)
(OFFENSE INVOLVING CHILDREN,26524)
(CRIM SEXUAL ASSAULT,14788)
(SEX OFFENSE,14283)
(GAMBLING,10632)
(LIQUOR LAW VIOLATION,8847)
(ARSON,6443)
(INTERFERE WITH PUBLIC OFFICER,5178)
(HOMICIDE,4846)
(KIDNAPPING,3585)
(INTERFERENCE WITH PUBLIC OFFICER,3147)
(INTIMIDATION,2471)
(STALKING,1985)
(OFFENSES INVOLVING CHILDREN,355)
(OBSCENITY,219)
(PUBLIC INDECENCY,86)
(OTHER NARCOTIC VIOLATION,80)
(NON-CRIMINAL,12)
(RITUALISM,12)
(OTHER OFFENSE ,6)
(NON-CRIMINAL (SUBJECT SPECIFIED),2)
(NON - CRIMINAL,2)

We get the same results as with the Unix commands except it took less than 10 seconds to calculate which is pretty cool!

Categories: Programming

R: dplyr – Sum for group_by multiple columns

Tue, 11/11/2014 - 01:17

Over the weekend I was playing around with dplyr and had the following data frame grouped by both columns:

> library(dplyr)
 
> data = data.frame(
    letter = sample(LETTERS, 50000, replace = TRUE),
    number = sample (1:10, 50000, replace = TRUE)
    )
 
> data %>% count(letter, number) %>% head(20)
Source: local data frame [20 x 3]
Groups: letter
 
   letter number   n
1       A      1 184
2       A      2 192
3       A      3 183
4       A      4 193
5       A      5 214
6       A      6 172
7       A      7 196
8       A      8 198
9       A      9 174
10      A     10 196
11      B      1 212
12      B      2 198
13      B      3 194
14      B      4 181
15      B      5 203
16      B      6 234
17      B      7 221
18      B      8 179
19      B      9 182
20      B     10 170

I wanted to add an extra column which would show what percentage of the values for that letter each number had.

If we wrote that code standalone we’d have the following:

> data %>% count(letter)
Source: local data frame [26 x 2]
 
   letter    n
1       A 1902
2       B 1974
3       C 1911
4       D 1948
5       E 1888
6       F 1975
7       G 1914
8       H 1886
9       I 1910
10      J 1924
11      K 1974
12      L 1891
13      M 1894
14      N 1946
15      O 1956
16      P 1934
17      Q 1865
18      R 1992
19      S 1946
20      T 1854
21      U 1919
22      V 1913
23      W 1928
24      X 1934
25      Y 1908
26      Z 1914

We can also get that value by summing up the individual (letter, number) pairs from the previous data frame. The ungroup function allows us to do so:

> data %>% count(letter, number) %>% ungroup %>% group_by(letter) %>% mutate(sum.n = sum(n)) %>% head(20)
Source: local data frame [20 x 4]
Groups: letter
 
   letter number   n sum.n
1       A      1 184  1902
2       A      2 192  1902
3       A      3 183  1902
4       A      4 193  1902
5       A      5 214  1902
6       A      6 172  1902
7       A      7 196  1902
8       A      8 198  1902
9       A      9 174  1902
10      A     10 196  1902
11      B      1 212  1974
12      B      2 198  1974
13      B      3 194  1974
14      B      4 181  1974
15      B      5 203  1974
16      B      6 234  1974
17      B      7 221  1974
18      B      8 179  1974
19      B      9 182  1974
20      B     10 170  1974

Pretty neat!

Categories: Programming

R: dplyr – Maximum value row in each group

Mon, 11/10/2014 - 23:06

In my continued work with R’s dplyr I wanted to be able to group a data frame by some columns and then find the maximum value for each group.

We’ll continue with my favourite dummy data set:

> library(dplyr)
 
> data = data.frame(
    letter = sample(LETTERS, 50000, replace = TRUE),
    number = sample (1:10, 50000, replace = TRUE)
    )

I started with the following code to count how many occurrences of each (letter, number) pair there were:

> data %>% count(letter, number)
Source: local data frame [260 x 3]
Groups: letter
 
   letter number   n
1       A      1 184
2       A      2 192
3       A      3 183
4       A      4 193
5       A      5 214
6       A      6 172
7       A      7 196
8       A      8 198
9       A      9 174
10      A     10 196
..    ...    ... ...

I wanted to find the top number for each letter and with a bit of help from Stack Overflow I ended up with the following:

> data %>% count(letter, number) %>% filter(n == max(n))
Source: local data frame [26 x 3]
Groups: letter
 
   letter number   n
1       A      5 214
2       B      6 234
3       C      8 213
4       D      6 211
5       E      9 208
6       F      2 219
7       G      1 213
8       H      2 208
9       I      9 220
10      J      7 213
11      K      3 228
12      L      2 206
13      M      3 212
14      N      4 222
15      O      1 211
16      P      7 211
17      Q      9 210
18      R      5 224
19      S      2 211
20      T      9 204
21      U      8 217
22      V      6 220
23      W      2 213
24      X      2 214
25      Y      3 211
26      Z      3 206

Here we’re filtering over a collection of rows grouped by letter and only selecting the row which has the max value. We can see more clearly what’s happening if we filter the data frame to only contain one letter:

> letterA = data %>% filter(letter == "A") %>% count(letter, number)
> letterA
Source: local data frame [10 x 3]
Groups: letter
 
   letter number   n
1       A      1 184
2       A      2 192
3       A      3 183
4       A      4 193
5       A      5 214
6       A      6 172
7       A      7 196
8       A      8 198
9       A      9 174
10      A     10 196

And now let’s apply the filter command while also printing out information about each row:

> letterA %>% filter(print(n), print(n == max(n)), n == max(n))
 [1] 184 192 183 193 214 172 196 198 174 196
 [1] FALSE FALSE FALSE FALSE  TRUE FALSE FALSE FALSE FALSE FALSE
Source: local data frame [1 x 3]
Groups: letter
 
  letter number   n
1      A      5 214

If instead of getting the top number by letter we wanted to get the top letter by number we just need to reorder the field names in the count method:

> data %>% count(number, letter) %>% filter(n == max(n))
Source: local data frame [10 x 3]
Groups: number
 
   number letter   n
1       1      G 213
2       2      F 219
3       3      K 228
4       4      N 222
5       5      R 224
6       6      B 234
7       7      B 221
8       8      U 217
9       9      I 220
10     10      O 209

And if we want to see the letter that shows up least frequently we can change the call to ‘max’ to an equivalent call to ‘min’:

> data %>% count(number, letter) %>% filter(n == min(n))
Source: local data frame [11 x 3]
Groups: number
 
   number letter   n
1       1      H 166
2       2      O 160
3       3      E 156
4       4      R 169
5       5      L 169
6       6      I 164
7       7      H 170
8       7      I 170
9       8      Q 166
10      9      W 162
11     10      J 168
Categories: Programming

R: dplyr – Ordering by count after multiple column group_by

Sun, 11/09/2014 - 10:30

I was recently trying to group a data frame by two columns and then sort by the count using dplyr, but it wasn’t sorting in the way I was expecting, which was initially very confusing.

I started with this data frame:

library(dplyr)
 
data = data.frame(
  letter = sample(LETTERS, 50000, replace = TRUE),
  number = sample (1:10, 50000, replace = TRUE)
  )

And I wanted to find out how many occurrences of each (letter, number) pair exist in the data set. I started with the following code:

> data %>% count(letter, number, sort = TRUE)
Source: local data frame [260 x 3]
Groups: letter
 
   letter number   n
1       A      4 205
2       A      9 201
3       A      3 197
4       A      1 195
5       A     10 191
6       A      2 189
7       A      8 184
8       A      7 183
9       A      5 181
10      A      6 173
..    ...    ... ...

As you can see it’s only showing A’s which is interesting as I wouldn’t expect there to be a bias towards that letter. Let’s filter out the A’s:

> data %>% filter(letter != "A") %>% count(letter, number, sort = TRUE)
Source: local data frame [250 x 3]
Groups: letter
 
   letter number   n
1       B      8 222
2       B      9 212
3       B      5 207
4       B      6 201
5       B     10 200
6       B      7 192
7       B      2 189
8       B      3 189
9       B      1 187
10      B      4 181
..    ...    ... ...

Now all we see are B’s and we can see that both (B,8) and (B,9) have a higher ‘n’ value than any of the A’s.

I put the code back into the more verbose form to see if it was the count function that behaved unexpectedly:

> data %>% group_by(letter, number) %>% summarise(n = n()) %>% arrange(desc(n))
Source: local data frame [260 x 3]
Groups: letter
 
   letter number   n
1       A      4 205
2       A      9 201
3       A      3 197
4       A      1 195
5       A     10 191
6       A      2 189
7       A      8 184
8       A      7 183
9       A      5 181
10      A      6 173
..    ...    ... ...

Nope, still the same behaviour.

At this point I vaguely remembered there being a function called ungroup which I hadn’t used and wondered if now was the time.

> data %>% group_by(letter, number) %>% summarise(n = n()) %>% ungroup() %>% arrange(desc(n))
Source: local data frame [260 x 3]
 
   letter number   n
1       L      2 236
2       V      1 231
3       Y      8 226
4       J      4 225
5       J     10 223
6       Q      7 223
7       B      8 222
8       O      9 222
9       Q     10 221
10      Z      9 221
..    ...    ... ...

Indeed it was and now we can go back to our original version of the code using count and handle the sorting afterwards:

> data %>% count(letter, number) %>% ungroup() %>% arrange(desc(n))
Source: local data frame [260 x 3]
 
   letter number   n
1       L      2 236
2       V      1 231
3       Y      8 226
4       J      4 225
5       J     10 223
6       Q      7 223
7       B      8 222
8       O      9 222
9       Q     10 221
10      Z      9 221
..    ...    ... ...
Categories: Programming

R: Refactoring to dplyr

Sun, 11/09/2014 - 01:11

I’ve been looking back over some of the early code I wrote using R before I knew about the dplyr library and thought it’d be an interesting exercise to refactor some of the snippets.

We’ll use the following data frame for each of the examples:

library(dplyr)
 
data = data.frame(
  letter = sample(LETTERS, 50000, replace = TRUE),
  number = sample (1:10, 50000, replace = TRUE)
  )

Take {n} rows
> data[1:5,]
  letter number
1      R      7
2      Q      3
3      B      8
4      R      3
5      U      2

becomes:

> data %>% head(5)
  letter number
1      R      7
2      Q      3
3      B      8
4      R      3
5      U      2

Order by numeric value descending
> data[order(-(data$number)),][1:5,]
   letter number
14      H     10
17      G     10
63      L     10
66      W     10
73      R     10

becomes:

> data %>% arrange(desc(number)) %>% head(5)
  letter number
1      H     10
2      G     10
3      L     10
4      W     10
5      R     10

Count number of items
> length(data[,1])
[1] 50000

becomes:

> data %>% count()
Source: local data frame [1 x 1]
 
      n
1 50000

Filter by column value
> length(subset(data, number == 1)[, 1])
[1] 4928

becomes:

> data %>% filter(number == 1) %>% count()
Source: local data frame [1 x 1]
 
     n
1 4928

Group by variable and count
> aggregate(data, by= list(data$number), function(x) length(x))
   Group.1 letter number
1        1   4928   4928
2        2   5045   5045
3        3   5064   5064
4        4   4823   4823
5        5   5032   5032
6        6   5163   5163
7        7   4945   4945
8        8   5077   5077
9        9   5025   5025
10      10   4898   4898

becomes:

> data %>% count(number)
Source: local data frame [10 x 2]
 
   number    n
1       1 4928
2       2 5045
3       3 5064
4       4 4823
5       5 5032
6       6 5163
7       7 4945
8       8 5077
9       9 5025
10     10 4898

Select a range of rows
> data[4:5,]
  letter number
4      R      3
5      U      2

becomes:

> data %>% slice(4:5)
  letter number
1      R      3
2      U      2

There’s certainly more code in some of the dplyr examples but I find it easier to remember how the dplyr code works when I come back to it and hence tend to favour that approach.

Categories: Programming

R: dplyr – Group by field dynamically (‘regroup’ is deprecated / no applicable method for ‘as.lazy’ applied to an object of class “list” )

Sat, 11/08/2014 - 23:29

A few months ago I wrote a blog post explaining how to dynamically/programmatically group a data frame by a field using dplyr, but that approach has been deprecated in the latest version.

To recap, the original function looked like this:

library(dplyr)
 
groupBy = function(df, field) {
  df %.% regroup(list(field)) %.% summarise(n = n())
}

And if we execute that with a sample data frame we’ll see the following:

> data = data.frame(
      letter = sample(LETTERS, 50000, replace = TRUE),
      number = sample (1:10, 50000, replace = TRUE)
  )
 
> groupBy(data, 'letter') %>% head(5)
Source: local data frame [5 x 2]
 
  letter    n
1      A 1951
2      B 1903
3      C 1954
4      D 1923
5      E 1886
Warning messages:
1: %.% is deprecated. Please use %>% 
2: %.% is deprecated. Please use %>% 
3: 'regroup' is deprecated.
Use 'group_by_' instead.
See help("Deprecated")

I replaced each of the deprecated operators and ended up with this function:

groupBy = function(df, field) {
  df %>% group_by_(list(field)) %>% summarise(n = n())
}

Now if we run that:

> groupBy(data, 'letter') %>% head(5)
Error in UseMethod("as.lazy") : 
  no applicable method for 'as.lazy' applied to an object of class "list"

It turns out the ‘group_by_’ function doesn’t want to receive a list of fields so let’s remove the call to list:

groupBy = function(df, field) {
  df %>% group_by_(field) %>% summarise(n = n())
}

And now if we run that:

> groupBy(data, 'letter') %>% head(5)
Source: local data frame [5 x 2]
 
  letter    n
1      A 1951
2      B 1903
3      C 1954
4      D 1923
5      E 1886

Good times! We get the correct result and no deprecation messages.

If we want to group by multiple fields we can just pass in the field names like so:

groupBy = function(df, field1, field2) {
  df %>% group_by_(field1, field2) %>% summarise(n = n())
}
> groupBy(data, 'letter', 'number') %>% head(5)
Source: local data frame [5 x 3]
Groups: letter
 
  letter number   n
1      A      1 200
2      A      2 218
3      A      3 205
4      A      4 176
5      A      5 203

Or with this simpler version:

groupBy = function(df, ...) {
  df %>% group_by_(...) %>% summarise(n = n())
}
> groupBy(data, 'letter', 'number') %>% head(5)
Source: local data frame [5 x 3]
Groups: letter
 
  letter number   n
1      A      1 200
2      A      2 218
3      A      3 205
4      A      4 176
5      A      5 203

I realised that we can actually just use group_by itself and pass in the field names without quotes, something I couldn’t get to work in earlier versions:

groupBy = function(df, ...) {
  df %>% group_by(...) %>% summarise(n = n())
}
> groupBy(data, letter, number) %>% head(5)
Source: local data frame [5 x 3]
Groups: letter
 
  letter number   n
1      A      1 200
2      A      2 218
3      A      3 205
4      A      4 176
5      A      5 203

We could even get a bit of pipelining going on if we fancied it:

> data %>% groupBy(letter, number) %>% head(5)
Source: local data frame [5 x 3]
Groups: letter
 
  letter number   n
1      A      1 200
2      A      2 218
3      A      3 205
4      A      4 176
5      A      5 203

And as of dplyr 0.3 we can simplify our groupBy function to make use of the new count function which combines group_by and summarise:

groupBy = function(df, ...) {
  df %>% count(...)
}
> data %>% groupBy(letter, number) %>% head(5)
Source: local data frame [5 x 3]
Groups: letter
 
  letter number   n
1      A      1 200
2      A      2 218
3      A      3 205
4      A      4 176
5      A      5 203
Categories: Programming

R: Joining multiple data frames

Fri, 11/07/2014 - 02:29

I’ve been looking through the code from Martin Eastwood’s excellent talk ‘Predicting Football Using R’ and was intrigued by the code which reshapes the data into the format expected by the glm function.

The original looks like this:

df <- read.csv('http://www.football-data.co.uk/mmz4281/1314/E0.csv')
 
# munge data into format compatible with glm function
df <- apply(df, 1, function(row){
  data.frame(team=c(row['HomeTeam'], row['AwayTeam']),
             opponent=c(row['AwayTeam'], row['HomeTeam']),
             goals=c(row['FTHG'], row['FTAG']),
             home=c(1, 0))
})
df <- do.call(rbind, df)

The initial data frame looks like this:

> library(dplyr)
> df %>% select(HomeTeam, AwayTeam, FTHG, FTAG) %>% head(1)
  HomeTeam    AwayTeam FTHG FTAG
1  Arsenal Aston Villa    1    3

And we want to get it to look like this:

> head(df, 2)
                team    opponent goals home
HomeTeam     Arsenal Aston Villa     1    1
AwayTeam Aston Villa     Arsenal     3    0

So for each row in the initial data frame we want to have two rows: one representing each team, how many goals they scored in the match and whether they were playing at home or away.

I really like dplyr’s pipelining operator so I thought I’d try to translate Martin’s code to use that and other dplyr functions.

I ended up with the following two sets of function calls:

df %>% select(team = HomeTeam, opponent = AwayTeam, goals = FTHG) %>% mutate(home = 1)
df %>% select(team = AwayTeam, opponent = HomeTeam, goals = FTAG) %>% mutate(home = 0)

I’m doing pretty much the same thing as Martin except I’ve used dplyr’s select and mutate functions to transform the data frame.

The next step was to join those two data frames together and with Nicole’s help I realised that there are many ways we can do this.

The functions that will do the job are rbind, union, join and merge.

We decided to benchmark them to see which was able to transform the data frame the fastest:

# load data into data.frame
dfOrig <- read.csv('http://www.football-data.co.uk/mmz4281/1314/E0.csv')
 
original = function(df) {
  df <- apply(df, 1, function(row){
    data.frame(team=c(row['HomeTeam'], row['AwayTeam']),
               opponent=c(row['AwayTeam'], row['HomeTeam']),
               goals=c(row['FTHG'], row['FTAG']),
               home=c(1, 0))
  })
  do.call(rbind, df)
}
 
newRBind = function(df) {
  rbind(df %>% select(team = HomeTeam, opponent = AwayTeam, goals = FTHG) %>% mutate(home = 1),
        df %>% select(team = AwayTeam, opponent = HomeTeam, goals = FTAG) %>% mutate(home = 0))  
}
 
newUnion = function(df) {
  union(df %>% select(team = HomeTeam, opponent = AwayTeam, goals = FTHG) %>% mutate(home = 1),
        df %>% select(team = AwayTeam, opponent = HomeTeam, goals = FTAG) %>% mutate(home = 0))  
}
 
newJoin = function(df) {
  join(df %>% select(team = HomeTeam, opponent = AwayTeam, goals = FTHG) %>% mutate(home = 1),
       df %>% select(team = AwayTeam, opponent = HomeTeam, goals = FTAG) %>% mutate(home = 0),
      type = "full")  
}
 
newMerge = function(df) {
  merge(df %>% select(team = HomeTeam, opponent = AwayTeam, goals = FTHG) %>% mutate(home = 1),
       df %>% select(team = AwayTeam, opponent = HomeTeam, goals = FTAG) %>% mutate(home = 0),
       all = TRUE)  
}
> library(microbenchmark)
 
> microbenchmark(original(dfOrig))
Unit: milliseconds
             expr   min    lq  mean median    uq max neval
 original(dfOrig) 189.4 196.8 202.5    201 205.5 284   100
 
> microbenchmark(newRBind(dfOrig))
Unit: milliseconds
             expr   min    lq  mean median    uq   max neval
 newRBind(dfOrig) 2.197 2.274 2.396  2.309 2.377 4.526   100
 
> microbenchmark(newUnion(dfOrig))
Unit: milliseconds
             expr   min    lq  mean median   uq   max neval
 newUnion(dfOrig) 2.156 2.223 2.377  2.264 2.34 4.597   100
 
> microbenchmark(newJoin(dfOrig))
 
Unit: milliseconds
            expr   min    lq  mean median   uq   max neval
 newJoin(dfOrig) 5.961 6.132 6.817  6.253 6.65 11.95   100
 
> microbenchmark(newMerge(dfOrig))
Unit: milliseconds
             expr   min    lq  mean median    uq   max neval
 newMerge(dfOrig) 7.121 7.413 8.037  7.541 7.934 13.32   100

We actually get around a 100 times speed up over the original function if we use rbind or union, whereas with merge or join it’s around 35 times quicker.

In this case using merge or join is a bit misleading because we’re not actually connecting the data frames together based on any particular field – we are just appending one to the other.

The code’s available as a gist if you want to have a play.

Categories: Programming

R: Converting a named vector to a data frame

Sat, 11/01/2014 - 00:47

I’ve been playing around with igraph’s page rank function to see who the most central people in the London NoSQL scene are, and I wanted to put the result in a data frame to make the data easier to work with.

I started off with a data frame containing pairs of people and the number of events that they’d both RSVP’d ‘yes’ to:

> library(dplyr)
> data %>% arrange(desc(times)) %>% head(10)
       p.name     other.name times
1  Amit Nandi Anish Mohammed    51
2  Amit Nandi Enzo Martoglio    49
3       louis          zheng    46
4       louis     Raja Kolli    45
5  Raja Kolli Enzo Martoglio    43
6  Amit Nandi     Raja Kolli    42
7       zheng Anish Mohammed    42
8  Raja Kolli          Rohit    41
9  Amit Nandi          zheng    40
10      louis          Rohit    40

I actually had ~ 900,000 such rows in the data frame:

> length(data[,1])
[1] 985664

I ran page rank over the data set like so:

g = graph.data.frame(data, directed = F)
pr = page.rank(g)$vector

If we evaluate pr we can see the person’s name and their page rank:

> head(pr)
Ioanna Eirini          Mjay       Baktash      madhuban    Karl Prior   Keith Bolam 
    0.0002190     0.0001206     0.0001524     0.0008819     0.0001240     0.0005702

I initially tried to convert this to a data frame with the following code…

> head(data.frame(pr))
                     pr
Ioanna Eirini 0.0002190
Mjay          0.0001206
Baktash       0.0001524
madhuban      0.0008819
Karl Prior    0.0001240
Keith Bolam   0.0005702

…which unfortunately didn’t create a column for the person’s name.

> colnames(data.frame(pr))
[1] "pr"

Nicole pointed out that I actually had a named vector and would need to explicitly extract the names from that vector into the data frame. I ended up with this:

> prDf = data.frame(name = names(pr), rank = pr)
> head(prDf)
                       name      rank
Ioanna Eirini Ioanna Eirini 0.0002190
Mjay                   Mjay 0.0001206
Baktash             Baktash 0.0001524
madhuban           madhuban 0.0008819
Karl Prior       Karl Prior 0.0001240
Keith Bolam     Keith Bolam 0.0005702

We can now sort the data frame to find the most central people on the NoSQL London scene based on meetup attendance:

> data.frame(prDf) %>%
+   arrange(desc(pr)) %>%
+   head(10)
             name     rank
1           louis 0.001708
2       Kannappan 0.001657
3           zheng 0.001514
4    Peter Morgan 0.001492
5      Ricki Long 0.001437
6      Raja Kolli 0.001416
7      Amit Nandi 0.001411
8  Enzo Martoglio 0.001396
9           Chris 0.001327
10          Rohit 0.001305
Categories: Programming

hdiutil: could not access / create failed – Operation canceled

Fri, 10/31/2014 - 10:45

Earlier in the year I wrote a blog post showing how to build a Mac OS X DMG file for a Java application. I recently revisited the script to update it for a new version and ran into a frustrating error message.

I tried to run the following command to create a new DMG file from a source folder…

$ hdiutil create -volname "DemoBench" -size 100m -srcfolder dmg/ -ov -format UDZO pack.temp.dmg

…but was met with the following error message:

...could not access /Volumes/DemoBench/DemoBench.app/Contents/Resources/app/database-agent.jar - Operation canceled
 
hdiutil: create failed - Operation canceled

I was initially a bit stumped and thought maybe the flags to hdiutil had changed but a quick look at the man page suggested that wasn’t the issue.

I decided to go back to my pre-command-line approach for creating a DMG – Disk Utility – and see if I could create it that way. This helped reveal the actual problem:

[Disk Utility screenshot revealing the problem]

I increased the volume size to 150 MB…

$ hdiutil create -volname "DemoBench" -size 150m -srcfolder dmg/ -ov -format UDZO pack.temp.dmg

and all was well:

....................................................................................................
..........................................................................
created: /Users/markneedham/projects/neo-technology/quality-tasks/park-bench/database-agent-desktop/target/pack.temp.dmg

And this post will serve as documentation to stop it catching me out next time!

Categories: Programming

Data Modelling: The Thin Model

Mon, 10/27/2014 - 07:55

About a third of the way through Mastering Data Modeling the authors describe common data modelling mistakes and one in particular resonated with me – ‘Thin LDS, Lost Users’.

LDS stands for ‘Logical Data Structure’ which is a diagram depicting what kinds of data some person or group wants to remember. In other words, a tool to help derive the conceptual model for our domain.

They describe the problem that a thin model can cause as follows:

[...] within 30 minutes [of the modelling session] the users were lost…we determined that the model was too thin. That is, many entities had just identifying descriptors.

While this is syntactically okay, when we revisited those entities asking, What else is memorable here? the users had lots to say.

When there was flesh on the bones, the uncertainty abated and the session took a positive course.

I found myself making the same mistake a couple of weeks ago during a graph modelling session. I tend to spend the majority of the time focused on the relationships between the bits of data and treat the metadata or attributes almost as an afterthought.

The nice thing about the graph model is that it encourages an iterative approach so I was quickly able to bring the model to life and the domain experts back onside.

We can see a simple example of adding flesh to a model with a subset of the movies graph.

We might start out with a model which just describes the structure of the graph but doesn’t give us very much information about the entities.

I tend to sketch out the structure of all the data before adding any attributes but I think some people find it easier to follow if you add at least some flesh before moving on to the next part of the model.

In our next iteration of the movie graph we can add attributes to the actor and movie:

[diagram: the movie graph with attributes added to the actor and movie nodes]

We can then go on to evolve the model further but the lesson for me is to value the attributes more – it’s not all about the structure.

Categories: Programming

Neo4j: Cypher – Avoiding the Eager

Thu, 10/23/2014 - 06:56

Although I love how easy Cypher’s LOAD CSV command makes it to get data into Neo4j, it currently breaks the rule of least surprise in the way it eagerly loads in all rows for some queries, even those using periodic commit.

[image: Neverwhere – “Beware of the eager pipe”]

This is something that my colleague Michael noted in the second of his blog posts explaining how to use LOAD CSV successfully:

The biggest issue that people ran into, even when following the advice I gave earlier, was that for large imports of more than one million rows, Cypher ran into an out-of-memory situation.

That was not related to commit sizes, so it happened even with PERIODIC COMMIT of small batches.

I recently spent a few days importing data into Neo4j on a Windows machine with 4GB RAM so I was seeing this problem even earlier than Michael suggested.

Michael explains how to work out whether your query is suffering from unexpected eager evaluation:

If you profile that query you see that there is an “Eager” step in the query plan.

That is where the “pull in all data” happens.

You can profile queries by prefixing them with the word ‘PROFILE’. You’ll need to run your query in the console of /webadmin in your web browser or with the Neo4j shell.

I did this for my queries and was able to identify query patterns which get evaluated eagerly and in some cases we can work around it.

We’ll use the Northwind data set to demonstrate how the Eager pipe can sneak into our queries but keep in mind that this data set is sufficiently small to not cause issues.

This is what a row in the file looks like:

$ head -n 2 data/customerDb.csv
OrderID,CustomerID,EmployeeID,OrderDate,RequiredDate,ShippedDate,ShipVia,Freight,ShipName,ShipAddress,ShipCity,ShipRegion,ShipPostalCode,ShipCountry,CustomerID,CustomerCompanyName,ContactName,ContactTitle,Address,City,Region,PostalCode,Country,Phone,Fax,EmployeeID,LastName,FirstName,Title,TitleOfCourtesy,BirthDate,HireDate,Address,City,Region,PostalCode,Country,HomePhone,Extension,Photo,Notes,ReportsTo,PhotoPath,OrderID,ProductID,UnitPrice,Quantity,Discount,ProductID,ProductName,SupplierID,CategoryID,QuantityPerUnit,UnitPrice,UnitsInStock,UnitsOnOrder,ReorderLevel,Discontinued,SupplierID,SupplierCompanyName,ContactName,ContactTitle,Address,City,Region,PostalCode,Country,Phone,Fax,HomePage,CategoryID,CategoryName,Description,Picture
10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,VINET,Vins et alcools Chevalier,Paul Henriot,Accounting Manager,59 rue de l'Abbaye,Reims,,51100,France,26.47.15.10,26.47.15.11,5,Buchanan,Steven,Sales Manager,Mr.,1955-03-04,1993-10-17,14 Garrett Hill,London,,SW1 8JR,UK,(71) 555-4848,3453,\x,"Steven Buchanan graduated from St. Andrews University, Scotland, with a BSC degree in 1976.  Upon joining the company as a sales representative in 1992, he spent 6 months in an orientation program at the Seattle office and then returned to his permanent post in London.  He was promoted to sales manager in March 1993.  Mr. Buchanan has completed the courses ""Successful Telemarketing"" and ""International Sales Management.""  He is fluent in French.",2,http://accweb/emmployees/buchanan.bmp,10248,11,14,12,0,11,Queso Cabrales,5,4,1 kg pkg.,21,22,30,30,0,5,Cooperativa de Quesos 'Las Cabras',Antonio del Valle Saavedra,Export Administrator,Calle del Rosal 4,Oviedo,Asturias,33007,Spain,(98) 598 76 54,,,4,Dairy Products,Cheeses,\x
MERGE, MERGE, MERGE

The first thing we want to do is create a node for each employee and each order and then create a relationship between them.

We might start with the following query:

USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
MERGE (employee:Employee {employeeId: row.EmployeeID})
MERGE (order:Order {orderId: row.OrderID})
MERGE (employee)-[:SOLD]->(order)

This does the job but if we profile the query like so…

PROFILE LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
WITH row LIMIT 0
MERGE (employee:Employee {employeeId: row.EmployeeID})
MERGE (order:Order {orderId: row.OrderID})
MERGE (employee)-[:SOLD]->(order)

…we’ll notice an ‘Eager’ lurking on the third line:

==> +----------------+------+--------+----------------------------------+-----------------------------------------+
==> |       Operator | Rows | DbHits |                      Identifiers |                                   Other |
==> +----------------+------+--------+----------------------------------+-----------------------------------------+
==> |    EmptyResult |    0 |      0 |                                  |                                         |
==> | UpdateGraph(0) |    0 |      0 |    employee, order,   UNNAMED216 |                            MergePattern |
==> |          Eager |    0 |      0 |                                  |                                         |
==> | UpdateGraph(1) |    0 |      0 | employee, employee, order, order | MergeNode; :Employee; MergeNode; :Order |
==> |          Slice |    0 |      0 |                                  |                            {  AUTOINT0} |
==> |        LoadCSV |    1 |      0 |                              row |                                         |
==> +----------------+------+--------+----------------------------------+-----------------------------------------+

You’ll notice that when we profile each query we’re stripping off the periodic commit section and adding a ‘WITH row LIMIT 0’. This allows us to generate enough of the query plan to identify the ‘Eager’ operator without actually importing any data.

We want to split that query into two so it can be processed in a non-eager manner:

USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
WITH row LIMIT 0
MERGE (employee:Employee {employeeId: row.EmployeeID})
MERGE (order:Order {orderId: row.OrderID})
==> +-------------+------+--------+----------------------------------+-----------------------------------------+
==> |    Operator | Rows | DbHits |                      Identifiers |                                   Other |
==> +-------------+------+--------+----------------------------------+-----------------------------------------+
==> | EmptyResult |    0 |      0 |                                  |                                         |
==> | UpdateGraph |    0 |      0 | employee, employee, order, order | MergeNode; :Employee; MergeNode; :Order |
==> |       Slice |    0 |      0 |                                  |                            {  AUTOINT0} |
==> |     LoadCSV |    1 |      0 |                              row |                                         |
==> +-------------+------+--------+----------------------------------+-----------------------------------------+

Now that we’ve created the employees and orders we can join them together:

USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
MATCH (employee:Employee {employeeId: row.EmployeeID})
MATCH (order:Order {orderId: row.OrderID})
MERGE (employee)-[:SOLD]->(order)
==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+
==> |       Operator | Rows | DbHits |                   Identifiers |                                                     Other |
==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+
==> |    EmptyResult |    0 |      0 |                               |                                                           |
==> |    UpdateGraph |    0 |      0 | employee, order,   UNNAMED216 |                                              MergePattern |
==> |      Filter(0) |    0 |      0 |                               |          Property(order,orderId) == Property(row,OrderID) |
==> | NodeByLabel(0) |    0 |      0 |                  order, order |                                                    :Order |
==> |      Filter(1) |    0 |      0 |                               | Property(employee,employeeId) == Property(row,EmployeeID) |
==> | NodeByLabel(1) |    0 |      0 |            employee, employee |                                                 :Employee |
==> |          Slice |    0 |      0 |                               |                                              {  AUTOINT0} |
==> |        LoadCSV |    1 |      0 |                           row |                                                           |
==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+

Not an Eager in sight!

MATCH, MATCH, MATCH, MERGE, MERGE

If we fast forward a few steps we may now have refactored our import script to the point where we create our nodes in one query and the relationships in another query.

Our create query works as expected:

USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
MERGE (employee:Employee {employeeId: row.EmployeeID})
MERGE (order:Order {orderId: row.OrderID})
MERGE (product:Product {productId: row.ProductID})
==> +-------------+------+--------+----------------------------------------------------+--------------------------------------------------------------+
==> |    Operator | Rows | DbHits |                                        Identifiers |                                                        Other |
==> +-------------+------+--------+----------------------------------------------------+--------------------------------------------------------------+
==> | EmptyResult |    0 |      0 |                                                    |                                                              |
==> | UpdateGraph |    0 |      0 | employee, employee, order, order, product, product | MergeNode; :Employee; MergeNode; :Order; MergeNode; :Product |
==> |       Slice |    0 |      0 |                                                    |                                                 {  AUTOINT0} |
==> |     LoadCSV |    1 |      0 |                                                row |                                                              |
==> +-------------+------+--------+----------------------------------------------------+--------------------------------------------------------------+

We’ve now got employees, products and orders in the graph. Now let’s create relationships between the trio:

USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
MATCH (employee:Employee {employeeId: row.EmployeeID})
MATCH (order:Order {orderId: row.OrderID})
MATCH (product:Product {productId: row.ProductID})
MERGE (employee)-[:SOLD]->(order)
MERGE (order)-[:PRODUCT]->(product)

If we profile that we’ll notice Eager has sneaked in again!

==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+
==> |       Operator | Rows | DbHits |                   Identifiers |                                                     Other |
==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+
==> |    EmptyResult |    0 |      0 |                               |                                                           |
==> | UpdateGraph(0) |    0 |      0 |  order, product,   UNNAMED318 |                                              MergePattern |
==> |          Eager |    0 |      0 |                               |                                                           |
==> | UpdateGraph(1) |    0 |      0 | employee, order,   UNNAMED287 |                                              MergePattern |
==> |      Filter(0) |    0 |      0 |                               |    Property(product,productId) == Property(row,ProductID) |
==> | NodeByLabel(0) |    0 |      0 |              product, product |                                                  :Product |
==> |      Filter(1) |    0 |      0 |                               |          Property(order,orderId) == Property(row,OrderID) |
==> | NodeByLabel(1) |    0 |      0 |                  order, order |                                                    :Order |
==> |      Filter(2) |    0 |      0 |                               | Property(employee,employeeId) == Property(row,EmployeeID) |
==> | NodeByLabel(2) |    0 |      0 |            employee, employee |                                                 :Employee |
==> |          Slice |    0 |      0 |                               |                                              {  AUTOINT0} |
==> |        LoadCSV |    1 |      0 |                           row |                                                           |
==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+

In this case the Eager happens on our second call to MERGE as Michael identified in his post:

The issue is that within a single Cypher statement you have to isolate changes that affect matches further on, e.g. when you CREATE nodes with a label that are suddenly matched by a later MATCH or MERGE operation.

We can work around the problem in this case by having separate queries to create the relationships:

LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
MATCH (employee:Employee {employeeId: row.EmployeeID})
MATCH (order:Order {orderId: row.OrderID})
MERGE (employee)-[:SOLD]->(order)
==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+
==> |       Operator | Rows | DbHits |                   Identifiers |                                                     Other |
==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+
==> |    EmptyResult |    0 |      0 |                               |                                                           |
==> |    UpdateGraph |    0 |      0 | employee, order,   UNNAMED236 |                                              MergePattern |
==> |      Filter(0) |    0 |      0 |                               |          Property(order,orderId) == Property(row,OrderID) |
==> | NodeByLabel(0) |    0 |      0 |                  order, order |                                                    :Order |
==> |      Filter(1) |    0 |      0 |                               | Property(employee,employeeId) == Property(row,EmployeeID) |
==> | NodeByLabel(1) |    0 |      0 |            employee, employee |                                                 :Employee |
==> |          Slice |    0 |      0 |                               |                                              {  AUTOINT0} |
==> |        LoadCSV |    1 |      0 |                           row |                                                           |
==> +----------------+------+--------+-------------------------------+-----------------------------------------------------------+
USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
MATCH (order:Order {orderId: row.OrderID})
MATCH (product:Product {productId: row.ProductID})
MERGE (order)-[:PRODUCT]->(product)
==> +----------------+------+--------+------------------------------+--------------------------------------------------------+
==> |       Operator | Rows | DbHits |                  Identifiers |                                                  Other |
==> +----------------+------+--------+------------------------------+--------------------------------------------------------+
==> |    EmptyResult |    0 |      0 |                              |                                                        |
==> |    UpdateGraph |    0 |      0 | order, product,   UNNAMED229 |                                           MergePattern |
==> |      Filter(0) |    0 |      0 |                              | Property(product,productId) == Property(row,ProductID) |
==> | NodeByLabel(0) |    0 |      0 |             product, product |                                               :Product |
==> |      Filter(1) |    0 |      0 |                              |       Property(order,orderId) == Property(row,OrderID) |
==> | NodeByLabel(1) |    0 |      0 |                 order, order |                                                 :Order |
==> |          Slice |    0 |      0 |                              |                                           {  AUTOINT0} |
==> |        LoadCSV |    1 |      0 |                          row |                                                        |
==> +----------------+------+--------+------------------------------+--------------------------------------------------------+
MERGE, SET

I try to make LOAD CSV scripts as idempotent as possible so that if we add more rows or columns of data to our CSV we can rerun the query without having to recreate everything.

This can lead you towards the following pattern where we’re creating suppliers:

USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
MERGE (supplier:Supplier {supplierId: row.SupplierID})
SET supplier.companyName = row.SupplierCompanyName

We want to ensure that there’s only one Supplier with that SupplierID but we might be incrementally adding new properties and decide to just replace everything by using the ‘SET’ command. If we profile that query, the Eager lurks:

==> +----------------+------+--------+--------------------+----------------------+
==> |       Operator | Rows | DbHits |        Identifiers |                Other |
==> +----------------+------+--------+--------------------+----------------------+
==> |    EmptyResult |    0 |      0 |                    |                      |
==> | UpdateGraph(0) |    0 |      0 |                    |          PropertySet |
==> |          Eager |    0 |      0 |                    |                      |
==> | UpdateGraph(1) |    0 |      0 | supplier, supplier | MergeNode; :Supplier |
==> |          Slice |    0 |      0 |                    |         {  AUTOINT0} |
==> |        LoadCSV |    1 |      0 |                row |                      |
==> +----------------+------+--------+--------------------+----------------------+

We can work around this at the cost of a bit of duplication using ‘ON CREATE SET’ and ‘ON MATCH SET’:

USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-northwind/data/customerDb.csv" AS row
MERGE (supplier:Supplier {supplierId: row.SupplierID})
ON CREATE SET supplier.companyName = row.SupplierCompanyName
ON MATCH SET supplier.companyName = row.SupplierCompanyName
==> +-------------+------+--------+--------------------+----------------------+
==> |    Operator | Rows | DbHits |        Identifiers |                Other |
==> +-------------+------+--------+--------------------+----------------------+
==> | EmptyResult |    0 |      0 |                    |                      |
==> | UpdateGraph |    0 |      0 | supplier, supplier | MergeNode; :Supplier |
==> |       Slice |    0 |      0 |                    |         {  AUTOINT0} |
==> |     LoadCSV |    1 |      0 |                row |                      |
==> +-------------+------+--------+--------------------+----------------------+

With the data set I’ve been working with I was able to avoid OutOfMemory exceptions in some cases and reduce the amount of time it took to run the query by a factor of 3 in others.

As time goes on I expect all of these scenarios will be addressed but as of Neo4j 2.1.5 these are the patterns that I’ve identified as being overly eager.

If you know of any others do let me know and I can add them to the post or write a second part.

Categories: Programming

Neo4j: Modelling sub types

Tue, 10/21/2014 - 00:08

A question which sometimes comes up when discussing graph data modelling is how you go about modelling sub/super types.

In my experience there are two reasons why we might want to do this:

  • To ensure that certain properties exist on bits of data
  • To write drill down queries based on those types

At the moment the former isn’t built into Neo4j and you’d only be able to achieve it by wiring up some code in a pre-commit hook of a transaction event handler, so we’ll focus on the latter.

The typical example used for showing how to design sub types is the animal kingdom and I managed to find a data set from Louisville, Kentucky’s Animal Services which we can use.

In this case the sub types are used to represent the type of animal, breed group and breed. We then also have ‘real data’ in terms of actual dogs under the care of animal services.

We effectively end up with two graphs in one – a model and a meta model:

[diagram: the model and the meta model in a single graph]

The Cypher query to create this graph looks like this:

LOAD CSV WITH HEADERS FROM "file:/Users/markneedham/projects/neo4j-subtypes/data/dogs.csv" AS line
MERGE (animalType:AnimalType {name: "Dog"})
MERGE (breedGroup:BreedGroup {name: line.BreedGroup})
MERGE (breed:Breed {name: line.PrimaryBreed})
MERGE (animal:Animal {id: line.TagIdentity, primaryColour: line.PrimaryColour, size: line.Size})
 
MERGE (animalType)<-[:PARENT]-(breedGroup)
MERGE (breedGroup)<-[:PARENT]-(breed)
MERGE (breed)<-[:PARENT]-(animal)

We could then write a simple query to find out how many dogs we have:

MATCH (animalType:AnimalType)<-[:PARENT*]-(animal)
RETURN animalType, COUNT(*) AS animals
ORDER BY animals DESC
==> +--------------------------------+
==> | animalType           | animals |
==> +--------------------------------+
==> | Node[89]{name:"Dog"} | 131     |
==> +--------------------------------+
==> 1 row

Or we could write a slightly more complex query to find the number of animals at each level of our type hierarchy:

MATCH path = (animalType:AnimalType)<-[:PARENT]-(breedGroup)<-[:PARENT*]-(animal)
RETURN [node IN nodes(path) | node.name][..-1] AS breed, COUNT(*) AS animals
ORDER BY animals DESC
LIMIT 5
==> +-----------------------------------------------------+
==> | breed                                     | animals |
==> +-----------------------------------------------------+
==> | ["Dog","SETTER/RETRIEVE","LABRADOR RETR"] | 15      |
==> | ["Dog","SETTER/RETRIEVE","GOLDEN RETR"]   | 13      |
==> | ["Dog","POODLE","POODLE MIN"]             | 10      |
==> | ["Dog","TERRIER","MIN PINSCHER"]          | 9       |
==> | ["Dog","SHEPHERD","WELSH CORGI CAR"]      | 6       |
==> +-----------------------------------------------------+
==> 5 rows

We might then decide to add an exercise sub graph which indicates how much exercise each type of dog requires:

MATCH (breedGroup:BreedGroup)
WHERE breedGroup.name IN ["SETTER/RETRIEVE", "POODLE"]
MERGE (exercise:Exercise {type: "2 hours hard exercise"})
MERGE (exercise)<-[:REQUIRES_EXERCISE]-(breedGroup);
MATCH (breedGroup:BreedGroup)
WHERE breedGroup.name IN ["TERRIER", "SHEPHERD"]
MERGE (exercise:Exercise {type: "1 hour gentle exercise"})
MERGE (exercise)<-[:REQUIRES_EXERCISE]-(breedGroup);

We could then query that to find out which dogs need to come out for 2 hours of hard exercise:

MATCH (exercise:Exercise {type: "2 hours hard exercise"})<-[:REQUIRES_EXERCISE]-()<-[:PARENT*]-(dog)
WHERE NOT (dog)<-[:PARENT]-()
RETURN dog
LIMIT 10
==> +-----------------------------------------------------------+
==> | dog                                                       |
==> +-----------------------------------------------------------+
==> | Node[541]{id:"664427",primaryColour:"BLACK",size:"SMALL"} |
==> | Node[542]{id:"543787",primaryColour:"BLACK",size:"SMALL"} |
==> | Node[543]{id:"584021",primaryColour:"BLACK",size:"SMALL"} |
==> | Node[544]{id:"584022",primaryColour:"BLACK",size:"SMALL"} |
==> | Node[545]{id:"664430",primaryColour:"BLACK",size:"SMALL"} |
==> | Node[546]{id:"535176",primaryColour:"BLACK",size:"SMALL"} |
==> | Node[567]{id:"613557",primaryColour:"WHITE",size:"SMALL"} |
==> | Node[568]{id:"531376",primaryColour:"WHITE",size:"SMALL"} |
==> | Node[569]{id:"613567",primaryColour:"WHITE",size:"SMALL"} |
==> | Node[570]{id:"531379",primaryColour:"WHITE",size:"SMALL"} |
==> +-----------------------------------------------------------+
==> 10 rows

In this query we ensured that we only returned dogs rather than breeds by checking that there was no incoming PARENT relationship. Alternatively we could have filtered on the Animal label…

MATCH (exercise:Exercise {type: "2 hours hard exercise"})<-[:REQUIRES_EXERCISE]-()<-[:PARENT*]-(dog:Animal)
RETURN dog
LIMIT 10

or if we only wanted to take the dogs out for exercise perhaps we’d put a Dog label on the appropriate nodes.

People are often curious why labels don’t have super/sub types between them but I tend to use labels for simple categorisation – anything more complicated and we may as well use the built-in power of the graph model!

The code is on github should you wish to play with it.

Categories: Programming

Python: Converting a date string to timestamp

Mon, 10/20/2014 - 16:53

I’ve been playing around with Python over the last few days while cleaning up a data set and one thing I wanted to do was translate date strings into a timestamp.

I started with a date in this format:

date_text = "13SEP2014"

So the first step is to translate that into a Python date – the strftime section of the documentation is useful for figuring out which format code is needed:

import datetime
 
date_text = "13SEP2014"
date = datetime.datetime.strptime(date_text, "%d%b%Y")
 
print(date)
$ python dates.py
2014-09-13 00:00:00

The next step was to translate that to a UNIX timestamp. I thought there might be a method or property on the datetime object that I could access but I couldn’t find one, so I ended up using calendar to do the transformation:

import datetime
import calendar
 
date_text = "13SEP2014"
date = datetime.datetime.strptime(date_text, "%d%b%Y")
 
print(date)
print(calendar.timegm(date.utctimetuple()))
$ python dates.py
2014-09-13 00:00:00
1410566400
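
As an aside, if you’re on Python 3.3 or later the datetime object does have a timestamp() method – the catch is that it treats naive datetimes as local time, so to get the same UTC-based value you’d attach a UTC timezone first. A minimal sketch, assuming Python 3:

import datetime
 
date_text = "13SEP2014"
date = datetime.datetime.strptime(date_text, "%d%b%Y")
 
# treat the naive datetime as UTC before asking for the POSIX timestamp
utc_date = date.replace(tzinfo=datetime.timezone.utc)
print(int(utc_date.timestamp()))
$ python3 dates.py
1410566400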

It’s not too tricky so hopefully I shall remember next time.

Categories: Programming

Neo4j: LOAD CSV – The sneaky null character

Sat, 10/18/2014 - 11:49

I spent some time earlier in the week trying to import a CSV file extracted from Hadoop into Neo4j using Cypher’s LOAD CSV command and initially struggled due to some rogue characters.

The CSV file looked like this:

$ cat foo.csv
foo,bar,baz
1,2,3

I wrote the following LOAD CSV query to extract some of the fields and compare others:

load csv with headers from "file:/Users/markneedham/Downloads/foo.csv" AS line
RETURN line.foo, line.bar, line.bar = "2"
==> +--------------------------------------+
==> | line.foo | line.bar | line.bar = "2" |
==> +--------------------------------------+
==> | <null>   | "2"     | false          |
==> +--------------------------------------+
==> 1 row

I had expected to see a “1” in the first column and a ‘true’ in the third column, neither of which happened.

I initially didn’t have a text editor with hexcode mode available so I tried checking the length of the entry in the ‘bar’ field:

load csv with headers from "file:/Users/markneedham/Downloads/foo.csv" AS line
RETURN line.foo, line.bar, line.bar = "2", length(line.bar)
==> +---------------------------------------------------------+
==> | line.foo | line.bar | line.bar = "2" | length(line.bar) |
==> +---------------------------------------------------------+
==> | <null>   | "2"     | false          | 2                |
==> +---------------------------------------------------------+
==> 1 row

The length of that value is 2 when we’d expect it to be 1 given it’s a single character.

I tried trimming the field to see if that made any difference…

load csv with headers from "file:/Users/markneedham/Downloads/foo.csv" AS line
RETURN line.foo, trim(line.bar), trim(line.bar) = "2", length(line.bar)
==> +---------------------------------------------------------------------+
==> | line.foo | trim(line.bar) | trim(line.bar) = "2" | length(line.bar) |
==> +---------------------------------------------------------------------+
==> | <null>   | "2"            | true                 | 2                |
==> +---------------------------------------------------------------------+
==> 1 row

…and it did! I thought there was probably a trailing whitespace character after the “2” which trim had removed and that the ‘foo’ column in the header row had the same issue.

I was able to see what was actually going on by extracting the JSON dump of the query via the Neo4j browser:

{  
   "table":{  
      "_response":{  
         "columns":[  
            "line"
         ],
         "data":[  
            {  
               "row":[  
                  {  
                     "foo\u0000":"1\u0000",
                     "bar":"2\u0000",
                     "baz":"3"
                  }
               ],
               "graph":{  
                  "nodes":[  
 
                  ],
                  "relationships":[  
 
                  ]
               }
            }
         ],
      ...
}
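
As an aside, if you don’t have a hex-capable editor to hand, a couple of lines of Python will also show you the raw bytes of each line – a minimal sketch:

# print the raw bytes of the first two lines of the file
with open("foo.csv", "rb") as f:
    for line in f.readlines()[:2]:
        print(repr(line))

Any \x00 (null) bytes show up directly in the repr output.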

It turns out there were null characters scattered around the file so I needed to pre-process the file to get rid of them:

$ tr < foo.csv -d '\000' > bar.csv
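
If tr isn’t available – on Windows, for example – a short Python script can do the same clean-up. A minimal sketch that writes the stripped copy to bar.csv:

# copy foo.csv to bar.csv, dropping any null (\x00) bytes along the way
with open("foo.csv", "rb") as source, open("bar.csv", "wb") as target:
    for chunk in iter(lambda: source.read(64 * 1024), b""):
        target.write(chunk.replace(b"\x00", b""))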

Now if we process bar.csv it’s a much smoother process:

load csv with headers from "file:/Users/markneedham/Downloads/bar.csv" AS line
RETURN line.foo, line.bar, line.bar = "2", length(line.bar)
==> +---------------------------------------------------------+
==> | line.foo | line.bar | line.bar = "2" | length(line.bar) |
==> +---------------------------------------------------------+
==> | "1"      | "2"      | true           | 1                |
==> +---------------------------------------------------------+
==> 1 row

Note to self: don’t expect data to be clean, inspect it first!

Categories: Programming