Mark Needham
Thoughts on Software Development

Spark: Parse CSV file and group by column value

Sun, 11/16/2014 - 23:53

I’ve found myself working with large CSV files quite frequently and, realising that my existing toolset didn’t let me explore them quickly, I thought I’d spend a bit of time looking at Spark to see if it could help.

I’m working with a crime data set released by the City of Chicago: it’s 1GB in size and contains details of 4 million crimes:

$ ls -alh ~/Downloads/Crimes_-_2001_to_present.csv
-rw-r--r--@ 1 markneedham  staff   1.0G 16 Nov 12:14 /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
$ wc -l ~/Downloads/Crimes_-_2001_to_present.csv
 4193441 /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv

We can get a rough idea of the contents of the file by looking at the first row along with the header:

$ head -n 2 ~/Downloads/Crimes_-_2001_to_present.csv
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,"(41.75017626412204, -87.55494559131228)"

I wanted to do a count of the ‘Primary Type’ column to see how many crimes of each type we have. Using just Unix command line tools, this is how we’d do that:

$ time tail +2 ~/Downloads/Crimes_-_2001_to_present.csv | cut -d, -f6  | sort | uniq -c | sort -rn
859197 THEFT
757530 BATTERY
489528 NARCOTICS
488209 CRIMINAL DAMAGE
257310 BURGLARY
253964 OTHER OFFENSE
247386 ASSAULT
197404 MOTOR VEHICLE THEFT
157706 ROBBERY
137538 DECEPTIVE PRACTICE
124974 CRIMINAL TRESPASS
47245 PROSTITUTION
40361 WEAPONS VIOLATION
31585 PUBLIC PEACE VIOLATION
26524 OFFENSE INVOLVING CHILDREN
14788 CRIM SEXUAL ASSAULT
14283 SEX OFFENSE
10632 GAMBLING
8847 LIQUOR LAW VIOLATION
6443 ARSON
5178 INTERFERE WITH PUBLIC OFFICER
4846 HOMICIDE
3585 KIDNAPPING
3147 INTERFERENCE WITH PUBLIC OFFICER
2471 INTIMIDATION
1985 STALKING
 355 OFFENSES INVOLVING CHILDREN
 219 OBSCENITY
  86 PUBLIC INDECENCY
  80 OTHER NARCOTIC VIOLATION
  12 RITUALISM
  12 NON-CRIMINAL
   6 OTHER OFFENSE
   2 NON-CRIMINAL (SUBJECT SPECIFIED)
   2 NON - CRIMINAL
 
real	2m37.495s
user	3m0.337s
sys	0m1.471s

This isn’t too bad, but it seems like exactly the type of calculation Spark is made for, so I had a look at how I could go about doing that. To start with I created an SBT project with the following build file:

name := "playground"
 
version := "1.0"
 
scalaVersion := "2.10.4"
 
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
 
libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
 
ideaExcludeFolders += ".idea"
 
ideaExcludeFolders += ".idea_modules"
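
The two ideaExcludeFolders settings aren’t part of SBT itself; they come from the sbt-idea plugin. Assuming that plugin is what generates the IntelliJ project files here, project/plugins.sbt would need something along these lines (the version is just an example):

addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")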

I downloaded Spark and after unpacking it launched the Spark shell:

$ pwd
/Users/markneedham/projects/spark-play/spark-1.1.0/spark-1.1.0-bin-hadoop1
 
$ ./bin/spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
 
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51)
...
Spark context available as sc.
 
scala>
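
Since the CSVParser used below comes from opencsv, which isn’t bundled with Spark, its jar needs to be on the shell’s classpath. One way of doing that is to launch the shell with the --jars flag; the path below is just an example of where the SBT build’s Ivy cache would typically have put it:

$ ./bin/spark-shell --jars ~/.ivy2/cache/net.sf.opencsv/opencsv/jars/opencsv-2.3.jar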

I first import some classes I’m going to need:

scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser
 
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

Now, following the quick start example, we’ll create a Resilient Distributed Dataset (RDD) from our Crime CSV file:

scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
crimeFile: String = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
scala> val crimeData = sc.textFile(crimeFile).cache()
14/11/16 22:31:16 INFO MemoryStore: ensureFreeSpace(32768) called with curMem=0, maxMem=278302556
14/11/16 22:31:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.4 MB)
crimeData: org.apache.spark.rdd.RDD[String] = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv MappedRDD[1] at textFile at <console>:17

Our next step is to process each line of the file using our CSV parser. A simple way to do this would be to create a new CSVParser for each line:

scala> crimeData.map(line => {
         val parser = new CSVParser(',')
         parser.parseLine(line).mkString(",")
       }).take(5).foreach(println)
14/11/16 22:35:49 INFO SparkContext: Starting job: take at <console>:23
...
14/11/16 22:35:49 INFO SparkContext: Job finished: take at <console>:23, took 0.013904 s
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)

That works, but it’s a bit wasteful to create a new CSVParser for every line, so instead let’s create just one for each partition that Spark splits our file into:

scala> crimeData.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)
14/11/16 22:38:44 INFO SparkContext: Starting job: take at <console>:25
...
14/11/16 22:38:44 INFO SparkContext: Job finished: take at <console>:25, took 0.015216 s
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)

You’ll notice that we’ve still got the header being printed, which isn’t ideal, so let’s get rid of it!

I expected there to be a ‘drop’ function which would allow me to do that, but in fact there isn’t. Instead we can make use of our knowledge that the first partition will contain the first line and strip it out that way:

scala> def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) lines.drop(1) else lines
         })
       }
dropHeader: (data: org.apache.spark.rdd.RDD[String])org.apache.spark.rdd.RDD[String]

Now let’s grab the first 5 lines again and print them out:

scala> val withoutHeader: RDD[String] = dropHeader(crimeData)
withoutHeader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:14
 
scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)
14/11/16 22:43:27 INFO SparkContext: Starting job: take at <console>:29
...
14/11/16 22:43:27 INFO SparkContext: Job finished: take at <console>:29, took 0.018557 s
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)
9460355,HX113738,01/14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET,true,false,0733,007,17,68,06,1171480,1858195,2014,01/16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671,(41.766348042591375, -87.64702037047671)

We’re finally in good shape to extract the values from the ‘Primary Type’ column and count how many times each of those appears in our data set:

scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           val columns = parser.parseLine(line)
           Array(columns(5)).mkString(",")
         })
       }).countByValue().toList.sortBy(-_._2).foreach(println)
14/11/16 22:45:20 INFO SparkContext: Starting job: countByValue at <console>:30
14/11/16 22:45:20 INFO DAGScheduler: Got job 7 (countByValue at <console>:30) with 32 output partitions (allowLocal=false)
...
14/11/16 22:45:30 INFO SparkContext: Job finished: countByValue at <console>:30, took 9.796565 s
(THEFT,859197)
(BATTERY,757530)
(NARCOTICS,489528)
(CRIMINAL DAMAGE,488209)
(BURGLARY,257310)
(OTHER OFFENSE,253964)
(ASSAULT,247386)
(MOTOR VEHICLE THEFT,197404)
(ROBBERY,157706)
(DECEPTIVE PRACTICE,137538)
(CRIMINAL TRESPASS,124974)
(PROSTITUTION,47245)
(WEAPONS VIOLATION,40361)
(PUBLIC PEACE VIOLATION,31585)
(OFFENSE INVOLVING CHILDREN,26524)
(CRIM SEXUAL ASSAULT,14788)
(SEX OFFENSE,14283)
(GAMBLING,10632)
(LIQUOR LAW VIOLATION,8847)
(ARSON,6443)
(INTERFERE WITH PUBLIC OFFICER,5178)
(HOMICIDE,4846)
(KIDNAPPING,3585)
(INTERFERENCE WITH PUBLIC OFFICER,3147)
(INTIMIDATION,2471)
(STALKING,1985)
(OFFENSES INVOLVING CHILDREN,355)
(OBSCENITY,219)
(PUBLIC INDECENCY,86)
(OTHER NARCOTIC VIOLATION,80)
(NON-CRIMINAL,12)
(RITUALISM,12)
(OTHER OFFENSE ,6)
(NON-CRIMINAL (SUBJECT SPECIFIED),2)
(NON - CRIMINAL,2)

We get the same results as with the Unix commands, except it took less than 10 seconds to calculate, which is pretty cool!
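
For reference, the same pipeline could also be wrapped up as a standalone program in the SBT project from earlier rather than typed into the shell. The following is just a sketch along those lines; the object name and the local[*] master setting are illustrative rather than anything from the original session:

import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CrimesByPrimaryType {
  // Same trick as before: the header lives in the first partition
  def dropHeader(data: RDD[String]): RDD[String] =
    data.mapPartitionsWithIndex((idx, lines) => if (idx == 0) lines.drop(1) else lines)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("crimes").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val crimeData = sc.textFile("/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv").cache()

    // One CSVParser per partition, pull out 'Primary Type' (column 5) and count
    dropHeader(crimeData).mapPartitions(lines => {
      val parser = new CSVParser(',')
      lines.map(line => parser.parseLine(line)(5))
    }).countByValue().toList.sortBy(-_._2).foreach(println)

    sc.stop()
  }
}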


R: dplyr – Sum for group_by multiple columns

Tue, 11/11/2014 - 01:17

Over the weekend I was playing around with dplyr and had the following data frame, which I’d grouped and counted by both of its columns:

> library(dplyr)
 
> data = data.frame(
    letter = sample(LETTERS, 50000, replace = TRUE),
    number = sample (1:10, 50000, replace = TRUE)
    )
 
> data %>% count(letter, number) %>% head(20)
Source: local data frame [20 x 3]
Groups: letter
 
   letter number   n
1       A      1 184
2       A      2 192
3       A      3 183
4       A      4 193
5       A      5 214
6       A      6 172
7       A      7 196
8       A      8 198
9       A      9 174
10      A     10 196
11      B      1 212
12      B      2 198
13      B      3 194
14      B      4 181
15      B      5 203
16      B      6 234
17      B      7 221
18      B      8 179
19      B      9 182
20      B     10 170

I wanted to add an extra column showing what percentage of that letter’s total count each number accounted for.

To do that we first need the total count for each letter. If we wrote that query standalone we’d have the following:

> data %>% count(letter)
Source: local data frame [26 x 2]
 
   letter    n
1       A 1902
2       B 1974
3       C 1911
4       D 1948
5       E 1888
6       F 1975
7       G 1914
8       H 1886
9       I 1910
10      J 1924
11      K 1974
12      L 1891
13      M 1894
14      N 1946
15      O 1956
16      P 1934
17      Q 1865
18      R 1992
19      S 1946
20      T 1854
21      U 1919
22      V 1913
23      W 1928
24      X 1934
25      Y 1908
26      Z 1914

We can also get those totals by summing up the individual (letter, number) counts from the previous data frame. The ungroup function removes the existing grouping so that we can regroup by letter alone:

> data %>% count(letter, number) %>% ungroup %>% group_by(letter) %>% mutate(sum.n = sum(n)) %>% head(20)
Source: local data frame [20 x 4]
Groups: letter
 
   letter number   n sum.n
1       A      1 184  1902
2       A      2 192  1902
3       A      3 183  1902
4       A      4 193  1902
5       A      5 214  1902
6       A      6 172  1902
7       A      7 196  1902
8       A      8 198  1902
9       A      9 174  1902
10      A     10 196  1902
11      B      1 212  1974
12      B      2 198  1974
13      B      3 194  1974
14      B      4 181  1974
15      B      5 203  1974
16      B      6 234  1974
17      B      7 221  1974
18      B      8 179  1974
19      B      9 182  1974
20      B     10 170  1974

Pretty neat!
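
The percentage column I was originally after is then just one more expression inside the mutate call. Something along these lines should do it (the percentage column name is of course arbitrary):

data %>% count(letter, number) %>% ungroup %>% group_by(letter) %>% mutate(sum.n = sum(n), percentage = n / sum.n * 100)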


R: dplyr – Maximum value row in each group

Mon, 11/10/2014 - 23:06

In my continued work with R’s dplyr I wanted to be able to group a data frame by some columns and then find the row with the maximum value in each group.

We’ll continue with my favourite dummy data set:

> library(dplyr)
 
> data = data.frame(
    letter = sample(LETTERS, 50000, replace = TRUE),
    number = sample (1:10, 50000, replace = TRUE)
    )

I started with the following code to count how many occurrences of each (letter, number) pair there were:

> data %>% count(letter, number)
Source: local data frame [260 x 3]
Groups: letter
 
   letter number   n
1       A      1 184
2       A      2 192
3       A      3 183
4       A      4 193
5       A      5 214
6       A      6 172
7       A      7 196
8       A      8 198
9       A      9 174
10      A     10 196
..    ...    ... ...

I wanted to find the top number for each letter and, with a bit of help from Stack Overflow, I ended up with the following:

> data %>% count(letter, number) %>% filter(n == max(n))
Source: local data frame [26 x 3]
Groups: letter
 
   letter number   n
1       A      5 214
2       B      6 234
3       C      8 213
4       D      6 211
5       E      9 208
6       F      2 219
7       G      1 213
8       H      2 208
9       I      9 220
10      J      7 213
11      K      3 228
12      L      2 206
13      M      3 212
14      N      4 222
15      O      1 211
16      P      7 211
17      Q      9 210
18      R      5 224
19      S      2 211
20      T      9 204
21      U      8 217
22      V      6 220
23      W      2 213
24      X      2 214
25      Y      3 211
26      Z      3 206

Here we’re filtering over a collection of rows grouped by letter and only selecting the row which has the max value. We can see more clearly what’s happening if we filter the data frame to only contain one letter:

> letterA = data %>% filter(letter == "A") %>% count(letter, number)
> letterA
Source: local data frame [10 x 3]
Groups: letter
 
   letter number   n
1       A      1 184
2       A      2 192
3       A      3 183
4       A      4 193
5       A      5 214
6       A      6 172
7       A      7 196
8       A      8 198
9       A      9 174
10      A     10 196

And now let’s apply the filter command while also printing out information about each row:

> letterA %>% filter(print(n), print(n == max(n)), n == max(n))
 [1] 184 192 183 193 214 172 196 198 174 196
 [1] FALSE FALSE FALSE FALSE  TRUE FALSE FALSE FALSE FALSE FALSE
Source: local data frame [1 x 3]
Groups: letter
 
  letter number   n
1      A      5 214

If, instead of getting the top number by letter, we wanted to get the top letter by number, we just need to reorder the field names passed to the count function:

> data %>% count(number, letter) %>% filter(n == max(n))
Source: local data frame [10 x 3]
Groups: number
 
   number letter   n
1       1      G 213
2       2      F 219
3       3      K 228
4       4      N 222
5       5      R 224
6       6      B 234
7       7      B 221
8       8      U 217
9       9      I 220
10     10      O 209

And if we want to see the letter that shows up least frequently for each number, we can change the call to ‘max’ to an equivalent one using ‘min’:

> data %>% count(number, letter) %>% filter(n == min(n))
Source: local data frame [11 x 3]
Groups: number
 
   number letter   n
1       1      H 166
2       2      O 160
3       3      E 156
4       4      R 169
5       5      L 169
6       6      I 164
7       7      H 170
8       7      I 170
9       8      Q 166
10      9      W 162
11     10      J 168