Django bulk_create without IntegrityError rollback

The Problem

Before proceeding, kudos to the Stack Overflow post that hit the nail on the head for this use case.

If you've ever used the Django ORM's bulk_create method, you may have run into the case where a key conflict raises an IntegrityError that rolls back the whole bulk_create. The obvious fallback is to create each record individually and catch the exception, or to use update_or_create.

Obviously, if you're creating 100s or 1000s of records at once, that's death by round-trip calls.
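For reference, here's roughly what that record-at-a-time fallback looks like. Treat it as a minimal sketch: create_one_at_a_time is a hypothetical helper, it leans on the Retweeter model we'll define below, and it glosses over transaction-management details.

from django.db import IntegrityError

from bulky.models import Retweeter  # defined later in this post


def create_one_at_a_time(recs):
    """Naive fallback: one INSERT per record, swallowing duplicate-key errors."""
    written = 0
    for rec in recs:
        try:
            rec.save()
            written += 1
        except IntegrityError:
            # key conflict -- the record already exists, skip it
            pass
    return written

One INSERT (and one round trip) per record is exactly what bulk_create was meant to spare us.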

I ran into this problem writing backend cron jobs (celery tasks, to be precise) that grabbed big batches of API data and needed to stuff them into the database. On subsequent runs, the API queries might return some records written in previous runs, along with new data. This problem could come up in "tailing" logfiles into a db, polling threaded comments, etc.

Let's nail down the problem with a semi-real requirement involving the Twitter API:

Requirements and Specs

For a set of tweets, capture retweets and build an author-by-retweeter table of retweet counts; update the data periodically.

In other words:

              tweeter a   tweeter b
retweeter x       5           16
retweeter y      12           12
retweeter z       0            1

Here's a spec to get us to code:

  • For each tweet we're watching, capture the most recent 100 retweets from the twitter API
  • Save the tweet id, the retweeter id, and original author
  • The tweet id and retweeter id are unique together, i.e., no duplicated retweets of the same tweet by the same user
  • Recheck each tweet on a schedule

We'll implement most of that spec as a Django "manage.py" command, show what happens with the standard Django bulk_create, and then "fix it".

Code, First Pass

I'm going to assume you have virtualenvwrapper installed and configured, via pip or your platform packager, e.g., apt-get. If you just use virtualenv by itself, you shouldn't have much trouble following along and improvising as needed. If you have neither, then get one.

Create a Django project and app

mkvirtualenv bulk_create
pip install Django psycopg2 fabric twitter
cdvirtualenv
django-admin.py startproject project
cd project/
python manage.py startapp bulky

Configure for PostgreSQL and create the DB

In project/settings.py set DATABASES to something like

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql_psycopg2",
        "NAME": "bulk_create",
        "USER": "bulk_create",
        "PASSWORD": "bulk_create",
        "HOST": "127.0.0.1",
        "PORT": "",
    }
}

and add the bulky app to INSTALLED_APPS

INSTALLED_APPS = (
    # ... default Django apps ...
    'bulky',
)

You may have noticed the fabric pip install above. I'm going to use a fabfile to handle creating the Postgres user and database...I'll be dropping and recreating the db once or twice in this demo.

Here's my fabfile.py:

import sys
from importlib import import_module
from os.path import dirname

from fabric.api import *

@task
def localhost():
    env.hosts.append('localhost')

@task
def get_settings(target=None):
    """Read project settings module"""
    # import requested settings
    try:
        sys.path.append(dirname(__file__))
        env.settings = import_module('project.settings')
    except (ImportError, AttributeError):
        abort("Error importing settings")


def dbpassword(username):
    if username == 'postgres':
        password = prompt("Enter database password for postgres user: ")
    else:
        password = env.settings.DATABASES['default']['PASSWORD']
    return password

@task
def psql(sql, username=None, password=None):
    """
    Run SQL against the project's database.
    """
    args = {
        'username': username or env.settings.DATABASES['default']['USER'],
        'password': password or dbpassword(username),
        'sql': sql,
        'host': env.host,
        }
    cmd = """echo "{sql}" | PGPASSWORD='{password}' psql --username='{username}' --host={host}""".format(**args)
    return run(cmd)

@task
def createdb():
    """
    Create DB and DB user.
    """

    sql = """CREATE USER {USER} WITH ENCRYPTED PASSWORD '{PASSWORD}';
    CREATE DATABASE {NAME} WITH OWNER {USER} ENCODING = 'UTF8'
    LC_CTYPE = 'en_US.UTF-8' LC_COLLATE = 'en_US.UTF-8' TEMPLATE template0;
    """.format(**env.settings.DATABASES['default'])
    psql(sql, 'postgres')

@task
def dropdb():
    """
    Drop DB and DB user.
    """
    sql = """DROP DATABASE IF EXISTS {NAME};
    DROP USER IF EXISTS {USER};
    """.format(**env.settings.DATABASES['default'])
    psql(sql, 'postgres')

With this fabfile, creating and syncing the db as spec'd in settings.py is a two-liner:

fab localhost get_settings createdb
python manage.py syncdb --noinput

Create the Django model

In bulky/models.py, below the comment, add:

from django.db import models

# Create your models here.

class Retweeter(models.Model):

    tweet = models.BigIntegerField(blank=False)
    retweeter = models.BigIntegerField(blank=False, db_index=True)
    original_author = models.BigIntegerField(blank=False, db_index=True)

    class Meta:
        unique_together = ('tweet', 'retweeter')

    def __unicode__(self):
        return "Retweeter({},{},{})".format(self.tweet, self.retweeter, self.original_author)

With the new model in place, we must re-sync the db:

python manage.py syncdb --noinput

Add a custom command

Now, let's write a custom django-admin command to do the work. We'll need to create some new directories and __init__.py files, like so:

mkdir -p bulky/management/commands
touch bulky/management/__init__.py bulky/management/commands/__init__.py

and create a bulky/management/commands/get_retweeters.py and fill it with

from optparse import make_option
from django.core.management.base import BaseCommand, CommandError
from twitter import *

from bulky.models import Retweeter

OAUTH_TOKEN = 'my'
OAUTH_SECRET = 'twitterap'
CONSUMER_KEY = 'credentials'
CONSUMER_SECRET = 'here'

class Command(BaseCommand):
    option_list = BaseCommand.option_list + (
        make_option('-c', '--count', action='store',
            help='Limit to count retweets in each api call, 100 is the twitter max'),
    )
    args = '<tweet_id ...>'
    help = 'Populate Retweeters table for given tweets'

    def handle(self, *args, **options):
        api = Twitter(auth=OAuth(OAUTH_TOKEN, OAUTH_SECRET, CONSUMER_KEY, CONSUMER_SECRET))
        retweeters = []
        retweet_args = { 'count': options['count'] } if options['count'] else {}
        for tweet_id in args:
            tweet = api.statuses.show(id=tweet_id, trim_user=1)
            for retweet in api.statuses.retweets(id=tweet_id, trim_user=1, **retweet_args):
                retweeters.append(Retweeter(tweet=tweet_id,
                                             retweeter=retweet['user']['id'],
                                             original_author=tweet['user']['id']))
        Retweeter.objects.bulk_create(retweeters)
        self.stdout.write("{} records added".format(len(retweeters)))

Several notes at this point:

  1. You'll need to go to the Twitter dev site and create an app plus OAuth credentials, if you don't already have a test app handy. If you're not sure how to do this, a quick search should help.
  2. Put the values from step 1 into the OAUTH_TOKEN, OAUTH_SECRET, CONSUMER_KEY, CONSUMER_SECRET in get_retweeters.py.
  3. We're using the neat little twitter library for Python, brought to you by Python Twitter Tools.
  4. Note that we've implemented a --count option. This will simplify testing, as we'll see below.

That's all the code we need, time to test.

First Pass Test

Ok, let's pull in 1 retweet

python manage.py get_retweeters --count 1 21947795900469248
python manage.py shell
>>> from bulky.models import Retweeter
>>> Retweeter.objects.all()[:]
[<Retweeter: Retweeter(21947795900469248,16282333,819797)>]
^D

Now, let's poll for more retweets:

python manage.py get_retweeters --count 2 21947795900469248

OOPS! We get

IntegrityError: duplicate key value violates unique constraint "bulky_retweeter_tweet_retweeter_key"
DETAIL:  Key (tweet, retweeter)=(21947795900469248, 16282333) already exists.

...which...we were kind of expecting; that's the whole point of this article. Worse, the IntegrityError rolled back the whole batch, so the new, non-conflicting retweet in that call wasn't written either.
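You can convince yourself with a quick look in python manage.py shell (a sketch of what you should see, not captured output):

>>> from bulky.models import Retweeter
>>> Retweeter.objects.count()   # still only the row from the first run
1

Ok, so, let's fix it!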

The 'Fix'

Here is our new models.py, with the secret sauce.

from django.db import models
from django.db import connection, transaction

# Create your models here.

class RetweeterManager(models.Manager):

    def bulk_create_new(self, recs):
        """
        bulk create recs, skipping key conflicts that would raise an IntegrityError
        return value: int count of recs written
        """

        if not recs:
            return 0
        cursor = connection.cursor()

        # lock and empty tmp table
        sql = """
        BEGIN;
        LOCK TABLE bulky_retweetertmp IN EXCLUSIVE MODE;
        TRUNCATE TABLE bulky_retweetertmp;
        """
        cursor.execute(sql)

        # write to tmp table
        RetweeterTmp.objects.bulk_create(recs)

        sql = """BEGIN;
        LOCK TABLE bulky_retweeter IN EXCLUSIVE MODE;

        INSERT INTO bulky_retweeter
        SELECT * FROM bulky_retweetertmp WHERE NOT EXISTS (
            SELECT 1 FROM bulky_retweeter WHERE bulky_retweetertmp.tweet = bulky_retweeter.tweet AND
                                           bulky_retweetertmp.retweeter = bulky_retweeter.retweeter
        );
        """
        cursor.execute(sql)
        transaction.commit_unless_managed()
        try:
            # unwrap Django's cursor wrapper(s) to reach psycopg2's statusmessage,
            # which is of the form 'INSERT 0 1'; the last token is the row count
            return int(cursor.cursor.cursor.statusmessage.split(' ').pop())
        except (IndexError, ValueError):
            raise Exception("Unexpected statusmessage from INSERT")

class RetweeterBase(models.Model):

    tweet = models.BigIntegerField(blank=False)
    retweeter = models.BigIntegerField(blank=False, db_index=True)
    original_author = models.BigIntegerField(blank=False, db_index=True)

    class Meta:
        unique_together = ('tweet', 'retweeter')
        abstract = True

class RetweeterTmp(RetweeterBase):
    pass

class Retweeter(RetweeterBase):
    objects = RetweeterManager()

    def __unicode__(self):
        return "Retweeter({},{},{})".format(self.tweet, self.retweeter, self.original_author)

What are we doing here?

  1. We've added a new model manager to create custom functions. Nothing fancy about that.
  2. We've moved the schema for Retweeter to a base class model, marking it abstract = True to use abstract base class inheritance and avoid multi-table inheritance. Put simply, children of RetweeterBase will get its columns without any foreign key relations between base and child.
  3. There are now two models that will instantiate the same schema, from RetweeterBase. RetweeterTmp is a "scratch" table for the bulk_create_new method.
  4. bulk_create_new uses custom sql to
    • wipe the tmp table
    • put the records into the tmp table in a single operation, with the Django bulk_create function
    • run an insert with a nested NOT EXISTS select, to avoid writing records with keys we already have.
    • return the number of records actually inserted
    • and yes, we lock tables to avoid multi-process/thread nastiness (tbh, the code could use closer inspection for multiple access corner cases...caveat emptor)

We need to update the last 2 lines of get_retweeters.py to use the new bulk create:

        n = Retweeter.objects.bulk_create_new(retweeters)
        self.stdout.write("{} of {} records added".format(n,len(retweeters)))

Test the Fix

Let's use that fabfile to wipe and recreate the database:

fab get_settings localhost dropdb createdb
python manage.py syncdb --noinput

and try again

(bulk_create)rod@rod-ublap:~/pyves/bulk_create/project$ python manage.py get_retweeters --count 1 21947795900469248
1 of 1 records added
(bulk_create)rod@rod-ublap:~/pyves/bulk_create/project$ python manage.py get_retweeters --count 2 21947795900469248
1 of 2 records added
(bulk_create)rod@rod-ublap:~/pyves/bulk_create/project$ python manage.py get_retweeters --count 4 21947795900469248
2 of 4 records added

Yay!
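If you want to poke at the skip logic directly, a quick session in python manage.py shell does the trick. The tweet and retweeter ids below are made up purely for illustration; the return values are the row counts parsed from the INSERT statusmessage.

>>> from bulky.models import Retweeter
>>> batch = [Retweeter(tweet=1, retweeter=10, original_author=99),
...          Retweeter(tweet=1, retweeter=11, original_author=99)]
>>> Retweeter.objects.bulk_create_new(batch)     # both (tweet, retweeter) keys are new
2
>>> Retweeter.objects.bulk_create_new([Retweeter(tweet=1, retweeter=11, original_author=99)])
0

The second call writes nothing because (1, 11) is already in the table -- no IntegrityError, no rollback.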

Performance

Really digging into the performance of bulk_create_new is beyond the scope here, but I'm certain it performs better than the "n individual inserts" option. A worthy project would be to query-log the operation and see how fast or slow that "insert with the nested select" really is. A project for another day!
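If you want to try it yourself, here's a rough sketch of a timing harness. It assumes DEBUG = True in settings so Django records statements in connection.queries; time_bulk_create_new is a hypothetical helper, and the numbers will depend on batch size and how much of the batch already exists.

import time

from django.db import connection, reset_queries

from bulky.models import Retweeter


def time_bulk_create_new(recs):
    """Time one bulk_create_new call and report the queries Django logged."""
    reset_queries()
    start = time.time()
    written = Retweeter.objects.bulk_create_new(recs)
    elapsed = time.time() - start
    print("{} of {} written in {:.3f}s over {} queries".format(
        written, len(recs), elapsed, len(connection.queries)))
    return elapsed

Swap in the record-at-a-time loop from the top of the post and compare.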
