Software Development Blogs: Programming, Software Testing, Agile, Project Management

Mark Needham
Thoughts on Software Development

Clojure: First steps with reducers

Sun, 01/24/2016 - 23:01

I’ve been playing around with Clojure a bit today in preparation for a talk I’m giving next week and found myself writing the following code to apply the same function to three different scores:

(defn log2 [n]
  (/ (Math/log n) (Math/log 2)))
 
(defn score-item [n]
  (if (= n 0) 0 (log2 n)))
 
(+ (score-item 12) (score-item 13) (score-item 5))
;; => 9.60733031374961

I’d forgotten about folding over a collection but quickly remembered that I could achieve the same result with the following code:

(reduce #(+ %1 (score-item %2)) 0 [12 13 5])
;; => 9.60733031374961

The added advantage here is that if I want to add a 4th score to the mix all I need to do is append it to the end of the vector:

(reduce #(+ %1 (score-item %2)) 0 [12 13 5 6])
;; => 12.192292814470767

However, while Googling to remind myself of the order of the arguments to reduce I kept coming across articles and documentation about reducers which I’d heard about but never used.

As I understand it, they're used to achieve performance gains and easier composition of functions over collections, so I'm not sure how useful they'll be to me, but I thought I'd give them a try.

Our first step is to bring the namespace into scope:

(require '[clojure.core.reducers :as r])

Now we can compute the same result using the reduce function:

(r/reduce #(+ %1 (score-item %2)) 0 [12 13 5 6])
;; => 12.192292814470767

So far, so identical. If we wanted to calculate individual scores and then filter out those below a certain threshold the code would behave a little differently:

(->> [12 13 5 6]
     (map score-item)
     (filter #(> % 3)))
;; => (3.5849625007211565 3.700439718141092)
 
(->> [12 13 5 6]
     (r/map score-item)
     (r/filter #(> % 3)))
;; => #object[clojure.core.reducers$folder$reify__19192 0x5d0edf21 "clojure.core.reducers$folder$reify__19192@5d0edf21"]

Instead of giving us a vector of scores, the reducers version returns a reducer, which we can pass to reduce or fold if we want an accumulated result, or to into if we want to output a collection. In this case we want the latter:

(->> [12 13 5 6]
     (r/map score-item)
     (r/filter #(> % 3))
     (into []))
;; => (3.5849625007211565 3.700439718141092)

With a measly 4-item collection I don't think the reducers are going to provide much of a speed improvement here, but we'd need to use the fold function if we wanted the collection to be processed in parallel.
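
A minimal sketch of what that fold version might look like (with only four elements it won't actually fork, but the shape is the same; + serves as both the combining function and, via its zero-arity, the identity):

(r/fold + (fn [acc n] (+ acc (score-item n))) [12 13 5 6])
;; => 12.192292814470767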

One for next time!

Categories: Programming

Neo4j: Cypher – avoid duplicate calls to NOT patterns

Sun, 01/17/2016 - 13:19

I’ve been reacquainting myself with the meetup.com dataset ahead of Wednesday’s meetup in London and wanted to write a collaborative filtering type query to work out which groups people in my groups were in.

This started out simply enough:

MATCH (member:Member {name: "Mark Needham"})-[:MEMBER_OF]->(group:Group)<-[:MEMBER_OF]-(other:Member)-[:MEMBER_OF]->(otherGroup:Group)
RETURN otherGroup, COUNT(*) AS commonMembers
ORDER BY commonMembers DESC
LIMIT 5

And it doesn't take too long to run:

Cypher version: CYPHER 2.3, planner: COST. 1084378 total db hits in 1103 ms.
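
That summary line comes from prefixing the query with the PROFILE keyword, which returns the query plan along with the total db hits, e.g.:

PROFILE
MATCH (member:Member {name: "Mark Needham"})-[:MEMBER_OF]->(group:Group)<-[:MEMBER_OF]-(other:Member)-[:MEMBER_OF]->(otherGroup:Group)
RETURN otherGroup, COUNT(*) AS commonMembers
ORDER BY commonMembers DESC
LIMIT 5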

However, it was showing up several groups that I’m already a member of so I added in a “WHERE NOT” clause to sort that out:

MATCH (member:Member {name: "Mark Needham"})-[:MEMBER_OF]->(group:Group)<-[:MEMBER_OF]-(other:Member)-[:MEMBER_OF]->(otherGroup:Group)
WHERE NOT (member)-[:MEMBER_OF]->(otherGroup)
RETURN otherGroup, COUNT(*) AS commonMembers
ORDER BY commonMembers DESC
LIMIT 5

Unfortunately when I ran this the number of db hits increased by 14x and it took 3x as long to run:

Cypher version: CYPHER 2.3, planner: COST. 14061442 total db hits in 3364 ms.


The problem is that we’re making lots of duplicate calls to NOT (member)-[:MEMBER_OF]->(otherGroup) because each group shows up lots of times.

This is the ‘reduce cardinality of work in progress’ tip from Michael Hunger’s blog post:

Bonus Query Tuning Tip: Reduce Cardinality of Work in Progress

When following longer paths, you’ll encounter duplicates. If you’re not interested in all the possible paths – but just distinct information from stages of the path – make sure that you eagerly eliminate duplicates, so that later matches don’t have to be executed many multiple times.

We can reduce the WIP in our query by doing the counting of common members first and then filtering out the groups we’re already a member of:

MATCH (member:Member {name: "Mark Needham"})-[:MEMBER_OF]->(group:Group)<-[:MEMBER_OF]-(other:Member)-[:MEMBER_OF]->(otherGroup:Group)
WITH otherGroup, member, COUNT(*) AS commonMembers
WHERE NOT (member)-[:MEMBER_OF]->(otherGroup)
RETURN otherGroup, commonMembers
ORDER BY commonMembers DESC
LIMIT 5

This gets us back down to something closer to the running time/db hits of our initial query:

Cypher version: CYPHER 2.3, planner: COST. 1097114 total db hits in 1004 ms.
Categories: Programming

2015: A year in the life of the Neo4j London meetup group

Thu, 12/31/2015 - 14:58

Given we’ve only got a few more hours left of 2015 I thought it’d be fun to do a quick overview of how things have been going in the London chapter of the Neo4j meetup using Neo4j with a bit of R mixed in.

We’re going to be using the RNeo4j library to interact with the database along with a few other libraries which will help us out with different tasks:

library(RNeo4j)
library(ggplot2)
library(dplyr)
library(zoo)
library(scales)   # for date_format(), used when labelling the month axis below
 
graph = startGraph("http://localhost:7474/db/data/", username = "neo4j", password = "myPassword")

Let’s get to it:

Members
query = "MATCH (:Group {name: {name}})<-[membership:MEMBER_OF]-()
         RETURN membership.joined AS timestamp"
 
joinedDF = cypher(graph, query, name = "Neo4j - London User Group")
joinedDF$joinDate = as.Date(as.POSIXct(joinedDF$timestamp / 1000, origin="1970-01-01"))
 
ggplot(aes(x = year, y = n, label = n), 
       data = joinedDF %>% mutate(year = format(joinDate, "%Y")) %>% count(year)) + 
  geom_bar(stat = "identity", fill = "Dark Blue") + 
  ggtitle("Number of new members by year") +
  geom_text(vjust=-0.5)
[chart: number of new members by year]

A bit down on 2014 but not too far away. We’re still attracting new people who are interested in learning about graphs. Let’s drill into those numbers a bit:

byYearMon = joinedDF %>% 
  filter(format(joinDate, "%Y") == 2015) %>% 
  mutate(yearmon = as.Date(as.yearmon(joinDate))) %>% 
  count(yearmon)
 
ggplot(aes(x = yearmon, y = n, label = n), data = byYearMon) + 
  geom_bar(stat = "identity", fill = "Dark Blue") + 
  theme(axis.text.x = element_text(angle = 90, hjust = 1)) +
  scale_x_date(labels = date_format("%B"), breaks = "1 month") +
  ggtitle("Number of new members by month/year")
[chart: number of new members by month/year]

We had a bit of an end-of-year surge in October/November, which was unexpected. December has been low in previous years, and there was an April dip which I think is because we stopped doing events before Graph Connect 2015. I'm not sure about the September dip, so let's have a look:

eventsQuery = "MATCH (:Group {name: {name}})-[:HOSTED_EVENT]->(event)
               RETURN event.time + event.utcOffset AS timestamp"
eventsDF = cypher(graph, eventsQuery, name = "Neo4j - London User Group")
eventsDF$timestamp = as.Date(as.POSIXct(eventsDF$timestamp / 1000, origin="1970-01-01"))
 
eventsByYearMon = eventsDF %>% 
  filter(format(timestamp, "%Y") == 2015) %>% 
  mutate(yearmon = as.Date(as.yearmon(timestamp))) %>% 
  count(yearmon) 
 
merge(eventsByYearMon, byYearMon, by="yearmon")
 
      yearmon n.x n.y
1  2015-01-01   3  80
2  2015-02-01   6  76
3  2015-03-01   2  70
4  2015-04-01   2  53
5  2015-05-01   4  78
6  2015-06-01   5  83
7  2015-07-01   3  73
8  2015-08-01   5  73
9  2015-09-01   3  40
10 2015-10-01   3  94
11 2015-11-01   4 117
12 2015-12-01   3  48

At first glance there doesn’t seem to be any correlation between the number of events held and the number of new members so I think we’ll have to look for another predictor of that variable!
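
One quick way to put a number on that impression would be to keep hold of the merged data frame and compute the correlation coefficient between the two columns:

merged = merge(eventsByYearMon, byYearMon, by = "yearmon")
cor(merged$n.x, merged$n.y)   # n.x = events held, n.y = new members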

Events

Next let’s have a look at the events we ran in 2015. We’ll start with a quick chart showing the number of events we’ve run over the years:

ggplot(aes(x = year, y = n, label = n), data = eventsDF %>% mutate(year = format(timestamp, "%Y")) %>% count(year)) + 
  geom_bar(stat = "identity", fill = "Dark Blue") + 
  theme(axis.text.x = element_text(angle = 90, hjust = 1)) +
  ggtitle("Number of events")

[chart: number of events by year]

So fewer events than last year, but how many people RSVP'd 'yes' to the ones we did host?

eventsQuery = "MATCH (:Group {name: {name}})-[:HOSTED_EVENT]->(event)<-[:RSVPD {response: 'yes'}]-()
               WHERE event.time + event.utcOffset < timestamp()
               WITH event, COUNT(*) AS rsvps
               RETURN event.time + event.utcOffset AS timestamp, rsvps"
eventsDF = cypher(graph, eventsQuery, name = "Neo4j - London User Group")
eventsDF$timestamp = as.Date(as.POSIXct(eventsDF$timestamp / 1000, origin="1970-01-01"))
 
ggplot(aes(x = year, y = rsvps), 
       data = eventsDF %>% mutate(year = format(timestamp, "%Y")) %>% group_by(year) %>% summarise(rsvps= sum(rsvps)) ) + 
  geom_bar(stat = "identity", fill = "Dark Blue") + 
  theme(axis.text.x = element_text(angle = 90, hjust = 1)) +
  ggtitle("Number of attendees")
[chart: number of attendees by year]

Slightly more ‘yes’ RSVPs than last year. Now let’s drill into the repeat events we ran this year:

eventsQuery = "MATCH (:Group {name: {name}})-[:HOSTED_EVENT]->(event)
               WHERE {startYear} <= (event.time + event.utcOffset) < {endYear}
               RETURN event.name AS event, COUNT(*) AS times
               ORDER BY times DESC"
eventsDF = cypher(graph, eventsQuery, name = "Neo4j - London User Group", 
                  startYear  = as.numeric(as.POSIXct("2015-01-01", format="%Y-%m-%d")) * 1000, 
                  endYear = as.numeric(as.POSIXct("2015-12-31", format="%Y-%m-%d")) * 1000)
eventsDF %>% filter(times > 1)
 
                                                       event times
1                      Relational to graph: A worked example     7
2                                            Intro to Graphs     6
3                          Graph Modelling - Do's and Don'ts     5
4          Hands On Intro to Cypher - Neo4j's Query Language     3
5 Build your own recommendation engine with Neo4j in an hour     2
6                                Fraud Detection using Neo4j     2

I thought we’d run ‘Intro to Graphs’ most often but the data doesn’t lie – it’s all about relational to graph. And which were the most popular repeat events in terms of ‘yes’ RSVPs?

eventsQuery = "MATCH (:Group {name: {name}})-[:HOSTED_EVENT]->(event)
               WHERE {startYear} <= (event.time + event.utcOffset) < {endYear}
               MATCH (event)<-[:RSVPD {response: 'yes'}]-()
               WITH event, COUNT(*) AS yesRSVPs
               WITH event.name AS event, COUNT(*) AS times, SUM(yesRSVPs) AS rsvps
               RETURN event, times, rsvps, rsvps / times AS rsvpsPerEvent
               ORDER BY rsvpsPerEvent DESC"
eventsDF = cypher(graph, eventsQuery, name = "Neo4j - London User Group", 
                  startYear  = as.numeric(as.POSIXct("2015-01-01", format="%Y-%m-%d")) * 1000, 
                  endYear = as.numeric(as.POSIXct("2015-12-31", format="%Y-%m-%d")) * 1000)
eventsDF %>% filter(times > 1)
 
                                                       event times rsvps rsvpsPerEvent
1                                Fraud Detection using Neo4j     2   150            75
2                                            Intro to Graphs     6   352            58
3                          Graph Modelling - Do's and Don'ts     5   281            56
4                      Relational to graph: A worked example     7   367            52
5 Build your own recommendation engine with Neo4j in an hour     2    85            42
6          Hands On Intro to Cypher - Neo4j's Query Language     3   104            34

It looks like fraud is a popular topic although we’ve only run it twice so perhaps best not to read too much into that. We’re running that one again in a couple of weeks if you’re interested.

Ignoring repeat events let’s see which event drew the biggest crowd:

eventsQuery = "MATCH (:Group {name: {name}})-[:HOSTED_EVENT]->(event)
               WHERE {startYear} <= (event.time + event.utcOffset) < {endYear}
               MATCH (event)<-[:RSVPD {response: 'yes'}]-()
               WITH event.id AS id, event.name AS event, COUNT(*) AS rsvps
               RETURN event, rsvps
               ORDER BY rsvps DESC"
eventsDF = cypher(graph, eventsQuery, name = "Neo4j - London User Group", 
                  startYear  = as.numeric(as.POSIXct("2015-01-01", format="%Y-%m-%d")) * 1000, 
                  endYear = as.numeric(as.POSIXct("2015-12-31", format="%Y-%m-%d")) * 1000)
eventsDF %>% head(5)
 
                                                                         event rsvps
1 Neo4j Full Stack Applications + Python, R and Neo4j - The Data Science Stack   133
2                          Modelling a recommendation engine: A worked example   118
3                    Building a repository of biomedical ontologies with Neo4j   107
4                     GraphHack @ Graph Connect: The night before Election Day    91
5                                        Bootstrapping a Recommendation Engine    88

A double header featuring Nicole White and Matt Wright proved to be the most popular event of the year and in fact the most popular in terms of ‘yes’ RSVPs so far:

eventsQuery = "MATCH (:Group {name: {name}})-[:HOSTED_EVENT]->(event)<-[:RSVPD {response: 'yes'}]-()
               WITH event, COUNT(*) AS rsvps
               RETURN event.name AS event, event.time + event.utcOffset AS time, rsvps
               ORDER BY rsvps DESC"
eventsDF = cypher(graph, eventsQuery, name = "Neo4j - London User Group")
eventsDF$time = as.Date(as.POSIXct(eventsDF$time / 1000, origin="1970-01-01"))
eventsDF %>% mutate(year = format(time, "%Y")) %>% dplyr::select(-time) %>% head(10)
 
                                                                          event rsvps year
1  Neo4j Full Stack Applications + Python, R and Neo4j - The Data Science Stack   133 2015
2                           Modelling a recommendation engine: A worked example   118 2015
3                     Building a repository of biomedical ontologies with Neo4j   107 2015
4                                                    Real world Neo4j use cases    98 2014
5                                                           The transport graph    94 2014
6                                                     The Visualisation Special    93 2014
7                  Impossible is Nothing by Jim Webber, Neo4j's Chief Scientist    93 2014
8                      GraphHack @ Graph Connect: The night before Election Day    91 2015
9                                         Bootstrapping a Recommendation Engine    88 2015
10                                    Scraping and Graphing the Apple app store    88 2015

3 of the top 4 belong to 2015 and 6 of the top 10. Let’s see what 2016 has in store.

Thanks to everyone who’s come along to one of our meetups and Happy New Year!

Categories: Programming

Study until your mind wanders

Thu, 12/31/2015 - 11:47

I've previously found it very difficult to read math-heavy content, which has made it challenging to read Distributed Computing, a book I bought last May.

After several false starts, where I gave up after getting frustrated that I couldn't understand things the first time around and forgot everything if I left it for a couple of days, I decided to try again with a different approach.

I’ve been trying a technique I learned from Mini Habits where every day I have a (very small) goal of reading one page of the book. Having such a small goal means that I can read the material as slowly as I like (repeating previous days if necessary).

So far I’ve read 4 chapters (~100 pages) over the last month – some days I read 6 or 7 pages, other days I only manage one. The key is to keep the rhythm of reading something.

I tried doing the reading at different times of the day – on the bus on the way to work, in the evening before going to sleep – and found that for me the best time is immediately when I wake up. To minimise my mind wandering I don’t read any emails, chat messages or social media accounts before I start.

Despite this, I’ve noticed that after a while my mind starts to wander while reading proofs and that’s a signal for me to stop for the day. When I pick up again the next day I’ve often found that I understand what I was having difficulty with.

I’ve read that meditation prior to studying is an effective way to quiet the mind and would be a more ‘on demand’ way of achieving the concentration required to read this type of material. I’ve never done any meditation so if anyone has any tips on where to start that’d be helpful.

Categories: Programming

R: Error in approxfun(x.values.1, y.values.1, method = “constant”, f = 1, : zero non-NA points

Sun, 12/27/2015 - 13:24

I’ve been following Michy Alice’s logistic regression tutorial to create an attendance model for London dev meetups and ran into an interesting problem while doing so.

Our dataset has a class imbalance, i.e. most people RSVP 'no' to events, which can lead to a misleading accuracy score: predicting 'no' every time would give a supposedly high accuracy.

Source: local data frame [2 x 2]
 
  attended     n
     (dbl) (int)
1        0  1541
2        1    53

I sampled the data using caret‘s upSample function to avoid this:

attended = as.factor((df %>% dplyr::select(attended))$attended)
upSampledDf = upSample(df %>% dplyr::select(-attended), attended)
upSampledDf$attended = as.numeric(as.character(upSampledDf$Class))

I then trained a logistic regression model but when I tried to plot the area under the curve I ran into trouble:

library(ROCR)   # provides prediction() and performance()
 
p <- predict(model, newdata=test, type="response")
pr <- prediction(p, test$attended)
prf <- performance(pr, measure = "tpr", x.measure = "fpr")
 
Error in approxfun(x.values.1, y.values.1, method = "constant", f = 1,  : 
  zero non-NA points

I don't have any NA values in my data frame so this message was a bit confusing to start with. As usual Stack Overflow came to the rescue with the suggestion that I was probably missing positive/negative values for the outcome variable, i.e. 'attended'.

A quick count on the test data frame using dplyr confirmed my mistake:

> test %>% count(attended)
Source: local data frame [1 x 2]
 
  attended     n
     (dbl) (int)
1        1   582

I’ll have to randomly sort the data frame and then reassign my training and test data frames to work around it.
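
Something along these lines should do the trick (a sketch: it assumes the upSampledDf data frame from above and a 75/25 split, so adjust to match the actual split):

set.seed(42)
shuffled = upSampledDf[sample(nrow(upSampledDf)), ]   # randomly reorder the rows
cutoff = floor(0.75 * nrow(shuffled))                 # 75% for training, the rest for testing
train = shuffled[1:cutoff, ]
test = shuffled[(cutoff + 1):nrow(shuffled), ]
table(test$attended)                                  # check both classes are now present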

Categories: Programming

Python: Squashing ‘duplicate’ pairs together

Sun, 12/20/2015 - 13:12

As part of a data cleaning pipeline I had pairs of ids of duplicate addresses that I wanted to group together.

I couldn’t work out how to solve the problem immediately so I simplified the problem into pairs of letters i.e.

A	B		(A is the same as B)
B	C		(B is the same as C)
C	D		...
E	F		(E is the same as F)
F	G		...

The output that I want to get is:

(A, B, C, D)
(E, F, G)

I spent several hours trying to come up with a clever data structure to do this until Reshmee suggested tracking the sets of duplicates using an array of arrays or list of lists since we’re going to script this using Python.

The actual data is in a CSV file but we’ll create a list of tuples to save ourselves some work:

pairs = [ ("A", "B"), ("B", "C"), ("C", "D"), ("E", "F"), ("F", "G") ]

We’re going to iterate through the list of pairs and on each iteration we’ll check if there’s an entry in the list containing either of the values. There can be three outcomes from this check:

  1. No entry – we’ll add a new entry with our pair of values.
  2. One entry – we’ll add the other value to that entry.
  3. Two entries – we’ll merge them together replacing the existing entry.

The first step is to write a function to check the list of lists for a matching pair:

def find_matching_index(pair, dups):
    return [index
            for index, dup in enumerate(dups)
            if pair[0] in dup or pair[1] in dup]
 
print find_matching_index(("A", "B"), [set(["D", "E"])])
[]
 
print find_matching_index(("B", "C"), [set(["A", "B"])])
[0]
 
print find_matching_index(("B", "C"), [set(["A", "B"]), set(["C", "D"])])
[0, 1]

Next we need to write a function which iterates over all our pairs of values and uses find_matching_index to work out which decision to make:

def extract_groups(items):
    dups = []
    for pair in items:
        matching_index = find_matching_index(pair, dups)
 
        if len(matching_index) == 0:
            dups.append(set([pair[0], pair[1]]))
        elif len(matching_index) == 1:
            index = matching_index[0]
            matching_dup = dups[index]
            dups.pop(index)
            dups.append(matching_dup.union([pair[0], pair[1]]))
        else:
            index1, index2 = matching_index
            dup1 = dups[index1]
            dup2 = dups[index2]
 
            dups.pop(index1)
            dups.pop(index2 - 1) # the index decrements since we removed one entry on the previous line
            dups.append(dup1.union(dup2))
    return dups

Now let’s run this with a few test cases:

test_cases = [
    [ ("A", "B"), ("B", "C"), ("C", "D"), ("E", "F"), ("F", "G") ],
    [ ("A", "B"), ("B", "C"), ("C", "D"), ("E", "F"), ("F", "G"), ("G", "A"), ("G", "Z"), ("B", "D") ],
    [ ("A", "B"), ("B", "C"), ("C", "E"), ("E", "A") ],
    [ ("A", "B"), ("C", "D"), ("F", "G"), ("H", "I"), ("J", "A") ]
]
 
for test_case in test_cases:
    print extract_groups(test_case)
 
[set(['A', 'C', 'B', 'D']), set(['E', 'G', 'F'])]
[set(['A', 'C', 'B', 'E', 'D', 'G', 'F', 'Z'])]
[set(['A', 'C', 'B', 'E'])]
[set(['C', 'D']), set(['G', 'F']), set(['I', 'H']), set(['A', 'J', 'B'])]

This certainly doesn’t scale very well but since I only have a few hundred duplicate addresses it does the job for me.

It feels like there should be a more functional way to write these functions without mutating all these lists but I haven’t figured out what that is yet.
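
For what it's worth, if the number of pairs ever grows well beyond a few hundred, a disjoint-set (union-find) structure would be the usual way to make this scale. It isn't any more functional than the version above, just faster. A rough sketch:

def groups(pairs):
    parent = {}
 
    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x
 
    for a, b in pairs:
        parent[find(a)] = find(b)          # union the two sets
 
    result = {}
    for x in parent:
        result.setdefault(find(x), set()).add(x)
    return list(result.values())
 
print(groups([("A", "B"), ("B", "C"), ("C", "D"), ("E", "F"), ("F", "G")]))
# two groups: {A, B, C, D} and {E, F, G}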

Categories: Programming

Neo4j: Specific relationship vs Generic relationship + property

Sun, 12/13/2015 - 22:22

For optimal traversal speed in Neo4j queries we should make our relationship types as specific as possible.

Let’s take a look at an example from the ‘modelling a recommendations engine‘ talk I presented at Skillsmatter a couple of weeks ago.

I needed to decide how to model the 'RSVP' relationship between a Member and an Event. A person can RSVP 'yes' or 'no' to an event and I'd like to capture both of these responses.

i.e. we can choose between:

[diagram: a generic RSVPD relationship with a 'response' property]

and:

[diagram: specific RSVP_YES / RSVP_NO relationship types]

When deciding on a model we mainly need to think about the types of queries that we want to write. We shouldn’t forget about updating the model but in my experience more time is spent querying graphs than updating them.

Let’s take a look at each of those in turn:

What queries do we want to write?

The first query was going to use previous ‘yes’ RSVPs as an indicator of interest for future events. We’re not interested in ‘no’ RSVPs for this query.

I started out with the generic RSVP relationship type with a ‘response’ property to distinguish between ‘yes’ and ‘no’:

MATCH (member:Member {name: "Mark Needham"})
MATCH (futureEvent:Event) WHERE futureEvent.time >= timestamp()
MATCH (futureEvent)<-[:HOSTED_EVENT]-(group)
 
OPTIONAL MATCH (member)-[rsvp:RSVPD {response: "yes"}]->(pastEvent)<-[:HOSTED_EVENT]-(group)
WHERE pastEvent.time < timestamp()
 
RETURN group.name, futureEvent.name, COUNT(rsvp) AS previousEvents
ORDER BY  previousEvents DESC


This ran reasonably quickly but I was curious whether I could get the query to run any quicker by changing to the more specific model. Using the more specific relationship type our query reads:

MATCH (member:Member {name: "Mark Needham"})
MATCH (futureEvent:Event) WHERE futureEvent.time >= timestamp()
MATCH (futureEvent)<-[:HOSTED_EVENT]-(group)
 
OPTIONAL MATCH (member)-[rsvp:RSVP_YES]->(pastEvent)<-[:HOSTED_EVENT]-(group)
WHERE pastEvent.time < timestamp()
 
RETURN group.name, 
       futureEvent.name, 
       COUNT(rsvp) AS previousEvents
ORDER BY  previousEvents DESC

We can now profile our query and compare the db hits of both solutions:

RSVPD {response: "yes"}
Cypher version: CYPHER 2.3, planner: COST. 688635 total db hits in 232 ms.
 
RSVP_YES
Cypher version: CYPHER 2.3, planner: COST. 559866 total db hits in 207 ms.

So we get a slight gain by using the more specific relationship type. The reason the db hits figure is lower is partly because we've removed the need to look up the 'response' property on every 'RSVPD' relationship and check that it matches 'yes'. We're also evaluating fewer relationships, since we only look at positive RSVPs and negative ones are ignored.

Our next query might be to capture all the RSVPs made by a member and list them alongside the events:

RSVPD
MATCH (member:Member {name: "Mark Needham"})-[rsvp:RSVPD]->(event)
WHERE event.time < timestamp()
RETURN event.name, event.time, rsvp.response
ORDER BY event.time DESC
 
RSVP_YES / RSVP_NO
MATCH (member:Member {name: "Mark Needham"})-[rsvp:RSVP_YES|:RSVP_NO]->(event)
WHERE event.time < timestamp()
RETURN event.name, event.time, CASE TYPE(rsvp) WHEN "RSVP_YES" THEN "yes" ELSE "no" END AS response
ORDER BY event.time DESC

Again we see a marginal db hits win for the more specific relationship type:

RSVPD {response: "yes"} / RSVPD {response: "no"}
Cypher version: CYPHER 2.3, planner: COST. 684 total db hits in 37 ms.
 
RSVP_YES / RSVP_NO
Cypher version: CYPHER 2.3, planner: COST. 541 total db hits in 24 ms.

However, the second query is quite unwieldy, and unless we store the response as a property on the relationship the code to return 'yes' or 'no' is a bit awkward. The query for the more specific approach would become even more painful to deal with if we introduced the 'waitlist' RSVP, which we've chosen to exclude.

Will we need to update the relationship?

Yes! Users are able to change their RSVP up until the event happens so we need to be able to handle that.

Let’s have a look at the queries we’d have to write to handle a change in RSVP using both models:

Generic relationship type
MATCH (event:Event {id: {event_id}})
MATCH (member:Member {id: {member_id}})
MERGE (member)-[rsvpRel:RSVPD {id: {rsvp_id}}]->(event)
ON CREATE SET rsvpRel.created = toint({mtime})
ON MATCH  SET rsvpRel.lastModified = toint({mtime})
SET rsvpRel.response = {response}
Specific relationship type
MATCH (event:Event {id: {event_id}})
MATCH (member:Member {id: {member_id}})
 
FOREACH(ignoreMe IN CASE WHEN {response} = "yes" THEN [1] ELSE [] END |
  MERGE (member)-[rsvpYes:RSVP_YES {id: {rsvp_id}}]->(event)
  ON CREATE SET rsvpYes.created = toint({mtime})
  ON MATCH  SET rsvpYes.lastModified = toint({mtime})
 
  MERGE (member)-[oldRSVP:RSVP_NO]->(event)
  DELETE oldRSVP
)
 
FOREACH(ignoreMe IN CASE WHEN {response} = "no" THEN [1] ELSE [] END |
  MERGE (member)-[rsvpNo:RSVP_NO {id: {rsvp_id}}]->(event)
  ON CREATE SET rsvpNo.created = toint({mtime})
  ON MATCH  SET rsvpNo.lastModified = toint({mtime})
 
  MERGE (member)-[oldRSVP:RSVP_YES]->(event)
  DELETE oldRSVP
)

As you can see, the code to update an RSVP is more complicated when using the specific relationship type due in part to Cypher not yet having first class support for conditionals.

In summary, for our meetup.com model we gain speed improvements by using more specific relationship types but at the expense of some more complicated read queries and a significantly more convoluted update query.

Depending on the cardinality of relationships in your model your mileage may vary but it’s worth doing some profiling to compare all your options.

Categories: Programming

Neo4j: Facts as nodes

Fri, 12/04/2015 - 08:52

On Tuesday I spoke at the Neo4j London user group about incrementally building a recommendation engine and described the ‘facts as nodes’ modeling pattern, defined as follows in the Graph Databases book:

When two or more domain entities interact for a period of time, a fact emerges. We represent a fact as a separate node with connections to each of the entities engaged in that fact.

Modeling an action in terms of its product—that is, in terms of the thing that results from the action—produces a similar structure: an intermediate node that represents the outcome of an interaction between two or more entities.

We started with the following model describing a meetup member and the groups they’ve joined:

[diagram: Member -MEMBER_OF-> Group]

This model works well for the query it was defined for – find groups similar to ones that I’m already a member of:

MATCH (member:Member {name: "Mark Needham"})-[:MEMBER_OF]->(group)-[:HAS_TOPIC]->(topic)
WITH member, topic, COUNT(*) AS score
MATCH (topic)<-[:HAS_TOPIC]-(otherGroup) 
WHERE NOT (member)-[:MEMBER_OF]->(otherGroup)
RETURN otherGroup.name, COLLECT(topic.name), SUM(score) as score
ORDER BY score DESC

Prefixing that query with the ‘PROFILE’ keyword yields a query plan and the following summary text:

Cypher version: CYPHER 2.3, planner: COST. 89100 total db hits in 113 ms.

In this model it feels like there is a membership fact waiting to become a node.

[diagram: Member -HAS_MEMBERSHIP-> Membership -OF_GROUP-> Group]

We can refactor towards that model with the following query:

MATCH (member:Member)-[rel:MEMBER_OF]->(group)
 
MERGE (membership:Membership {id: member.id + "_" + group.id})
SET membership.joined = rel.joined
 
MERGE (member)-[:HAS_MEMBERSHIP]->(membership)
MERGE (membership)-[:OF_GROUP]->(group);

We’d answer our initial question with the following query:

MATCH (member:Member {name: "Mark Needham"})-[:HAS_MEMBERSHIP]->()-[:OF_GROUP]->(group:Group)-[:HAS_TOPIC]->(topic)
WITH member, topic, COUNT(*) AS score
MATCH (topic)<-[:HAS_TOPIC]-(otherGroup) 
WHERE NOT (member)-[:HAS_MEMBERSHIP]->(:Membership)-[:OF_GROUP]->(otherGroup:Group)
RETURN otherGroup.name, COLLECT(topic.name), SUM(score) as score
ORDER BY score DESC

at the following cost:

Cypher version: CYPHER 2.3, planner: COST. 468201 total db hits in 346 ms.

The membership node hasn’t proved its value yet – it does 4x more work to get the same result. However, the next question we want to answer is ‘what group do people join after the Neo4j user group?’ where it might come in handy.

First we’ll add a ‘NEXT’ relationship between a user’s adjacent group memberships by writing the following query:

MATCH (member:Member)-[:HAS_MEMBERSHIP]->(membership)
 
WITH member, membership ORDER BY member.id, membership.joined
 
WITH member, COLLECT(membership) AS memberships
UNWIND RANGE(0,SIZE(memberships) - 2) as idx
 
WITH memberships[idx] AS m1, memberships[idx+1] AS m2
MERGE (m1)-[:NEXT]->(m2);

And now for the query:

MATCH (group:Group {name: "Neo4j - London User Group"})<-[:OF_GROUP]-(membership)-[:NEXT]->(nextMembership),         
      (membership)<-[:HAS_MEMBERSHIP]-(member:Member)-[:HAS_MEMBERSHIP]->(nextMembership),
      (nextMembership)-[:OF_GROUP]->(nextGroup)
RETURN nextGroup.name, COUNT(*) AS times
ORDER BY times DESC
Cypher version: CYPHER 2.3, planner: COST. 23671 total db hits in 39 ms.

And for comparison – the same query using the initial model:

MATCH (group:Group {name: "Neo4j - London User Group"})<-[membership:MEMBER_OF]-(member),
      (member)-[otherMembership:MEMBER_OF]->(otherGroup)
WHERE membership.joined < otherMembership.joined
WITH member, otherGroup 
ORDER BY otherMembership.joined
WITH member, COLLECT(otherGroup)[0] AS nextGroup
RETURN nextGroup.name, COUNT(*) AS times
ORDER BY times DESC
Cypher version: CYPHER 2.3, planner: COST. 86179 total db hits in 138 ms.

This time the membership model does 3x less work, so depending on the question a different model works better.

Given this observation we might choose to keep both models. The disadvantage of doing that is that we pay write and maintenance penalties to keep them both in sync, e.g. this is what the queries to add a new membership or remove one would look like:

Adding group membership
WITH "Mark Needham" AS memberName, 
     "Neo4j - London User Group" AS groupName,
     timestamp() AS now
 
MATCH (group:Group {name: groupName})
MATCH (member:Member {name: memberName})
 
MERGE (member)-[memberOfRel:MEMBER_OF]->(group)
ON CREATE SET memberOfRel.time = now
 
MERGE (membership:Membership {id: member.id + "_" + group.id})
ON CREATE SET membership.joined = now
MERGE (member)-[:HAS_MEMBERSHIP]->(membership)
MERGE (membership)-[:OF_GROUP]->(group)
Removing group membership
WITH "Mark Needham" AS memberName, 
     "Neo4j - London User Group" AS groupName,
     timestamp() AS now
 
MATCH (group:Group {name: groupName})
MATCH (member:Member {name: memberName})
 
MATCH (member)-[memberOfRel:MEMBER_OF]->(group)
 
MATCH (membership:Membership {id: member.id + "_" + group.id})
MATCH (member)-[hasMembershipRel:HAS_MEMBERSHIP]->(membership)
MATCH (membership)-[ofGroupRel:OF_GROUP]->(group)
 
DELETE memberOfRel, hasMembershipRel, ofGroupRel

The dataset is on github so take a look at it and send any questions my way.

Categories: Programming

Python: Parsing a JSON HTTP chunking stream

Sat, 11/28/2015 - 14:56

I’ve been playing around with meetup.com’s API again and this time wanted to consume the chunked HTTP RSVP stream and filter RSVPs for events I’m interested in.

I use Python for most of my hacking these days and if HTTP requests are required the requests library is my first port of call.

I started out with the following script:

import requests
import json
 
def stream_meetup_initial():
    uri = "http://stream.meetup.com/2/rsvps"
    response = requests.get(uri, stream = True)
    for chunk in response.iter_content(chunk_size = None):
        yield chunk
 
for raw_rsvp in stream_meetup_initial():
    print raw_rsvp
    try:
        rsvp = json.loads(raw_rsvp)
    except ValueError as e:
        print e
        continue

This mostly worked but I also noticed the following error from time to time:

No JSON object could be decoded

Although less frequent, I also saw errors suggesting I was trying to parse an incomplete JSON object. I tweaked the function to keep a local buffer and only yield that if the chunk ended in a new line character:

def stream_meetup_newline():
    uri = "http://stream.meetup.com/2/rsvps"
    response = requests.get(uri, stream = True)
    buffer = ""
    for chunk in response.iter_content(chunk_size = 1):
        if chunk.endswith("\n"):
            buffer += chunk
            yield buffer
            buffer = ""
        else:
            buffer += chunk

This mostly works although I’m sure I’ve seen some occasions where two JSON objects were being yielded and then the call to ‘json.loads’ failed. I haven’t been able to reproduce that though.

A second read through the requests documentation made me realise I hadn’t read it very carefully the first time since we can make our lives much easier by using ‘iter_lines’ rather than ‘iter_content’:

r = requests.get('http://stream.meetup.com/2/rsvps', stream=True)
for raw_rsvp in r.iter_lines():
    if raw_rsvp:
        rsvp = json.loads(raw_rsvp)
        print rsvp

We can then process ‘rsvp’, filtering out the ones we’re interested in.
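
For example, keeping only RSVPs for groups we care about might look something like this (assuming each message carries a 'group' dict with a 'group_name' key, which is worth checking against a real message first):

interesting_groups = ["Neo4j - London User Group"]
 
r = requests.get('http://stream.meetup.com/2/rsvps', stream=True)
for raw_rsvp in r.iter_lines():
    if not raw_rsvp:
        continue
    rsvp = json.loads(raw_rsvp)
    # assuming the stream's messages expose the group name under group/group_name
    if rsvp.get("group", {}).get("group_name") in interesting_groups:
        print rsvp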

Categories: Programming

jq: Cannot iterate over number / string and number cannot be added

Tue, 11/24/2015 - 01:12

In my continued parsing of meetup.com’s JSON API I wanted to extract some information from the following JSON file:

$ head -n40 data/members/18313232.json
[
  {
    "status": "active",
    "city": "London",
    "name": ". .",
    "other_services": {},
    "country": "gb",
    "topics": [],
    "lon": -0.13,
    "joined": 1438866605000,
    "id": 92951932,
    "state": "17",
    "link": "http://www.meetup.com/members/92951932",
    "photo": {
      "thumb_link": "http://photos1.meetupstatic.com/photos/member/8/d/6/b/thumb_250896203.jpeg",
      "photo_id": 250896203,
      "highres_link": "http://photos1.meetupstatic.com/photos/member/8/d/6/b/highres_250896203.jpeg",
      "photo_link": "http://photos1.meetupstatic.com/photos/member/8/d/6/b/member_250896203.jpeg"
    },
    "lat": 51.49,
    "visited": 1446745707000,
    "self": {
      "common": {}
    }
  },
  {
    "status": "active",
    "city": "London",
    "name": "Abdelkader Idryssy",
    "other_services": {},
    "country": "gb",
    "topics": [
      {
        "name": "Weekend Adventures",
        "urlkey": "weekend-adventures",
        "id": 16438
      },
      {
        "name": "Community Building",
        "urlkey": "community-building",

In particular I want to extract the member’s id, name, join date and the ids of topics they’re interested in. I started with the following jq query to try and extract those attributes:

$ jq -r '.[] | [.id, .name, .joined, (.topics[] | .id | join(";"))] | @csv' data/members/18313232.json
Cannot iterate over number (16438)

Annoyingly this treats topic ids on an individual basis rather than as an array as I wanted. I tweaked the query to the following with no luck:

$ jq -r '.[] | [.id, .name, .joined, (.topics[].id | join(";"))] | @csv' data/members/18313232.json
Cannot iterate over number (16438)

As a guess I decided to wrap ‘.topics[].id’ in an array literal to see if it had any impact:

$ jq -r '.[] | [.id, .name, .joined, ([.topics[].id] | join(";"))] | @csv' data/members/18313232.json
92951932,". .",1438866605000,""
jq: error (at data/members/18313232.json:60013): string ("") and number (16438) cannot be added

Woot! A different error message at least and this one seems to be due to a type mismatch between the string we want to end up with and the array of numbers that we currently have.

We can cast our way to victory with the ‘tostring’ function:

$ jq -r '.[] | [.id, .name, .joined, ([.topics[].id | tostring] | join(";"))] | @csv' data/members/18313232.json
...
92951932,". .",1438866605000,""
193866304,"Abdelkader Idryssy",1445195325000,"16438;20727;15401;9760;20246;20923;3336;2767;242;58259;4417;1789;10454;20274;10232;563;25375;16433;15187;17635;26273;21808;933;7789;23884;16212;144477;15322;21067;3833;108403;20221;1201;182;15083;9696;4377;15360;18296;15121;17703;10161;1322;3880;18333;3485;15585;44584;18692;21681"
28643052,"Abhishek Chanda",1439688955000,"646052;520302;15167;563;65735;537492;646072;537502;24959;1025832;8599;31197;24410;26118;10579;1064;189;48471;16216;18062;33089;107633;46831;20479;1423042;86258;21441;3833;21681;188;9696;58162;20398;113032;18060;29971;55324;30928;15261;58259;638;16475;27591;10107;242;109595;10470;26384;72514;1461192"
39523062,"Adam Kinder-Jones",1438677474000,"70576;21549;3833;42277;164111;21522;93380;48471;15167;189;563;25435;87614;9696;18062;58162;10579;21681;19882;108403;128595;15582;7029"
194119823,"Adam Lewis",1444867994000,"10209"
14847001,"Adam Rogers",1422917313000,""
87709042,"Adele Green",1436867381000,"15167;18062;102811;9696;30928;18060;78565;189;7029;48471;127567;10579;58162;563;3833;16216;21441;37708;209821;15401;59325;31792;21836;21900;984862;15720;17703;96823;4422;85951;87614;37428;2260;827;121802;19672;38660;84325;118991;135612;10464;1454542;17936;21549;21520;17628;148303;20398;66339;29661"
11497937,"Adrian Bridgett",1421067940000,"30372;15046;25375;638;498;933;374;27591;18062;18060;15167;10581;16438;15672;1998;1273;713;26333;15099;15117;4422;15892;242;142180;563;31197;20479;1502;131178;15018;43256;58259;1201;7319;15940;223;8652;66493;15029;18528;23274;9696;128595;21681;17558;50709;113737"
14151190,"adrian lawrence",1437142198000,"7029;78565;659;85951;15582;48471;9696;128595;563;10579;3833;101960;16137;1973;78566;206;223;21441;16216;108403;21681;186;1998;15731;17703;15043;16613;17885;53531;48375;16615;19646;62326;49954;933;22268;19243;37381;102811;30928;455;10358;73511;127567;106382;16573;36229;781;23981;1954"
183557824,"Adrien Pujol",1421882756000,"108403;563;9696;21681;188;24410;1064;32743;124668;15472;21123;1486432;1500742;87614;46831;1454542;46810;166000;126177;110474"
...
Categories: Programming

jq: Filtering missing keys

Sat, 11/14/2015 - 23:51

I’ve been playing around with the meetup.com API again over the last few days and having saved a set of events to disk I wanted to extract the venues using jq.

This is what a single event record looks like:

$ jq -r ".[0]" data/events/0.json
{
  "status": "past",
  "rating": {
    "count": 1,
    "average": 1
  },
  "utc_offset": 3600000,
  "event_url": "http://www.meetup.com/londonweb/events/3261890/",
  "group": {
    "who": "Web Peeps",
    "name": "London Web",
    "group_lat": 51.52000045776367,
    "created": 1034097743000,
    "join_mode": "approval",
    "group_lon": -0.12999999523162842,
    "urlname": "londonweb",
    "id": 163876
  },
  "name": "London Web Design October Meetup",
  "created": 1094756756000,
  "venue": {
    "city": "London",
    "name": "Roadhouse Live Music Restaurant , Bar & Club",
    "country": "GB",
    "lon": -0.1,
    "phone": "44-020-7240-6001",
    "address_1": "The Piazza",
    "address_2": "Covent Garden",
    "repinned": false,
    "lat": 51.52,
    "id": 11725
  },
  "updated": 1273536337000,
  "visibility": "public",
  "yes_rsvp_count": 2,
  "time": 1097776800000,
  "waitlist_count": 0,
  "headcount": 0,
  "maybe_rsvp_count": 5,
  "id": "3261890"
}

We want to extract the keys underneath ‘venue’.
I started with the following:

$ jq -r ".[] | .venue" data/events/0.json
...
{
  "city": "London",
  "name": "Counting House Pub",
  "country": "gb",
  "lon": -0.085022,
  "phone": "020 7283 7123",
  "address_1": "50 Cornhill Rd",
  "address_2": "EC3V 3PD",
  "repinned": false,
  "lat": 51.513407,
  "id": 835790
}
null
{
  "city": "Paris",
  "name": "Mozilla Paris",
  "country": "fr",
  "lon": 2.341002,
  "address_1": "16 Bis Boulevard Montmartre",
  "repinned": false,
  "lat": 48.871834,
  "id": 23591845
}
...

This is close to what I want but it includes ‘null’ values which means when you extract the keys inside ‘venue’ they are all empty as well:

jq -r ".[] | .venue | [.id, .name, .city, .address_1, .address_2, .lat, .lon] | @csv" data/events/0.json
...
101958,"The Green Man and French Horn,  -","London","54, St. Martins Lane - Covent Garden","WC2N 4EA",51.52,-0.1
,,,,,,
107295,"The Yorkshire Grey Pub","London","46 Langham Street","W1W 7AX",51.52,-0.1
...
,,,,,,

In functional programming lingo we want to filter out any JSON documents which don't have the 'venue' key.
‘filter’ has a different meaning in jq so it took me a while to realise that the ‘select’ function was what I needed to get rid of the null values:

$ jq -r ".[] | select(.venue != null) | .venue | [.id, .name, .city, .address_1, .address_2, .lat, .lon] | @csv" data/events/0.json | head
11725,"Roadhouse Live Music Restaurant , Bar & Club","London","The Piazza","Covent Garden",51.52,-0.1
11725,"Roadhouse Live Music Restaurant , Bar & Club","London","The Piazza","Covent Garden",51.52,-0.1
11725,"Roadhouse Live Music Restaurant , Bar & Club","London","The Piazza","Covent Garden",51.52,-0.1
11725,"Roadhouse Live Music Restaurant , Bar & Club","London","The Piazza","Covent Garden",51.52,-0.1
76192,"Pied Bull Court","London","Galen Place, London, WC1A 2JR",,51.516747,-0.12719
76192,"Pied Bull Court","London","Galen Place, London, WC1A 2JR",,51.516747,-0.12719
85217,"Earl's Court Exhibition Centre","London","Warwick Road","SW5 9TA",51.49233,-0.199735
96579,"Olympia 2","London","Near Olympia tube station",,51.52,-0.1
76192,"Pied Bull Court","London","Galen Place, London, WC1A 2JR",,51.516747,-0.12719
101958,"The Green Man and French Horn,  -","London","54, St. Martins Lane - Covent Garden","WC2N 4EA",51.52,-0.1

And we’re done.

Categories: Programming

Docker 1.9: Port forwarding on Mac OS X

Sun, 11/08/2015 - 21:58

Since the Neo4j 2.3.0 release there’s been an official docker image which I thought I’d give a try this afternoon.

The last time I used docker, about a year ago, I had to install boot2docker, which has since been deprecated in favour of Docker Machine and the Docker Toolbox.

I created a container with the following command:

docker run --detach --publish=7474:7474 neo4j/neo4j

And then tried to access the Neo4j server locally:

$ curl http://localhost:7474
curl: (7) Failed to connect to localhost port 7474: Connection refused

I quickly checked that docker had started up Neo4j correctly:

$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                              NAMES
1f7c48e267f0        neo4j/neo4j         "/docker-entrypoint.s"   10 minutes ago      Up 10 minutes       7473/tcp, 0.0.0.0:7474->7474/tcp   kickass_easley

Looks good. Amusingly I then came across my own blog post from a year ago where I’d run into the same problem – the problem being that we need to access the Neo4j server via the VM’s IP address rather than localhost.

Instead of using boot2docker we now need to use docker-machine to find the VM’s IP address:

$ docker-machine ls
NAME      ACTIVE   DRIVER       STATE     URL                         SWARM
default   *        virtualbox   Running   tcp://192.168.99.100:2376
$ curl http://192.168.99.100:7474
{
  "management" : "http://192.168.99.100:7474/db/manage/",
  "data" : "http://192.168.99.100:7474/db/data/"
}

And we’re back in business.
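
As an aside, docker-machine can also print just the IP for a given machine, which is handy for scripting (assuming the machine is named 'default' as above):

$ docker-machine ip default
192.168.99.100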

Categories: Programming

IntelliJ ‘java: cannot find JDK 1.8’

Sun, 11/08/2015 - 12:47

I upgraded to IntelliJ 15.0 a few days ago and was initially seeing the following exception when trying to compile:

module-name
 
java: cannot find JDK 1.8

I’ve been compiling against JDK 1.8 for a while now using IntelliJ 14 so I wasn’t sure what was going on.

I checked my project settings and they seemed fine:

[screenshot: IntelliJ project SDK settings]

The error message suggested I look in the logs to find more information but I wasn’t sure where those live! I eventually found out the answer via the comments of this support ticket although I later found a post describing it in more detail.

Looking into the logs revealed the following error message:

$ less /Users/markneedham/Library/Logs/IntelliJIdea15/idea.log
 
2015-11-05 16:31:28,429 [ 428129]   INFO - figurations.GeneralCommandLine - Cannot run program "/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/bin/java" (in directory "/Applications/IntelliJ IDEA 15.app/Contents/bin"): error=2, No such file or directory
java.io.IOException: Cannot run program "/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/bin/java" (in directory "/Applications/IntelliJ IDEA 15.app/Contents/bin"): error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at com.intellij.execution.configurations.GeneralCommandLine.startProcess(GeneralCommandLine.java:368)
	at com.intellij.execution.configurations.GeneralCommandLine.createProcess(GeneralCommandLine.java:354)
	at com.intellij.execution.process.OSProcessHandler.<init>(OSProcessHandler.java:38)
	at org.jetbrains.idea.maven.server.MavenServerManager$3.startProcess(MavenServerManager.java:359)
	at org.jetbrains.idea.maven.server.MavenServerManager$3.execute(MavenServerManager.java:345)
	at com.intellij.execution.rmi.RemoteProcessSupport.a(RemoteProcessSupport.java:206)
	at com.intellij.execution.rmi.RemoteProcessSupport.acquire(RemoteProcessSupport.java:139)
	at org.jetbrains.idea.maven.server.MavenServerManager.create(MavenServerManager.java:163)
	at org.jetbrains.idea.maven.server.MavenServerManager.create(MavenServerManager.java:71)
	at org.jetbrains.idea.maven.server.RemoteObjectWrapper.getOrCreateWrappee(RemoteObjectWrapper.java:41)
	at org.jetbrains.idea.maven.server.MavenServerManager$9.execute(MavenServerManager.java:525)
	at org.jetbrains.idea.maven.server.MavenServerManager$9.execute(MavenServerManager.java:522)
	at org.jetbrains.idea.maven.server.RemoteObjectWrapper.perform(RemoteObjectWrapper.java:76)
	at org.jetbrains.idea.maven.server.MavenServerManager.applyProfiles(MavenServerManager.java:522)
	at org.jetbrains.idea.maven.project.MavenProjectReader.applyProfiles(MavenProjectReader.java:369)
	at org.jetbrains.idea.maven.project.MavenProjectReader.doReadProjectModel(MavenProjectReader.java:98)
	at org.jetbrains.idea.maven.project.MavenProjectReader.access$300(MavenProjectReader.java:42)
	at org.jetbrains.idea.maven.project.MavenProjectReader$1.doProcessParent(MavenProjectReader.java:422)
	at org.jetbrains.idea.maven.project.MavenProjectReader$1.doProcessParent(MavenProjectReader.java:399)
	at org.jetbrains.idea.maven.project.MavenParentProjectFileProcessor.processRepositoryParent(MavenParentProjectFileProcessor.java:84)
	at org.jetbrains.idea.maven.project.MavenParentProjectFileProcessor.process(MavenParentProjectFileProcessor.java:62)
	at org.jetbrains.idea.maven.project.MavenProjectReader.resolveInheritance(MavenProjectReader.java:425)
	at org.jetbrains.idea.maven.project.MavenProjectReader.doReadProjectModel(MavenProjectReader.java:95)
	at org.jetbrains.idea.maven.project.MavenProjectReader.access$300(MavenProjectReader.java:42)
	at org.jetbrains.idea.maven.project.MavenProjectReader$1.doProcessParent(MavenProjectReader.java:422)
	at org.jetbrains.idea.maven.project.MavenProjectReader$1.doProcessParent(MavenProjectReader.java:399)
	at org.jetbrains.idea.maven.project.MavenParentProjectFileProcessor.processRepositoryParent(MavenParentProjectFileProcessor.java:84)
	at org.jetbrains.idea.maven.project.MavenParentProjectFileProcessor.process(MavenParentProjectFileProcessor.java:62)
	at org.jetbrains.idea.maven.project.MavenProjectReader.resolveInheritance(MavenProjectReader.java:425)
	at org.jetbrains.idea.maven.project.MavenProjectReader.doReadProjectModel(MavenProjectReader.java:95)
	at org.jetbrains.idea.maven.project.MavenProjectReader.access$300(MavenProjectReader.java:42)
	at org.jetbrains.idea.maven.project.MavenProjectReader$1.doProcessParent(MavenProjectReader.java:422)
	at org.jetbrains.idea.maven.project.MavenProjectReader$1.doProcessParent(MavenProjectReader.java:399)
	at org.jetbrains.idea.maven.project.MavenParentProjectFileProcessor.processRepositoryParent(MavenParentProjectFileProcessor.java:84)
	at org.jetbrains.idea.maven.project.MavenParentProjectFileProcessor.process(MavenParentProjectFileProcessor.java:62)
	at org.jetbrains.idea.maven.project.MavenProjectReader.resolveInheritance(MavenProjectReader.java:425)
	at org.jetbrains.idea.maven.project.MavenProjectReader.doReadProjectModel(MavenProjectReader.java:95)
	at org.jetbrains.idea.maven.project.MavenProjectReader.readProject(MavenProjectReader.java:53)
	at org.jetbrains.idea.maven.project.MavenProject.read(MavenProject.java:626)
	at org.jetbrains.idea.maven.project.MavenProjectsTree.doUpdate(MavenProjectsTree.java:564)
	at org.jetbrains.idea.maven.project.MavenProjectsTree.doAdd(MavenProjectsTree.java:509)
	at org.jetbrains.idea.maven.project.MavenProjectsTree.update(MavenProjectsTree.java:470)
	at org.jetbrains.idea.maven.project.MavenProjectsTree.updateAll(MavenProjectsTree.java:441)
	at org.jetbrains.idea.maven.project.MavenProjectsProcessorReadingTask.perform(MavenProjectsProcessorReadingTask.java:60)
	at org.jetbrains.idea.maven.project.MavenProjectsProcessor.doProcessPendingTasks(MavenProjectsProcessor.java:134)
	at org.jetbrains.idea.maven.project.MavenProjectsProcessor.access$100(MavenProjectsProcessor.java:30)
	at org.jetbrains.idea.maven.project.MavenProjectsProcessor$2.run(MavenProjectsProcessor.java:109)
	at org.jetbrains.idea.maven.utils.MavenUtil$7.run(MavenUtil.java:464)
	at com.intellij.openapi.application.impl.ApplicationImpl$8.run(ApplicationImpl.java:365)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
	at org.jetbrains.ide.PooledThreadExecutor$1$1.run(PooledThreadExecutor.java:55)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.lang.UNIXProcess.forkAndExec(Native Method)
	at java.lang.UNIXProcess.<init>(UNIXProcess.java:248)
	at java.lang.ProcessImpl.start(ProcessImpl.java:134)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
	... 55 more

Somewhere I had a JDK 1.7 defined which no longer existed on my machine. I actually only have one JDK installed at the moment:

$ /usr/libexec/java_home -V
Matching Java Virtual Machines (1):
    1.8.0_51, x86_64:	"Java SE 8"	/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home
 
/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home

A bit of exploring led me to ‘Platform Settings’ which is where the culprit was:

[screenshot: IntelliJ Platform Settings showing the stale JDK 1.7 entry]

That setting actually lives in /Users/markneedham/Library/Preferences/IntelliJIdea15/options/jdk.table.xml and once I removed it IntelliJ resumed normal service.

Categories: Programming

Hadoop: HDFS – java.lang.NoSuchMethodError: org.apache.hadoop.fs.FSOutputSummer.<init>(Ljava/util/zip/Checksum;II)V

Sun, 11/01/2015 - 00:58

I wanted to write a little program to check that one machine could communicate with an HDFS server running on the other, and adapted some code from the Hadoop wiki as follows:

package org.playground;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
import java.io.IOException;
 
public class HadoopDFSFileReadWrite {
 
    static void printAndExit(String str) {
        System.err.println( str );
        System.exit(1);
    }
 
    public static void main (String[] argv) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/Users/markneedham/Downloads/core-site.xml"));
 
        FileSystem fs = FileSystem.get(conf);
 
        Path inFile = new Path("hdfs://192.168.0.11/user/markneedham/explore.R");
        Path outFile = new Path("hdfs://192.168.0.11/user/markneedham/output-" + System.currentTimeMillis());
 
        // Check if input/output are valid
        if (!fs.exists(inFile))
            printAndExit("Input file not found");
        if (!fs.isFile(inFile))
            printAndExit("Input should be a file");
        if (fs.exists(outFile))
            printAndExit("Output already exists");
 
        // Read from and write to new file
        byte buffer[] = new byte[256];
        try ( FSDataInputStream in = fs.open( inFile ); FSDataOutputStream out = fs.create( outFile ) )
        {
            int bytesRead = 0;
            while ( (bytesRead = in.read( buffer )) > 0 )
            {
                out.write( buffer, 0, bytesRead );
            }
        }
        catch ( IOException e )
        {
            System.out.println( "Error while copying file" );
        }
    }
}

I initially thought I only had the following in my POM file:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.0</version>
</dependency>

But when I ran the script I got the following exception:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.FSOutputSummer.<init>(Ljava/util/zip/Checksum;II)V
	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1553)
	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1582)
	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1614)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1465)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1390)
	at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:394)
	at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:390)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:390)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:334)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:776)
	at org.playground.HadoopDFSFileReadWrite.main(HadoopDFSFileReadWrite.java:37)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Following the stack trace, I realised I’d made a mistake and had accidentally pulled in a dependency on hadoop-hdfs 2.4.1, which doesn’t match the hadoop-common 2.7.0 version above. If we didn’t have the hadoop-hdfs dependency at all we’d see this error instead:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: hdfs
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
	at org.playground.HadoopDFSFileReadWrite.main(HadoopDFSFileReadWrite.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
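
Before adding the fix, a quick way to see which hadoop-hdfs version has actually ended up on the classpath is Maven's dependency tree. This is a generic illustration rather than output from my project:

$ mvn dependency:tree -Dincludes=org.apache.hadoop:hadoop-hdfs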

Now let’s add the correct version of the dependency and make sure it all works as expected:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.0</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

When we run that a new file is created in HDFS on the other machine with the current timestamp:

$ date +%s000
1446336801000
 
$ hdfs dfs -ls
...
-rw-r--r--   3 markneedham supergroup       9249 2015-11-01 00:13 output-1446337098257
...
Categories: Programming

Spark: MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRow)

Wed, 10/28/2015 - 00:10

I’ve been using Spark again lately to do some pre-processing on the Land Registry data set and ran into an initially confusing problem when trying to parse the CSV file.

I’m using the Databricks CSV parsing library and wrote the following script to go over each row, collect up the address components and then derive a ‘fullAddress’ field.

To refresh, this is what the CSV file looks like:

$ head  -n5 pp-complete.csv
"{0C7ADEF5-878D-4066-B785-0000003ED74A}","163000","2003-02-21 00:00","UB5 4PJ","T","N","F","106","","READING ROAD","NORTHOLT","NORTHOLT","EALING","GREATER LONDON","A"
"{35F67271-ABD4-40DA-AB09-00000085B9D3}","247500","2005-07-15 00:00","TA19 9DD","D","N","F","58","","ADAMS MEADOW","ILMINSTER","ILMINSTER","SOUTH SOMERSET","SOMERSET","A"
"{B20B1C74-E8E1-4137-AB3E-0000011DF342}","320000","2010-09-10 00:00","W4 1DZ","F","N","L","58","","WHELLOCK ROAD","","LONDON","EALING","GREATER LONDON","A"
"{7D6B0915-C56B-4275-AF9B-00000156BCE7}","104000","1997-08-27 00:00","NE61 2BH","D","N","F","17","","WESTGATE","MORPETH","MORPETH","CASTLE MORPETH","NORTHUMBERLAND","A"
"{47B60101-B64C-413D-8F60-000002F1692D}","147995","2003-05-02 00:00","PE33 0RU","D","N","F","4","","MASON GARDENS","WEST WINCH","KING'S LYNN","KING'S LYNN AND WEST NORFOLK","NORFOLK","A"
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.{SparkConf, SparkContext}
 
case class BlogTransaction(price: Integer, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String)
 
object BlogApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
 
    sqlContext.read.format("com.databricks.spark.csv").load("/Users/markneedham/projects/land-registry/pp-complete.csv").registerTempTable("transactions")
 
    val rows = sqlContext.sql("select C1,C2,C3,C7,C8,C9,C10,C11,C12,C13 from transactions where transactions.C3 = 'SW3 4EU'").map(x =>
      Row.fromSeq(x.toSeq ++ Array(Array(x.get(4), x.get(3), x.get(5), x.get(6), x.get(7), x.get(8), x.get(9), x.get(2))
        .map(x => x.toString)
        .filter(x => !x.isEmpty)
        .distinct
        .mkString(" / "))))
 
    val path: String = "/tmp/tx-" + System.currentTimeMillis() + ".csv"
    rows.map {
      case Row(price: Integer, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String) =>
        BlogTransaction(price, date, postCode, paon, saon, street, locality, city, district, county) }
      .toDF()
      .write
      .format("com.databricks.spark.csv")
      .save(path)
  }
}

Let’s execute that job against a local Spark worker:

./spark-1.5.0-bin-hadoop2.6/bin/spark-submit --class BlogApp --master local[8] --packages com.databricks:spark-csv_2.10:1.2.0 target/scala-2.10/simple-project_2.10-1.0.jar
 
15/10/27 22:56:41 INFO Executor: Executor killed task 7.0 in stage 1.0 (TID 8)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 3, localhost): scala.MatchError: [14850000,2013-11-13 00:00,SW3 4EU,9,,ORMONDE GATE,,LONDON,KENSINGTON AND CHELSEA,GREATER LONDON,9 / ORMONDE GATE / LONDON / KENSINGTON AND CHELSEA / GREATER LONDON / SW3 4EU] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:154)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:147)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
 
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:989)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1426)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1405)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1405)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1405)
	at com.databricks.spark.csv.package$CsvSchemaRDD.saveAsCsvFile(package.scala:169)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:165)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
	at BlogApp$.main(BlogApp.scala:30)
	at BlogApp.main(BlogApp.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: scala.MatchError: [14850000,2013-11-13 00:00,SW3 4EU,9,,ORMONDE GATE,,LONDON,KENSINGTON AND CHELSEA,GREATER LONDON,9 / ORMONDE GATE / LONDON / KENSINGTON AND CHELSEA / GREATER LONDON / SW3 4EU] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:154)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:147)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

So it looks like something is wrong with our matching code, and the only place we do any pattern matching is against Row when we map over the rows.
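
One way to see why the pattern match is failing is to print the arity and runtime types of a sample row before we try to match it. This is a little debugging sketch I’d bolt on here, not part of the original job, and it relies on the fact that spark-csv reads every column as a String unless schema inference is switched on:

rows.take(1).foreach { r =>
  println(s"number of fields: ${r.length}")   // 11 fields - one more than the pattern expects
  println(r.toSeq.map(v => if (v == null) "null" else v.getClass.getSimpleName))
}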

Although I thought price should be an integer, I tweaked it to be a string just in case that was the issue:

case class BlogTransaction(price: Integer, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String)
...
case Row(price: Integer, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String) =>

changed to:

case class BlogTransaction(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String)
...
case Row(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String) =>

Attempt #2:

./spark-1.5.0-bin-hadoop2.6/bin/spark-submit --class BlogApp --master local[8] --packages com.databricks:spark-csv_2.10:1.2.0 target/scala-2.10/simple-project_2.10-1.0.jar
 
15/10/27 23:01:35 WARN TaskSetManager: Lost task 6.0 in stage 1.0 (TID 7, localhost): TaskKilled (killed intentionally)
Exception in thread "main" 15/10/27 23:01:35 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 3, localhost): scala.MatchError: [14850000,2013-11-13 00:00,SW3 4EU,9,,ORMONDE GATE,,LONDON,KENSINGTON AND CHELSEA,GREATER LONDON,9 / ORMONDE GATE / LONDON / KENSINGTON AND CHELSEA / GREATER LONDON / SW3 4EU] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:154)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:147)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
 
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:989)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1426)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1405)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1405)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1405)
	at com.databricks.spark.csv.package$CsvSchemaRDD.saveAsCsvFile(package.scala:169)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:165)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
	at BlogApp$.main(BlogApp.scala:30)
	at BlogApp.main(BlogApp.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: scala.MatchError: [14850000,2013-11-13 00:00,SW3 4EU,9,,ORMONDE GATE,,LONDON,KENSINGTON AND CHELSEA,GREATER LONDON,9 / ORMONDE GATE / LONDON / KENSINGTON AND CHELSEA / GREATER LONDON / SW3 4EU] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:154)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:147)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Hmmm… no improvement. At this point I realised I’d accidentally left the fullAddress argument off both the case class and the pattern match, so I added it in:

case class BlogTransaction(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String)
...
case Row(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String) =>

changed to:

case class BlogTransaction(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String, fullAddress:String)
...
case Row(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String, fullAddress:String) =>

Attempt #3:

./spark-1.5.0-bin-hadoop2.6/bin/spark-submit --class BlogApp --master local[8] --packages com.databricks:spark-csv_2.10:1.2.0 target/scala-2.10/simple-project_2.10-1.0.jar
...
15/10/27 23:06:03 INFO DAGScheduler: Job 1 finished: saveAsTextFile at package.scala:169, took 39.665661 s

Hoorah, it took a bit of guesswork but it’s finally working!

For completeness, here’s the final version of the Spark job:

import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.{SparkConf, SparkContext}
 
case class BlogTransaction(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String, fullAddress:String)
 
object BlogApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
 
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
 
    sqlContext.read.format("com.databricks.spark.csv").load("/Users/markneedham/projects/land-registry/pp-complete.csv").registerTempTable("transactions")
 
    val rows = sqlContext.sql("select C1,C2,C3,C7,C8,C9,C10,C11,C12,C13 from transactions where transactions.C3 = 'SW3 4EU'").map(x =>
      Row.fromSeq(x.toSeq ++ Array(Array(x.get(4), x.get(3), x.get(5), x.get(6), x.get(7), x.get(8), x.get(9), x.get(2))
        .map(x => x.toString)
        .filter(x => !x.isEmpty)
        .distinct
        .mkString(" / "))))
 
    val path: String = "/tmp/tx-" + System.currentTimeMillis() + ".csv"
    rows.map {
      case Row(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String, fullAddress:String) =>
        BlogTransaction(price, date, postCode, paon, saon, street, locality, city, district, county, fullAddress) }
      .toDF()
      .write
      .format("com.databricks.spark.csv")
      .save(path)
  }
}
Categories: Programming

Exploring (potential) data entry errors in the Land Registry data set

Sun, 10/18/2015 - 11:03

I’ve previously written a couple of blog posts describing the mechanics of analysing the Land Registry data set and I thought it was about time I described some of the queries I’ve been running and the discoveries I’ve made.

To recap, the Land Registry provides a 3GB, 20 million line CSV file containing all the property sales in the UK since 1995.

We’ll be loading and querying the data in R using the data.table package:

> library(data.table)
> dt = fread("pp-complete.csv", header = FALSE)
> dt[1:5]
                                       V1     V2               V3       V4 V5
1: {0C7ADEF5-878D-4066-B785-0000003ED74A} 163000 2003-02-21 00:00  UB5 4PJ  T
2: {35F67271-ABD4-40DA-AB09-00000085B9D3} 247500 2005-07-15 00:00 TA19 9DD  D
3: {B20B1C74-E8E1-4137-AB3E-0000011DF342} 320000 2010-09-10 00:00   W4 1DZ  F
4: {7D6B0915-C56B-4275-AF9B-00000156BCE7} 104000 1997-08-27 00:00 NE61 2BH  D
5: {47B60101-B64C-413D-8F60-000002F1692D} 147995 2003-05-02 00:00 PE33 0RU  D
   V6 V7  V8 V9           V10        V11         V12
1:  N  F 106     READING ROAD   NORTHOLT    NORTHOLT
2:  N  F  58     ADAMS MEADOW  ILMINSTER   ILMINSTER
3:  N  L  58    WHELLOCK ROAD                 LONDON
4:  N  F  17         WESTGATE    MORPETH     MORPETH
5:  N  F   4    MASON GARDENS WEST WINCH KING'S LYNN
                            V13            V14 V15
1:                       EALING GREATER LONDON   A
2:               SOUTH SOMERSET       SOMERSET   A
3:                       EALING GREATER LONDON   A
4:               CASTLE MORPETH NORTHUMBERLAND   A
5: KING'S LYNN AND WEST NORFOLK        NORFOLK   A

For our first query we’re going to find the most expensive property sold in each year from 1995 to 2015.

The first thing we’ll need to do is make column ‘V2’ (price) numeric and convert column ‘V3’ (sale date) to date format so we can do date arithmetic on it:

> dt = dt[, V2:= as.numeric(V2)]
> dt = dt[, V3:= as.Date(V3)]

Now let’s write the query:

> dt[, .SD[which.max(V2)], by=year(V3)][order(year)][, .(year,V9,V8,V10,V12,V14,V4,V2)]
    year             V9               V8                   V10            V12            V14       V4       V2
 1: 1995                  THORNETS HOUSE       BUILDER GARDENS    LEATHERHEAD         SURREY KT22 7DE  5610000
 2: 1996                              24             MAIN ROAD MELTON MOWBRAY LEICESTERSHIRE LE14 3SP 17250000
 3: 1997                              42        HYDE PARK GATE         LONDON GREATER LONDON  SW7 5DU  7500000
 4: 1998                              19     NEW BRIDGE STREET         LONDON GREATER LONDON EC4V 6DB 11250000
 5: 1999                  TERMINAL HOUSE LOWER BELGRAVE STREET         LONDON GREATER LONDON SW1W 0NH 32477000
 6: 2000         UNIT 3     JUNIPER PARK            FENTON WAY       BASILDON          ESSEX SS15 6RZ 12600000
 7: 2001                              19        BABMAES STREET         LONDON GREATER LONDON SW1Y 6HD 24750000
 8: 2002                              72        VINCENT SQUARE         LONDON GREATER LONDON SW1P 2PA  8300000
 9: 2003                              81          ADDISON ROAD         LONDON GREATER LONDON  W14 8ED  9250000
10: 2004                              29   HOLLAND VILLAS ROAD         LONDON GREATER LONDON  W14 8DH  7950000
11: 2005 APARTMENT 1102              199         KNIGHTSBRIDGE         LONDON GREATER LONDON  SW7 1RH 15193950
12: 2006                               1     THORNWOOD GARDENS         LONDON GREATER LONDON   W8 7EA 12400000
13: 2007                              36         CADOGAN PLACE         LONDON GREATER LONDON SW1X 9RX 17000000
14: 2008             50                         CHESTER SQUARE         LONDON GREATER LONDON SW1W 9EA 19750000
15: 2009                       CASA SARA     HEATHERSIDE DRIVE VIRGINIA WATER         SURREY GU25 4JU 13800000
16: 2010                              10   HOLLAND VILLAS ROAD         LONDON GREATER LONDON  W14 8BP 16200000
17: 2011                WHITESTONE HOUSE       WHITESTONE LANE         LONDON GREATER LONDON  NW3 1EA 19250000
18: 2012                              20           THE BOLTONS         LONDON GREATER LONDON SW10 9SU 54959000
19: 2013   APARTMENT 7F              171         KNIGHTSBRIDGE         LONDON GREATER LONDON  SW7 1DW 39000000
20: 2014                  APARTMENT 6, 5          PRINCES GATE         LONDON GREATER LONDON  SW7 1QJ 50000000
21: 2015                              37       BURNSALL STREET         LONDON GREATER LONDON  SW3 3SR 27750000
    year             V9               V8                   V10            V12            V14       V4       V2

The results mostly make sense – the majority of the highest priced properties are around Hyde Park and often somewhere near Knightsbridge which is one of the most expensive places in the country.

There are some odd ones though. For example, in 1996 the top priced property is in Melton Mowbray, Leicestershire and sold for just over £17m. I looked it up on the Land Registry site to quickly see what it subsequently sold for:

[Screenshot: Land Registry site showing the property's subsequent sale prices]

Based on the subsequent prices I think we can safely assume that the initial price is incorrect and should actually have been £17,250.

We can also say the same about our 2000 winner, Juniper Park in Basildon, which supposedly sold for £12.6 million. The next sale price after that is £172,500 in 2003, so most likely it actually sold for £126,000 – only 100 times out!
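
We can sanity check both of those against the data we’ve already got loaded by pulling out every recorded sale for those two postcodes. This is just an illustrative query using the postcodes shown above:

> dt[V4 %in% c("LE14 3SP", "SS15 6RZ")][order(V4, V3), .(V4, V8, V10, V3, V2)]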

I wanted to follow up on this observation and see if I could find other anomalies by comparing adjacent sale prices of the same property.

First we’ll create a ‘fullAddress’ field which we’ll use as an identifier for each property. It’s not completely unique but it’s not far away:

> dt = dt[, fullAddress := paste(dt$V8, dt$V9, dt$V10, dt$V11, dt$V12, dt$V13, dt$V4, sep=", ")]
> setkey(dt, fullAddress)
 
> dt[, .(fullAddress, V2)][1:5]
                                                                                  fullAddress     V2
1:                ''NUTSHELL COTTAGE, 72, , KIRKLAND, KENDAL, KENDAL, SOUTH LAKELAND, LA9 5AP  89000
2:                         'FARRIERS', , FARRIERS CLOSE, WOODLEY, READING, WOKINGHAM, RG5 3DD 790000
3: 'HOLMCROFT', 40, , BRIDGNORTH ROAD, WOMBOURNE, WOLVERHAMPTON, SOUTH STAFFORDSHIRE, WV5 0AA 305000
4:                            (AKERS), , CHAPEL STREET, EASINGWOLD, YORK, HAMBLETON, YO61 3AE 118000
5:                                       (ANNINGS), , , FARWAY, COLYTON, EAST DEVON, EX24 6DF 150000

Next we’ll add a column to the data table which contains the previous sale price and another column which calculates the difference between the two prices:

> dt[, lag.V2:=c(NA, V2[-.N]), by = fullAddress]
> dt[, V2.diff := V2 - lag.V2]
 
> dt[!is.na(lag.V2),][1:10][, .(fullAddress, lag.V2, V2, V2.diff)]
                                                                                   fullAddress lag.V2     V2 V2.diff
 1:                                       (ANNINGS), , , FARWAY, COLYTON, EAST DEVON, EX24 6DF 150000 385000  235000
 2:                  (BARBER), , PEACOCK CORNER, MOULTON ST MARY, NORWICH, BROADLAND, NR13 3NF 115500 136000   20500
 3:                      (BELL), , BAWBURGH ROAD, MARLINGFORD, NORWICH, SOUTH NORFOLK, NR9 5AG 128000 300000  172000
 4:                      (BEVERLEY), , DAWNS LANE, ASLOCKTON, NOTTINGHAM, RUSHCLIFFE, NG13 9AD  95000 210000  115000
 5: (BLACKMORE), , GREAT STREET, NORTON SUB HAMDON, STOKE-SUB-HAMDON, SOUTH SOMERSET, TA14 6SJ  53000 118000   65000
 6:                        (BOWDERY), , HIGH STREET, MARKINGTON, HARROGATE, HARROGATE, HG3 3NR 140000 198000   58000
 7:                  (BULLOCK), , MOORLAND ROAD, INDIAN QUEENS, ST. COLUMB, RESTORMEL, TR9 6HN  50000  50000       0
 8:                                   (CAWTHRAY), , CAWOOD ROAD, WISTOW, SELBY, SELBY, YO8 3XB 130000 120000  -10000
 9:                                   (CAWTHRAY), , CAWOOD ROAD, WISTOW, SELBY, SELBY, YO8 3XB 120000 155000   35000
10:                                 (COATES), , , BARDSEA, ULVERSTON, SOUTH LAKELAND, LA12 9QT  26000  36000   10000

Let’s find the properties which have the biggest £ value difference in adjacent sales:

> dt[!is.na(V2.diff)][order(-abs(V2.diff))][, .(fullAddress, lag.V2, V2, V2.diff)][1:20]
                                                                fullAddress   lag.V2       V2   V2.diff
 1:     , 50, CHESTER SQUARE, LONDON, LONDON, CITY OF WESTMINSTER, SW1W 9EA  1135000 19750000  18615000
 2:         44, , LANSDOWNE ROAD, , LONDON, KENSINGTON AND CHELSEA, W11 2LU  3675000 22000000  18325000
 3:      24, , MAIN ROAD, ASFORDBY VALLEY, MELTON MOWBRAY, MELTON, LE14 3SP 17250000    32500 -17217500
 4:           11, , ORMONDE GATE, , LONDON, KENSINGTON AND CHELSEA, SW3 4EU   250000 16000000  15750000
 5:     2, , HOLLAND VILLAS ROAD, , LONDON, KENSINGTON AND CHELSEA, W14 8BP  8675000 24000000  15325000
 6:          1, , PEMBRIDGE PLACE, , LONDON, KENSINGTON AND CHELSEA, W2 4XB  2340250 17000000  14659750
 7:     10, , CHESTER SQUARE, LONDON, LONDON, CITY OF WESTMINSTER, SW1W 9HH   680000 15000000  14320000
 8:        12, , SOUTH EATON PLACE, , LONDON, CITY OF WESTMINSTER, SW1W 9JA  4250000 18550000  14300000
 9:     32, FLAT 1, HOLLAND PARK, , LONDON, KENSINGTON AND CHELSEA, W11 3TA   420000 14100000  13680000
10:       42, , EGERTON CRESCENT, , LONDON, KENSINGTON AND CHELSEA, SW3 2EB  1125000 14650000  13525000
11:   36, , CADOGAN PLACE, LONDON, LONDON, KENSINGTON AND CHELSEA, SW1X 9RX  3670000 17000000  13330000
12:        22, , ILCHESTER PLACE, , LONDON, KENSINGTON AND CHELSEA, W14 8AA  3350000 16250000  12900000
13:                3, , BOLNEY GATE, , LONDON, CITY OF WESTMINSTER, SW7 1QW  5650000 18250000  12600000
14:        JUNIPER PARK, UNIT 3, FENTON WAY, , BASILDON, BASILDON, SS15 6RZ 12600000   172500 -12427500
15:           10, , WALTON PLACE, , LONDON, KENSINGTON AND CHELSEA, SW3 1RJ   356000 12750000  12394000
16: 84, MAISONETTE C, EATON SQUARE, , LONDON, CITY OF WESTMINSTER, SW1W 9AG  1500000 13400000  11900000
17:          3, , CHESTERFIELD HILL, , LONDON, CITY OF WESTMINSTER, W1J 5BJ   955000 12600000  11645000
18:   39, , ENNISMORE GARDENS, LONDON, LONDON, CITY OF WESTMINSTER, SW7 1AG  3650000 15250000  11600000
19:       76, FLAT 2, EATON SQUARE, , LONDON, CITY OF WESTMINSTER, SW1W 9AW  3500000 15000000  11500000
20:                            85, , AVENUE ROAD, , LONDON, CAMDEN, NW8 6JD   519000 12000000  11481000

Most of the entries here are in Westminster or Hyde Park and don’t look particularly dodgy at first glance. We’d have to drill into the sale dates to confirm.

What you might also have noticed is that our Melton Mowbray and Juniper Park properties both show up, and although they don’t have the biggest £ value difference they would probably rank top if we calculated the multiplier instead. Let’s give that a try:

> dt[, V2.multiplier := ifelse(V2 > lag.V2, V2 / lag.V2, lag.V2 / V2)]
 
> dt[!is.na(V2.multiplier)][order(-V2.multiplier)][, .(fullAddress, lag.V2, V2, V2.multiplier)][1:20]
                                                                            fullAddress   lag.V2       V2 V2.multiplier
 1:                  24, , MAIN ROAD, ASFORDBY VALLEY, MELTON MOWBRAY, MELTON, LE14 3SP 17250000    32500     530.76923
 2:                          LEA HAVEN, FLAT 1, CASTLE LANE, , TORQUAY, TORBAY, TQ1 3BE    38000  7537694     198.36037
 3:   NIGHTINGALE HOUSE, , BURLEIGH ROAD, ASCOT, ASCOT, WINDSOR AND MAIDENHEAD, SL5 7LD     9500  1100000     115.78947
 4:                    JUNIPER PARK, UNIT 3, FENTON WAY, , BASILDON, BASILDON, SS15 6RZ 12600000   172500      73.04348
 5:                           9, , ROTHSAY GARDENS, BEDFORD, BEDFORD, BEDFORD, MK40 3QA    21000  1490000      70.95238
 6:       22, GROUND FLOOR FLAT, SEA VIEW AVENUE, , PLYMOUTH, CITY OF PLYMOUTH, PL4 8RU    27950  1980000      70.84079
 7: 91A, , TINTERN AVENUE, WESTCLIFF-ON-SEA, WESTCLIFF-ON-SEA, SOUTHEND-ON-SEA, SS0 9QQ    17000  1190000      70.00000
 8:     204C, , SUTTON ROAD, SOUTHEND-ON-SEA, SOUTHEND-ON-SEA, SOUTHEND-ON-SEA, SS2 5ES    18000  1190000      66.11111
 9:            PRIORY COURT, FLAT 3, PRIORY AVENUE, TOTNES, TOTNES, SOUTH HAMS, TQ9 5HS  2226500    34000      65.48529
10:      59, , ST ANNS ROAD, SOUTHEND-ON-SEA, SOUTHEND-ON-SEA, SOUTHEND-ON-SEA, SS2 5AT    18250  1190000      65.20548
11:                                    15, , BREWERY LANE, LEIGH, LEIGH, WIGAN, WN7 2RJ    13500   880000      65.18519
12:                       11, , ORMONDE GATE, , LONDON, KENSINGTON AND CHELSEA, SW3 4EU   250000 16000000      64.00000
13:                         WOODEND, , CANNONGATE ROAD, HYTHE, HYTHE, SHEPWAY, CT21 5PX    19261  1200000      62.30206
14:                 DODLESTON OAKS, , CHURCH ROAD, DODLESTON, CHESTER, CHESTER, CH4 9NG    10000   620000      62.00000
15:         CREEKSIDE, , CURLEW DRIVE, WEST CHARLETON, KINGSBRIDGE, SOUTH HAMS, TQ7 2AA    28000  1700000      60.71429
16:                              20, , BRANCH ROAD, BURNLEY, BURNLEY, BURNLEY, BB11 3AT     9000   540000      60.00000
17:             THE BARN, , LEE WICK LANE, ST OSYTH, CLACTON-ON-SEA, TENDRING, CO16 8ES    10000   600000      60.00000
18:                           11, , OAKWOOD GARDENS, KNAPHILL, WOKING, WOKING, GU21 2RX     6000   357000      59.50000
19:                              23, , OLDHAM ROAD, GRASSCROFT, OLDHAM, OLDHAM, OL4 4HY     8000   475000      59.37500
20:                  THE SUNDAY HOUSE, , WATER LANE, GOLANT, FOWEY, RESTORMEL, PL23 1LF     8000   475000      59.37500

This is much better! Our Melton Mowbray property comes in first by miles and Juniper Park is there in 4th. The rest of the price increases look implausible as well but let’s drill into a couple of them:

> dt[fullAddress == "15, , BREWERY LANE, LEIGH, LEIGH, WIGAN, WN7 2RJ"][, .(fullAddress, V3, V2)]
                                        fullAddress         V3     V2
1: 15, , BREWERY LANE, LEIGH, LEIGH, WIGAN, WN7 2RJ 1995-06-29  13500
2: 15, , BREWERY LANE, LEIGH, LEIGH, WIGAN, WN7 2RJ 2008-03-28 880000


If we look at some other properties on the same road and at this property’s features, it seems more likely that the £880,000 was meant to be £88,000.
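
For example, we could list the other sales on the same road and compare. This is an illustrative query matching on the street and town columns rather than the full address:

> dt[V10 == "BREWERY LANE" & V12 == "LEIGH"][order(-V2), .(fullAddress, V3, V2)][1:10]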

I noticed a similar trend when looking at some of the others on this list, but I also realised that the data needs a bit of cleaning up because the ‘fullAddress’ column doesn’t uniquely identify properties. For example, a property might have a Town/City of ‘London’ and a District of ‘London’ on one transaction, but on another transaction the District could be blank.
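
One way to work around that would be to build a looser key which ignores the locality and district columns, something along these lines (a rough sketch rather than a proper clean up):

> dt = dt[, addressKey := paste(V8, V9, V10, V12, V4, sep=", ")]
> dt[, .(variants = length(unique(fullAddress))), by = addressKey][variants > 1][1:5]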

On top of that, my strategy of looking for subsequent prices to spot anomalies falls down when trying to explore properties which only have one sale.

So I have a couple of things to look into for now, but once I’ve done those it’d be interesting to write an algorithm/program that could predict which transactions are likely to be anomalies.
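
As a very crude first pass (and definitely not that algorithm), we could simply flag any transaction whose price differs from the adjacent sale of the same property by more than some multiplier, say 50x, and review those by hand:

> dt[, possibleError := !is.na(V2.multiplier) & V2.multiplier > 50]
> dt[possibleError == TRUE, .N]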

I can imagine how that might work if I had a labelled training set, but I’m not sure how I’d do it with an unsupervised algorithm, so if you have any pointers let me know.

Categories: Programming