
The Enron Email Corpus

This tutorial illustrates some of our data modeling techniques with the Enron email data set. We’ll look at some approaches to loading the data into FoundationDB and querying the data once it’s loaded. We’ll be drawing on Data Modeling, Developer Guide, and Python API, so you should take a look at those documents if you’re not familiar with them.

For an introductory tutorial that begins with “Hello world” and explains the basic concepts used in FoundationDB, take a look at our class scheduling tutorial.

Although we’ll be using Python, the concepts in this tutorial are also applicable to the other languages supported by FoundationDB.

If you’d like to see the finished version of the code we’ll be working with, take a look at the Appendix: EnronTutorial.py.

Overview of the data set

The Enron corpus is a publicly available data set of email messages sent or received by about 150 senior managers of the Enron corporation. The data set was acquired and subsequently made public by the Federal Energy Regulatory Commission during its investigation of Enron. It is believed to be one of the largest publicly available collections of real-world email data, and as such it has been widely used for research in social network analysis, natural language processing, and machine learning.

The corpus contains about half a million messages, organized by personal account and the folders each account contained. Several academic institutions host copies of the corpus, sometimes cleaned or annotated in various ways. We’ll be using the version hosted by CMU. We’ll also use some utility functions from MIT for reading the emails.

Here’s what Jeffrey Skilling’s folders look like:

localhost:CMU$ cd skilling-j

localhost:skilling-j$ ls

_sent_mail      deleted_items       notes_inbox
all_documents   discussion_threads  sent
calendar        inbox               sent_items
contacts        mark

The emails are in individual text files:

localhost:skilling-j$ cd sent

localhost:sent$ cat 83.

Message-ID: <10708849.1075840101852.JavaMail.evans@thyme>
Date: Mon, 28 Aug 2000 08:01:00 -0700 (PDT)
From: sherri.sera@enron.com
To: slc1856@sunflower.com
Subject: Re: Thanks for dinner
Mime-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Transfer-Encoding: 7bit
X-From: Sherri Sera
X-To: "Stephen L Comeau" <slc1856@sunflower.com>
X-cc:
X-bcc:
X-Folder: \Jeffrey_Skilling_Dec2000\Notes Folders\Sent
X-Origin: SKILLING-J
X-FileName: jskillin.nsf

Steve, thanks for the note.  It was great to meet you.  I look forward to
seeing more of you in the future.

Regards,
Jeff

Data model

Before we begin loading the data into FoundationDB, we need to think about how to model it. In other words, how can we best represent the data in our ordered key-value store? We’d like to use a model that allows us to easily map the raw data to its representation in the database, while exposing its structure in a way that supports the queries or analysis we’re interested in.

One approach is to consider which entities represented in the data contain structure that we want to capture. From this perspective, two kinds of entities are evident: there are the email messages themselves, with their various fields, and there are the persons who send and receive the messages. If we take messages and persons as our two entity types, the relationships between them emerge naturally and can be readily extracted from the message fields.

We may also be interested in features of the data such as the pattern of contacts between persons or the textual content of their messages. These features can easily be accessed from the basic representation of the messages and persons, allowing us to extend the model to support additional queries.

An entity-relationship model

Given these considerations, we’ll use the entity-relationship model as described in Data Modeling. In this model, entities have directed relationships with each other and can also have associated attributes.

So, we’ll make each person and message an entity. We can use email addresses as identifiers for persons and message IDs as identifiers for messages. To indicate the sender of a message, we’ll create a 'sender' relationship from the person to the message. Likewise, we’ll create a relationship of the appropriate type ('to', 'cc', or 'bcc') from a message to each of its recipients. All other data associated with a message (subject, text, date, etc.) will be recorded as an attribute of the message.
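
To make the model concrete, here’s a sketch of the keys that the sample message above would produce under the tuple-layer encoding used by the helper functions in the Appendix. The layout is shown as unpacked tuples, and the exact stored values depend on how the message is parsed:

# Relationships: the value of each key is an empty string
('ER', 'sender', 'sherri.sera@enron.com', '10708849.1075840101852.JavaMail.evans@thyme')
('ER', 'to', '10708849.1075840101852.JavaMail.evans@thyme', 'slc1856@sunflower.com')

# Attributes: the value of each key holds the attribute data
('ER', '10708849.1075840101852.JavaMail.evans@thyme', 'subject')  ->  'Re: Thanks for dinner'
('ER', '10708849.1075840101852.JavaMail.evans@thyme', 'date')  ->  'Mon, 28 Aug 2000 08:01:00 -0700 (PDT)'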

Loading the data

Message processing

Our approach will be to walk the directories and read each email into a Python dictionary whose keys are populated with fields of the message. In addition to the sender field, there are fields for recipients and other attributes:

recipientFields = ['to','cc','bcc']
attributeFields = ['folder','sendername','subject','date','text']

An individual message is small enough that we can write its various fields in a single transaction. We’ll first check if the message has already been loaded. If not, we’ll proceed to add relationships for 'sender' and the various recipient fields. Finally, we’ll add the attribute values:

@fdb.transactional
def addEmail( tr, email ):
    if isRelatedEntity(tr,'sender', email['sender'], email['mid']): return
    addRelationship( tr, 'sender', email['sender'], email['mid'])
    # Add relationships for each recipient field
    for field in recipientFields:
        for recipient in email[field]:
            addRelationship( tr, field, email['mid'], recipient)
    # Add attributes for each attribute field
    for field in attributeFields:
        if email[field]:
            fieldValue = str(email[field])[:40000]
            addAttributeValue( tr, email['mid'], field, fieldValue )
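
For reference, here’s a hand-built example of the dictionary shape that addEmail() expects, using the fields referenced above and the sample message shown earlier. The actual parsing is done by the EmailWalker described below, so treat the exact values as illustrative:

email = {
    'mid': '10708849.1075840101852.JavaMail.evans@thyme',  # message ID, used as the entity identifier
    'sender': 'sherri.sera@enron.com',
    'to': ['slc1856@sunflower.com'],  # recipient fields hold zero or more addresses
    'cc': [],
    'bcc': [],
    'folder': 'sent',
    'sendername': 'Sherri Sera',
    'subject': 'Re: Thanks for dinner',
    'date': 'Mon, 28 Aug 2000 08:01:00 -0700 (PDT)',
    'text': 'Steve, thanks for the note. ...',
}
addEmail(db, email)  # one transaction per message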

Data preparation

To read the email data from the text files, we’ll use an iterator that walks the directory structure for persons and folders, reads a message, and returns a Python dictionary as its next item. (You can see the EmailWalker class in resources/util/email_util.py in the Resources zip file.)

Before writing to the database, we’ll clear the 'ER' sub-keyspace that we’ll use for the entity-relationship data:

@fdb.transactional
def clearSubKeyspace(tr, prefix):
    tr.clear_range_startswith(fdb.tuple.pack( (prefix,) ))

Transaction management

To load a sizeable data set, it’s important to manage your transactions efficiently. FoundationDB is not designed to support long or large transactions, so loading the data set in a single transaction is not an option. As described above, we’ll use one transaction per message with the addEmail() function.

An instance of EmailWalker is an iterator that returns dictionaries populated with email data, one message per dictionary. The simplest approach to loading the data into FoundationDB would be to sequentially load the messages using something like:

def loadEnronEmailsSequential(folder=''):
    clearSubKeyspace(db,'ER')
    reader = EmailWalker(localdir+folder)
    for email in reader: addEmail(db, email)

However, it’s more efficient to use a number of concurrent transactions per process. One approach to managing concurrency uses a queue-based producer-consumer pattern in which producers generate data and place it on the queue, while consumers remove data from the queue for further processing. The gevent module has a Queue class that supports this pattern, with each producer and consumer running in its own greenlet.

We’ll define a BoundedBuffer as a subclass of Queue that has a single producer with a reader function to obtain the data from its source, and multiple consumers with a writer function to initiate the write transactions. The queue’s put method will automatically block if the queue’s maxsize has been reached. We’ll set maxsize to the number of consumers, which will make the producer wait until some consumer is free before putting more data in the queue.

The consumers running in separate greenlets will be used to execute transactions, so the number of consumers will determine the number of concurrent transactions. The optimal number of concurrent transactions depends on your hardware configuration, but 50 transactions per process is a good starting point.

The produce_and_consume() method will spawn the producer and the consumers, wait for the producer to finish, and then wait for the consumers to drain the queue and finish:

class BoundedBuffer(Queue):
    def __init__(self, reader, writer, number_consumers=50):
        # Setting maxsize to the number of consumers will make producers
        # wait to put a task in the queue until some consumer is free
        super(BoundedBuffer, self).__init__(maxsize=number_consumers)
        self._number_consumers = number_consumers
        self._reader = reader
        self._writer = writer

    def _producer(self):
        # put will block if maxsize of queue is reached
        for data in self._reader: self.put(data)

    def _consumer(self):
        try:
            while True:
                data = self.get(block=False)
                self._writer(db, data)
                gevent.sleep(0) # yield
        except Empty: pass

    def produce_and_consume(self):
        producers = [gevent.spawn(self._producer)]
        consumers = [gevent.spawn(self._consumer) for _ in xrange(self._number_consumers)]
        gevent.joinall(producers)
        gevent.joinall(consumers)

The reader for data loading will be the iterator instantiated from EmailWalker; the writer will be the addEmail() function. Our loading function will clear the 'ER' subspace, instantiate the reader and the queue, and kick off the producer and consumers:

def loadEnronEmails(folder=''):
    clearSubKeyspace(db,'ER')
    reader = EmailWalker(localdir+folder)
    tasks = BoundedBuffer(reader, addEmail)
    tasks.produce_and_consume()

Let’s load Jeff Skilling’s email and do some counts:

>>> loadEnronEmails('/skilling-j')

In this portion of the data set, how many messages were sent?

>>> len(getRelationships(db,'sender'))

4132

Of these messages, how many were sent by Mr. Skilling?

>>> len(getRelatedEntities(db,'sender','jeff.skilling@enron.com'))

98

Exposing the social network will support additional query types.

Social network

Using the above code, we can now load the Enron corpus in a way that captures its structure and various data values. By representing both persons and emails as entities, we’re building a bipartite graph. From this underlying graph, we can infer a second graph representing a social network. This social graph will have persons as nodes, with a directed edge from sender to recipient whenever the sender has sent that recipient one or more messages.

We can build the social graph using indexContacts():

def indexContacts():
    db.clear_range_startswith(fdb.tuple.pack( ('contact',) ))
    for sender, emailID in getRelationships( db, 'sender'):
        for recipient in getRelatedEntities(db, 'to', emailID):
            addContact( db, sender, recipient)

In addition to storing the edge itself, addContact() stores an edge weight in the social graph by incrementing a count for each message sent by the sender to the recipient:

@fdb.transactional
def addContact( tr, sender, recipient ):
    weight = tr[ fdb.tuple.pack( ('contact', sender, recipient) ) ]
    tr[ fdb.tuple.pack( ('contact', sender, recipient) ) ] = str(int(weight or 0)+1)

Here, we’re maintaining a simple counter as the value of a key. A limitation of this technique is that it will lead to conflicts if we try to update the counter with concurrent transactions. In this case, indexContacts() works sequentially, so we’re OK. For a more robust implementation of counters that support concurrent updates, see our counter layer.
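
As an aside that goes beyond the tutorial code: if you did need to update these counts from many concurrent transactions, one option (besides the counter layer) is an atomic add, which lets the database apply the increment so that concurrent transactions don’t conflict on the key. Atomic operations require a newer API version than the one used in this tutorial, so treat the following only as a sketch:

import struct

@fdb.transactional
def addContactAtomic(tr, sender, recipient):
    # Hypothetical variant of addContact(): the database applies the increment,
    # so concurrent transactions writing the same key don't conflict
    tr.add(fdb.tuple.pack( ('contact', sender, recipient) ), struct.pack('<q', 1))

Note that the value would then be a packed little-endian integer rather than the decimal string used above, so countMessages() would need to unpack it with struct.unpack('<q', value)[0] instead of calling int().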

We’ll also define utility functions to get the contacts of a given sender and to get all contact pairs. For the latter, we’ll use a streaming mode of want_all:

@fdb.transactional
def getContacts( tr, sender ):
    return [ fdb.tuple.unpack(k)[-1] for k,_ in tr[ fdb.tuple.range( ('contact', sender))]]

@fdb.transactional
def getAllContacts( tr ):
    return [ fdb.tuple.unpack(k)[1:]
            for k,_ in tr.get_range_startswith(fdb.tuple.pack( ('contact',) ),
                                            streaming_mode = fdb.StreamingMode.want_all ) ]

Let’s index the contacts and look a bit at the data:

>>> indexContacts()

How many contact pairs are there in the email that Mr. Skilling held?

>>> len(getAllContacts(db))

8871

How many contacts did Mr. Skilling himself have?

>>> len(getContacts(db,'jeff.skilling@enron.com'))

809

Avoiding joins

Explicitly storing the social network is, in a sense, redundant in that the contact information could be computed from the entity-relationship model. This kind of redundancy is desirable when it allows common operations to be performed more efficiently. In particular, we should design our data models to reduce the number of joins performed by the client for common operations.

For example, we may be performing an analysis that requires the number of messages sent by a sender to a recipient. We could count the messages using the entity-relationship model by joining the 'sender' and 'to' relationships:

@fdb.transactional
def countMessagesWithJoin( tr, sender, recipient ):
    return len([emailID for emailID in getRelatedEntities( db, 'sender', sender)
                if isRelatedEntity(tr, 'to', emailID, recipient)])

Using the contact graph, this operation requires a single read:

@fdb.transactional
def countMessages(tr, sender, recipient ):
    return int(tr[ fdb.tuple.pack( ('contact', sender, recipient) ) ] or 0)

How many messages did Jeff Skilling receive from his brother Mark?

>>> countMessages(db,'markskilling@hotmail.com', 'jeff.skilling@enron.com')

69

For larger operations, the advantage of avoiding joins increases. If we want to sort all contacts in the graph by weight, we can still do so with a single range read:

@fdb.transactional
def sortContacts( tr ):
    KVPairs = tr.get_range_startswith(fdb.tuple.pack( ('contact',) ),
                                    streaming_mode = fdb.StreamingMode.want_all )
    contacts = [ fdb.tuple.unpack(k)[1:]+(int(v),) for k,v in KVPairs ]
    return sorted(contacts, key=operator.itemgetter(2), reverse=True)

If we’re going to perform this operation frequently, we should consider adding an index of the form ('contact_index', weight, sender, recipient).
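
A sketch of how that might look (the helper functions here are hypothetical and not part of the tutorial code): we rebuild the index from the contact subspace, and the heaviest edges then come back from a single limited, reverse range read.

def indexContactsByWeight():
    # Hypothetical index of the form ('contact_index', weight, sender, recipient),
    # so that a reverse range read returns contacts in descending order of weight
    db.clear_range_startswith(fdb.tuple.pack( ('contact_index',) ))
    for sender, recipient in getAllContacts(db):
        weight = countMessages(db, sender, recipient)
        db[ fdb.tuple.pack( ('contact_index', weight, sender, recipient) ) ] = ''

@fdb.transactional
def topContacts(tr, n):
    # Return the n heaviest (weight, sender, recipient) triples
    return [ fdb.tuple.unpack(k)[1:]
             for k, _ in tr.get_range_startswith(fdb.tuple.pack( ('contact_index',) ),
                                                 n, reverse=True) ]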

Extended contacts

In social networks, we often want not only a person’s immediate contacts but also the contacts of contacts, i.e., second-degree contacts, and so on for higher degrees. A naive recursive function will work for low degrees and data sets of moderate size:

def getExtendedContacts( sender, degree ):
    if degree == 1:
        return getContacts( db, sender )
    else:
        contacts = []
        for c in getContacts( db, sender):
            contacts.append(c)
            contacts = contacts + getExtendedContacts( c, degree-1)
        return list(set(contacts))

To increase performance, we can add indexes for higher degrees. The size of these indexes will increase with each degree, so we should add them only as needed. We can begin by adding an index for second-degree (“friends of friends”) contacts:

def indexSecondDegree():
    db.clear_range_startswith(fdb.tuple.pack( ('second',) ))
    for sender, recipient in getAllContacts(db):
        for second in getContacts( db, recipient):
            db[ fdb.tuple.pack( ('second', sender, second) ) ] = ''

Using the index requires only an additional check in the second-degree case:

@fdb.transactional
def getSecondDegrees( tr, sender ):
    return [ fdb.tuple.unpack(k)[-1] for k,_ in tr[ fdb.tuple.range( ('second',sender) )]]

def getExtendedContacts( sender, degree ):
    if degree == 1:
        return getContacts( db, sender )
    elif degree == 2:
        return list(set(getContacts(db, sender) + getSecondDegrees( db, sender)))
    else:
        contacts = []
        for c in getContacts( db, sender):
            contacts.append(c)
            contacts = contacts + getExtendedContacts( c, degree-1)
        return list(set(contacts))

How many second-degree contacts did Mr. Skilling have?

>>> jeffFoF = getExtendedContacts('jeff.skilling@enron.com',2)
>>> len(jeffFoF)

1447

Who were a few of them?

>>> jeffFoF[:5]

['ken.rice@enron.com', 'john.reese@enron.com', 'margarita.meyer@enron.com', 'andrew.marsden@enron.com', 'john.echols@enron.com']

Concurrent traversal

Indexing the second-degree contacts helps us by transforming the graph to reduce the search space. However, it doesn’t improve the efficiency of our interaction with the database: our main source of latency is that the range reads within getContacts() and getSecondDegrees() take place sequentially. We can greatly increase efficiency by performing the graph traversal and range reads concurrently.

We can use a gevent JoinableQueue bounded in size by the number of concurrent workers. After initializing the queue with the first-degree contacts, each worker will get a node from the queue, check if its distance from the starting node is within the specified degree, and if so, process its children.

Newly encountered nodes are processed by recording their distance from the starting node and adding them to the queue. If we encounter a node that has already been visited via a longer path, we reprocess the node with the shorter distance:

class BoundedTraversal(JoinableQueue):
    def __init__(self, get_children, number_workers=50):
        # Setting maxsize to the number of workers will block
        # putting a node in the queue until some worker is free
        super(BoundedTraversal, self).__init__(maxsize=number_workers)
        self._get_children = get_children
        self._number_workers = number_workers
        self._distance = {}

    def _worker(self, maxdistance):
        while True:
            node = self.get()
            try:
                if self._distance[node] < maxdistance:
                    # Range read will usually block the greenlet
                    children = self._get_children(db, node)
                    self._check_children(children, self._distance[node])
            finally:
                # Call task_done() even for nodes at the maximum distance,
                # so that join() in run_workers() can complete
                self.task_done()

    def _check_children(self, children, parent_distance):
        for c in children:
            # If c is unvisited or only visited via a longer path, then visit it
            if (c not in self._distance) or (self._distance[c] > parent_distance + 1):
                self._distance[c] = parent_distance + 1
                # put will block when maxsize of queue is reached
                self.put(c)

    def run_workers(self, root, maxdistance):
        self._distance[root] = 0
        children = self._get_children(db, root)
        gevent.spawn(self._check_children, children, 0)
        workers = [gevent.spawn(self._worker, maxdistance)
                    for _ in xrange(self._number_workers)]
        self.join()
        gevent.joinall(workers,timeout=1)

    def visited(self):
        return self._distance.keys()

def getBoundedContacts(sender, degree):
    bounded = BoundedTraversal(getContacts, 50)
    bounded.run_workers(sender, degree)
    return bounded.visited()

How many fourth-degree contacts did Mr. Skilling have?

>>> len(getBoundedContacts('jeff.skilling@enron.com',4))

2203

Indexing by TF-IDF

Of course, an email corpus is interesting for its textual content as well as for the social network it defines. When working with a large corpus, it would be helpful to have techniques to summarize individual messages or sets of messages (i.e., subsets of the corpus). We’d also like to be able to access those messages or message sets by their topical summaries.

Term Frequency-Inverse Document Frequency (TF-IDF) is a simple numeric measure of a term’s importance to a document in relation to the larger corpus in which it’s found. The notion of “document” in TF-IDF is flexible; it can correspond to any subset of the corpus. We’ll implement TF-IDF for two choices of document type: individual messages and persons (i.e., all messages sent by the person). You could easily adjust the code for other choices, such as email folders.

TF-IDF balances the frequency of a term within a document (which results in a higher score) against its frequency in the corpus as a whole (which results in a lower score). Term Frequency (TF) is just a count of the number of times a term appears in a document. Inverse Document Frequency (IDF) is an inverse measure of the frequency of the term across the corpus. It is calculated as the log-ratio of the number of documents to the number of documents containing the term. TF-IDF is just the product of TF and IDF.
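
As a toy illustration of the arithmetic (the numbers are made up, not taken from the corpus): suppose a term appears 5 times in a document, and 10 of the 1000 documents in the corpus contain it.

import math

tf = 5                       # occurrences of the term in this document
ndocuments = 1000            # documents in the corpus
docs_with_term = 10          # documents containing the term
idf = math.log(float(ndocuments) / docs_with_term)  # about 4.6
tfidf = tf * idf             # about 23; a term appearing in every document would score 0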

Given TF-IDF scores, we’ll create two indexes for them. Using the tuple layer, the first index will store data elements in the order docID, score, term, which supports efficient retrieval of terms by individual document. This operation is useful for generating summaries of the document. The second index will store data elements in the order term, score, docID, which supports efficient retrieval of documents by term. This operation is useful for topical searches. In both cases, FoundationDB’s ordering of keys will allow us to easily retrieve only the results with the highest scores:

@fdb.transactional
def index_score(tr, score_tuple):
    docType, score, docID, term = score_tuple
    tr[ fdb.tuple.pack( (docType+'_term', docID, score, term) ) ] = ""
    tr[ fdb.tuple.pack( ('term_'+docType, term, score, docID) ) ] = ""

Computing the TF-IDF scores

The tfidf() function does the bulk of our work. It computes and indexes TF-IDF scores over the entire corpus for a given document type. A document type of 'sender' uses the person sending the email; 'mid' uses individual messages:

def tfidf(docType):

    if docType not in ['mid', 'sender']: raise Exception('document type unknown')

    document_tf = defaultdict(Counter) # document_tf = { docID : Counter( { term : tf })}
    terms_per_document = defaultdict(set)

The function iterates over all messages in the corpus, retrieving the text in both the 'subject' and 'text' (i.e., body) fields. You can use the raw text from these fields or filter it using stop words. (You can find the get_terms() function used below for this purpose in the Day 4 zip file.) For each term in the combined text, we count its occurrences in the document to get the TFs. In preparation for computing the IDFs, we also collect the set of terms that occur in each document:

for sender, emailID in getRelationships( db, 'sender'):

    if docType == 'mid': docID = emailID
    elif docType == 'sender': docID = sender

    subject = getAttributeValue(db, emailID, 'subject') or ''
    text = getAttributeValue(db, emailID,'text') or ''
    # Tokenize and cleanse the text
    terms_in_email = get_terms(subject) + get_terms(text)
    # Retrieve Counter for document and increment it for each term
    document_tf[docID].update(terms_in_email)
    # Collect all of the terms in each document
    terms_per_document[docID].update(terms_in_email)

Now, we can compute the IDFs. We retrieve the terms for each document and add them to a counter, giving us the number of documents that contain each term:

# Map each term to the number of documents in which it occurs
allterms = Counter()
for _, terms in terms_per_document.iteritems():
    # Increment the Counter for each term in `terms`
    allterms.update(terms)

The IDF for each term is the log-ratio of the number of documents to the number of documents containing the term. We want a floating point result, so we’ll convert the integer counts to floats before performing the division:

idfs = {}
# Get the number of documents from the number of keys
ndocuments = len(terms_per_document)
for term, count in allterms.iteritems():
    # idfs = { term : frequency }
    idfs[term] = math.log( float(ndocuments) / float(count) )

Finally, we multiply each TF and IDF to get a floating point TF-IDF score. We’ll be storing the scores within indexes in FoundationDB, and the tuple layer supports integers but not floats. It’s not a problem to use integers because we only care about the order of the scores, not their absolute magnitude. A quick inspection of the raw scores shows that the first few digits are sufficient to preserve their order. We can multiply the TF-IDFs by 100, round, and convert them to integers to get scores of 2-4 digits without loss of order.

We’ll complete the function by returning a generator of tuples, each of which contains the data for a single indexing operation:

return ((docType, int(round(tf * idfs[term]*100)), docID, term)
        for docID, tfs in document_tf.iteritems()
        for term, tf in tfs.iteritems())
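
As noted earlier, the notion of document is flexible. For example, a hypothetical 'folder' document type could reuse the 'folder' attribute stored by addEmail(); only the docID selection inside the loop (and the docType check at the top of tfidf()) would need to change. A sketch, not part of the tutorial code:

    if docType == 'mid': docID = emailID
    elif docType == 'sender': docID = sender
    elif docType == 'folder':
        # Hypothetical: group messages by the 'folder' attribute stored in addEmail()
        docID = getAttributeValue(db, emailID, 'folder') or 'unknown'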

The final function to index the corpus, indexEmailTerms(), uses the same BoundedBuffer class that we used for loading. The generator returned by tfidf() serves as the reader for the producer, and the index_score() function serves as the writer for the concurrent consumers:

def indexEmailTerms(docType):
    clearSubKeyspace(db, docType+'_term')
    clearSubKeyspace(db, 'term_'+docType)
    reader = tfidf(docType)
    tasks = BoundedBuffer(reader, index_score)
    tasks.produce_and_consume()

We’ll index both document types:

>>> indexEmailTerms('mid')
>>> indexEmailTerms('sender')

Summarization

The terms in a document with the highest TF-IDFs are its most important terms as measured against the corpus as a whole. As such, these terms can serve as a convenient summary of the document. Once we’ve indexed the corpus for a document type, we can use prefix range reads to retrieve just the n highest-scoring terms for a document. We give get_range_startswith() a limit of n and specify reverse=True to get the highest scores.

To summarize the email from a given sender, we can use getTermsForSender():

@fdb.transactional
def getTermsForSender(tr, n, sender):
    prefix = fdb.tuple.pack( ('sender_term', sender) )
    return [ fdb.tuple.unpack(k)[-1]
             for k, _ in tr.get_range_startswith(prefix, n, reverse=True) ]

For example:

>>> getTermsForSender(db,5,'david.morris@lehman.com')

['lehman', 'brothers', 'target', 'market', 'price']

To summarize a given message, we can use getTermsForMessage():

@fdb.transactional
def getTermsForMessage(tr, n, emailID):
    prefix = fdb.tuple.pack( ('mid_term', emailID) )
    return [ fdb.tuple.unpack(k)[-1]
             for k, _ in tr.get_range_startswith(prefix, n, reverse=True) ]

For example:

>>> getTermsForMessage(db,5,'6521568.1075845486224.JavaMail.evans@thyme')

['gama', 'investigation', 'trakya', 'turkish', 'bureaucrats']
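
The term-first index supports the reverse direction: finding the senders or messages for which a given term scores highest. The Appendix defines getSendersForTerm() and getMessagesForTerm() for this purpose. For example, a call like the following would return the five senders whose mail scores highest for the term (the query term is just an illustration, and the results depend on the portion of the corpus you’ve loaded, so no output is shown):

>>> getSendersForTerm(db, 5, 'turbine')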

Conclusion

The Enron Email Corpus consists of unstructured data in flat files, which might not seem like an obvious match for an ordered key-value store. Nevertheless, FoundationDB made it pretty easy to model the data set, load the data, and extend the model for various types of queries. We were even able to perform a lightweight analysis of the textual content of the data and index it to support efficient queries using FoundationDB’s range reads.

This exercise highlights a couple of important points:

  • A data model in FoundationDB is flexible and can be extended as needed. You can begin with a basic model that captures the essential structure of your data and then augment it to support your application’s query patterns.
  • Well-designed indexes can often reduce complex queries to simple range reads.

Given these observations, you can begin to think about how to use FoundationDB with your own application:

  • Take a look at our layers and see if you could use one or add your own!
  • Remember that you can use the techniques illustrated above in any of the languages supported by FoundationDB.
  • Ask questions or share your own experiences on the community site.

Appendix: EnronTutorial.py

Here’s the code for the Enron tutorial:

#!/usr/bin/python -i

####################################
##          Initialize            ##
####################################

import sys, math, operator, gevent
from gevent.queue import JoinableQueue, Queue, Empty
from collections import Counter, defaultdict

import fdb, fdb.tuple

fdb.api_version(22)

db = fdb.open(event_model="gevent")

####################################
##         Data Modeling          ##
####################################

# Sub-keyspaces

@fdb.transactional
def clearSubKeyspace(tr, prefix):
    tr.clear_range_startswith(fdb.tuple.pack( (prefix,) ))

# Attributes

@fdb.transactional
def addAttributeValue( tr, entityID, attribute, value ):
    tr[ fdb.tuple.pack( ("ER", entityID, attribute) ) ] = str(value)

@fdb.transactional
def getAttributeValue( tr, entityID, attribute ):
    return tr[ fdb.tuple.pack( ("ER", entityID, attribute) ) ]

# Relationships

@fdb.transactional
def addRelationship( tr, relationship, primaryKey, foreignKey ):
    tr[ fdb.tuple.pack( ("ER", relationship, primaryKey, foreignKey) ) ] = ""

@fdb.transactional
def getRelationships( tr, relationship ):
    return [ fdb.tuple.unpack(k)[2:]
            for k,v in tr.get_range_startswith(fdb.tuple.pack( ("ER", relationship ) ),
                                            streaming_mode = fdb.StreamingMode.want_all ) ]

@fdb.transactional
def getRelatedEntities( tr, relationship, primaryKey ):
    items = tr[ fdb.tuple.range( ("ER", relationship, primaryKey ) ) ]
    return [ fdb.tuple.unpack(k)[-1] for k,v in items ]

@fdb.transactional
def isRelatedEntity( tr, relationship, primaryKey, foreignKey ):
    return tr[ fdb.tuple.pack( ("ER", relationship, primaryKey, foreignKey ) ) ].present()

####################################
##        Loading the Data        ##
####################################

# Location of your downloaded data set
localdir = '/Users/stephenpimentel/Enron/'

# Code available on-line at URL in document
from email_util import EmailWalker

# Initial data model:
# Treat email addresses as unique IDs for persons
# Store persons and emails as entities
# Store all fields that have persons as values as relationships, i.e.,
#   sender:, to:, cc:, bcc:
# Store all other fields as attributes of email entity

# recipientFields are tuples when non-null, but empty list when null
recipientFields = ['to','cc','bcc']
attributeFields = ['folder','sendername','subject','date','text']

@fdb.transactional
def addEmail( tr, email ):
    # Check if the email has already been added to enforce idempotence
    if isRelatedEntity(tr,'sender', email['sender'], email['mid']): return
    # Add relationship for 'sender'
    addRelationship( tr, 'sender', email['sender'], email['mid'])
    # Add relationships for each recipient field
    for field in recipientFields:
        for recipient in email[field]:
            addRelationship( tr, field, email['mid'], recipient)
    # Add attributes for each attribute field
    for field in attributeFields:
        if email[field]:
            fieldValue = str(email[field])[:40000]
            addAttributeValue( tr, email['mid'], field, fieldValue )

def loadEnronEmailsSequential():
    clearSubKeyspace(db,'ER')
    walker = EmailWalker(localdir)
    for email in walker: addEmail(db, email)

class BoundedBuffer(Queue):
    def __init__(self, reader, writer, number_consumers=50):
        # Setting maxsize to the number of consumers will make producers
        # wait to put a task in the queue until some consumer is free
        super(BoundedBuffer, self).__init__(maxsize=number_consumers)
        self._number_consumers = number_consumers
        self._reader = reader
        self._writer = writer

    def _producer(self):
        # put will block if maxsize of queue is reached
        for data in self._reader: self.put(data)

    def _consumer(self):
        try:
            while True:
                data = self.get(block=False)
                self._writer(db, data)
                gevent.sleep(0) # yield
        except Empty: pass

    def produce_and_consume(self):
        producers = [gevent.spawn(self._producer)]
        consumers = [gevent.spawn(self._consumer) for _ in xrange(self._number_consumers)]
        gevent.joinall(producers)
        gevent.joinall(consumers)

def loadEnronEmails(folder=''):
    clearSubKeyspace(db,'ER')
    reader = EmailWalker(localdir+folder)
    tasks = BoundedBuffer(reader, addEmail)
    tasks.produce_and_consume()

####################################
##        Social Graph            ##
####################################

def indexContacts():
    db.clear_range_startswith(fdb.tuple.pack( ('contact',) ))
    for sender, emailID in getRelationships( db, 'sender'):
        for recipient in getRelatedEntities(db, 'to', emailID):
            addContact( db, sender, recipient)

@fdb.transactional
def addContact( tr, sender, recipient ):
    weight = tr[ fdb.tuple.pack( ('contact', sender, recipient) ) ]
    tr[ fdb.tuple.pack( ('contact', sender, recipient) ) ] = str(int(weight or 0)+1)

@fdb.transactional
def getContacts( tr, sender ):
    return [ fdb.tuple.unpack(k)[-1] for k,_ in tr[ fdb.tuple.range( ('contact', sender))]]

@fdb.transactional
def getAllContacts( tr ):
    return [ fdb.tuple.unpack(k)[1:]
         for k,_ in tr.get_range_startswith(fdb.tuple.pack( ('contact',) ),
                                     streaming_mode = fdb.StreamingMode.want_all ) ]

# How many messages has sender sent to recipient?
# Version using the ER model requires client to perform join
@fdb.transactional
def countMessagesWithJoin( tr, sender, recipient ):
    return len([emailID for emailID in getRelatedEntities( db, 'sender', sender)
                if isRelatedEntity(tr, 'to', emailID, recipient)])

@fdb.transactional
def countMessages(tr, sender, recipient ):
    return int(tr[ fdb.tuple.pack( ('contact', sender, recipient) ) ] or 0)

@fdb.transactional
def sortContacts( tr ):
    KVPairs = tr.get_range_startswith(fdb.tuple.pack( ('contact',) ),
                                    streaming_mode = fdb.StreamingMode.want_all )
    contacts = [ fdb.tuple.unpack(k)[1:]+(int(v),) for k,v in KVPairs ]
    return sorted(contacts, key=operator.itemgetter(2), reverse=True)

# Extended Contacts

def indexSecondDegree():
    db.clear_range_startswith(fdb.tuple.pack( ('second',) ))
    for sender, recipient in getAllContacts(db):
        for second in getContacts( db, recipient):
            db[ fdb.tuple.pack( ('second', sender, second) ) ] = ''

@fdb.transactional
def getSecondDegrees( tr, sender ):
    return [ fdb.tuple.unpack(k)[-1] for k,_ in tr[ fdb.tuple.range( ('second',sender) )]]

def getExtendedContacts( sender, degree ):
    if degree == 1:
        return getContacts( db, sender )
    elif degree == 2:
        return list(set(getContacts(db, sender) + getSecondDegrees( db, sender)))
    else:
        contacts = []
        for c in getContacts( db, sender):
            contacts.append(c)
            contacts = contacts + getExtendedContacts( c, degree-1)
        return list(set(contacts))

####################################
##      Concurrent Traversal      ##
####################################

class BoundedTraversal(JoinableQueue):
    def __init__(self, get_children, number_workers=50):
        # Setting maxsize to the number of workers will block
        # putting a node in the queue until some worker is free
        super(BoundedTraversal, self).__init__(maxsize=number_workers)
        self._get_children = get_children
        self._number_workers = number_workers
        self._distance = {}

    def _worker(self, maxdistance):
        while True:
            node = self.get()
            try:
                if self._distance[node] < maxdistance:
                    # Range read will usually block the greenlet
                    children = self._get_children(db, node)
                    self._check_children(children, self._distance[node])
            finally:
                # Call task_done() even for nodes at the maximum distance,
                # so that join() in run_workers() can complete
                self.task_done()

    def _check_children(self, children, parent_distance):
        for c in children:
            # If c is unvisited or only visited via a longer path, then visit it
            if (c not in self._distance) or (self._distance[c] > parent_distance + 1):
                self._distance[c] = parent_distance + 1
                # put will block when maxsize of queue is reached
                self.put(c)

    def run_workers(self, root, maxdistance):
        self._distance[root] = 0
        children = self._get_children(db, root)
        gevent.spawn(self._check_children, children, 0)
        workers = [gevent.spawn(self._worker, maxdistance)
                    for _ in xrange(self._number_workers)]
        self.join()
        gevent.joinall(workers,timeout=1)

    def visited(self):
        return self._distance.keys()

def getBoundedContacts(sender, degree):
    bounded = BoundedTraversal(getContacts, 50)
    bounded.run_workers(sender, degree)
    return bounded.visited()

####################################
##             TF-IDF             ##
####################################

# Code available on-line at URL in document
from get_terms import *

@fdb.transactional
def index_score(tr, score_tuple):
    docType, score, docID, term = score_tuple
    tr[ fdb.tuple.pack( (docType+'_term', docID, score, term) ) ] = ""
    tr[ fdb.tuple.pack( ('term_'+docType, term, score, docID) ) ] = ""

# The docType parameter defines the abstract "documents" to be used for TF-IDF
# For example,
#   docType = 'sender' would use person sending email
#   docType = 'mid' would use email itself

def tfidf(docType):

    if docType not in ['mid', 'sender']: raise Exception('document type unknown')

    document_tf = defaultdict(Counter) # document_tf = { docID : Counter( { term : tf })}
    terms_per_document = defaultdict(set)

    for sender, emailID in getRelationships( db, 'sender'):

        if docType == 'mid': docID = emailID
        elif docType == 'sender': docID = sender

        subject = getAttributeValue(db, emailID, 'subject') or ''
        text = getAttributeValue(db, emailID,'text') or ''
        # Tokenize and cleanse the text
        terms_in_email = get_terms(subject) + get_terms(text)
        # Retrieve Counter for document and increment it for each term
        document_tf[docID].update(terms_in_email)
        # Collect all of the terms in each document
        terms_per_document[docID].update(terms_in_email)

    # allterms maps each term to the number of documents it occurs in
    allterms = Counter()
    for _, terms in terms_per_document.iteritems():
        # Increment the counter value for each term in `terms`
        allterms.update(terms)

    # Compute the Inverse Document Frequencies
    idfs = {}
    # Get the number of documents from the number of keys
    ndocuments = len(terms_per_document)
    for term, count in allterms.iteritems():
        # idfs = { term : frequency }
        idfs[term] = math.log( float(ndocuments) / float(count) )

    # Combine the Term Frequencies and Inverse Document Frequencies into TF-IDFs
    # Convert the TF-IDFs to integers with digits sufficient to preserve order
    # Return a generator of tuples with TF-IDFs to be indexed
    return ((docType, int(round(tf * idfs[term]*100)), docID, term)
            for docID, tfs in document_tf.iteritems()
            for term, tf in tfs.iteritems())

def indexEmailTerms(docType):
    clearSubKeyspace(db, docType+'_term')
    clearSubKeyspace(db, 'term_'+docType)
    reader = tfidf(docType)
    tasks = BoundedBuffer(reader, index_score)
    tasks.produce_and_consume()

####################################
##   Query Functions for TF-IDF   ##
####################################

@fdb.transactional
def getTermsForSender(tr, n, sender):
    prefix = fdb.tuple.pack( ('sender_term', sender) )
    return [ fdb.tuple.unpack(k)[-1]
             for k, _ in tr.get_range_startswith(prefix, n, reverse=True) ]

@fdb.transactional
def getTermsForMessage(tr, n, emailID):
    prefix = fdb.tuple.pack( ('mid_term', emailID) )
    return [ fdb.tuple.unpack(k)[-1]
             for k, _ in tr.get_range_startswith(prefix, n, reverse=True) ]

@fdb.transactional
def getSendersForTerm(tr, n, term):
    prefix = fdb.tuple.pack( ('term_sender', term) )
    return [ fdb.tuple.unpack(k)[-1]
             for k, _ in tr.get_range_startswith(prefix, n, reverse=True) ]

@fdb.transactional
def getMessagesForTerm(tr, n, term):
    prefix = fdb.tuple.pack( ('term_mid', term) )
    return [ fdb.tuple.unpack(k)[-1]
             for k, _ in tr.get_range_startswith(prefix, n, reverse=True) ]