Before proceeding, kudos to the Stack Overflow post that hit the nail on the head for this use case.
If you've ever used the Django ORM `bulk_create` method, you may have run into the case where any key conflict raises an `IntegrityError` that rolls back the whole `bulk_create`. The obvious fallback is to create each record individually and catch the exception, or to use `update_or_create`. But if you're creating hundreds or thousands of records at once, that's death by round-trip calls.
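To make "death by round trip" concrete, here's the naive per-record fallback sketched against plain `sqlite3` standing in for Postgres. The table and its unique key are invented for the demo, mirroring the `(tweet, retweeter)` constraint we'll build later:

```python
import sqlite3

# Invented stand-in for the Django model's table: one composite UNIQUE key
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE retweeter (tweet INTEGER, rt INTEGER, UNIQUE (tweet, rt))")
conn.execute("INSERT INTO retweeter VALUES (1, 100)")  # row from a previous run

batch = [(1, 100), (1, 101), (2, 100)]  # first record conflicts
written = 0
for rec in batch:
    try:
        # one round trip (and one potential exception) per record -- the slow part
        conn.execute("INSERT INTO retweeter VALUES (?, ?)", rec)
        written += 1
    except sqlite3.IntegrityError:
        pass  # skip the duplicate instead of aborting the whole batch

print(written)  # 2
```

It works, but every record costs a separate statement, which is exactly what `bulk_create` was supposed to save us from.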
I ran into this problem writing backend cron jobs (celery tasks, to be precise) that grabbed big batches of API data and needed to stuff them into the database. On subsequent runs, the API queries might return some records written in previous runs, along with new data. This problem could come up in "tailing" logfiles into a db, polling threaded comments, etc.
Let's nail down the problem with a semi-real requirement involving the Twitter API:
For a set of tweets capture retweets and build an author by retweeter table of retweet counts; update the data periodically
In other words:
| | tweeter a | tweeter b |
|---|---|---|
| retweeter x | 5 | 16 |
| retweeter y | 12 | 12 |
| retweeter z | 0 | 1 |
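A table like that is just a group-by over (author, retweeter) pairs. A minimal sketch of the counting with made-up IDs and `collections.Counter`:

```python
from collections import Counter

# Fabricated (original_author, retweeter) pairs, one per captured retweet
retweets = [
    ("a", "x"), ("a", "x"), ("a", "y"),
    ("b", "x"), ("b", "y"), ("b", "y"), ("b", "z"),
]

counts = Counter(retweets)  # the cell values of the table above

authors = sorted({author for author, _ in retweets})
for retweeter in sorted({r for _, r in retweets}):
    row = [counts[(author, retweeter)] for author in authors]
    print(retweeter, row)
# x [2, 1]
# y [1, 2]
# z [0, 1]
```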
Here's the plan to get us to code: we'll implement that requirement as a Django `manage.py` command, show what happens with the standard Django `bulk_create`, and then fix it.
I'm going to assume you have virtualenvwrapper installed and configured, via pip or your platform packager, e.g., apt-get. If you just use virtualenv by itself, you shouldn't have much trouble following along and improvising as needed. If you have neither, then get one.
Create a Django project and app
```shell
mkvirtualenv bulk_create
pip install Django psycopg2 fabric twitter
cdvirtualenv
django-admin.py startproject project
cd project/
python manage.py startapp bulky
```
Configure for Postgresql and create the DB
In `project/settings.py`, set `DATABASES` to something like:

```python
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql_psycopg2",
        "NAME": "bulk_create",
        "USER": "bulk_create",
        "PASSWORD": "bulk_create",
        "HOST": "127.0.0.1",
        "PORT": "",
    }
}
```
and add the `bulky` app to `INSTALLED_APPS`:

```python
INSTALLED_APPS = (
    'bulky',
    # ...
)
```
You may have noticed the fabric pip install above. I'm going to use a fabfile to handle Postgres user and db creation, since I'll be dropping and recreating the db once or twice in this demo.
Here's my `fabfile.py`:

```python
import sys
from importlib import import_module
from os.path import dirname

from fabric.api import *


@task
def localhost():
    env.hosts.append('localhost')


@task
def get_settings(target=None):
    """Read project settings module"""
    # import requested settings
    try:
        sys.path.append(dirname(__file__))
        env.settings = import_module('project.settings')
    except (ImportError, AttributeError):
        abort("Error importing settings")


def dbpassword(username):
    if username == 'postgres':
        password = prompt("Enter database password for postgres user: ")
    else:
        password = env.settings.DATABASES['default']['PASSWORD']
    return password


@task
def psql(sql, username=None, password=None):
    """Run SQL against the project's database."""
    args = {
        'username': username or env.settings.DATABASES['default']['USER'],
        'password': password or dbpassword(username),
        'sql': sql,
        'host': env.host,
    }
    cmd = """echo "{sql}" | PGPASSWORD='{password}' psql --username='{username}' --host={host}""".format(**args)
    return run(cmd)


@task
def createdb():
    """Create DB and DB user."""
    sql = """CREATE USER {USER} WITH ENCRYPTED PASSWORD '{PASSWORD}';
             CREATE DATABASE {NAME}
                 WITH OWNER {USER}
                 ENCODING = 'UTF8'
                 LC_CTYPE = 'en_US.UTF-8'
                 LC_COLLATE = 'en_US.UTF-8'
                 TEMPLATE template0;
          """.format(**env.settings.DATABASES['default'])
    psql(sql, 'postgres')


@task
def dropdb():
    """Drop DB and DB user."""
    sql = """DROP DATABASE IF EXISTS {NAME};
             DROP USER IF EXISTS {USER};
          """.format(**env.settings.DATABASES['default'])
    psql(sql, 'postgres')
```
With this fabfile, creating and syncing the db as spec'd in `settings.py` is a two-liner:

```shell
fab localhost get_settings createdb
python manage.py syncdb --noinput
```
Create the Django model
In `bulky/models.py`, below the comment, add:

```python
from django.db import models


# Create your models here.
class Retweeter(models.Model):
    tweet = models.BigIntegerField(blank=False)
    retweeter = models.BigIntegerField(blank=False, db_index=True)
    original_author = models.BigIntegerField(blank=False, db_index=True)

    class Meta:
        unique_together = ('tweet', 'retweeter')

    def __unicode__(self):
        return "Retweeter({},{},{})".format(self.tweet, self.retweeter, self.original_author)
```
With the new model, we must re-sync the db:

```shell
python manage.py syncdb --noinput
```
Add a custom command
Now, let's write a custom django-admin command to do the work. We'll need to create some new directories and `__init__.py` files, like so:

```shell
mkdir -p bulky/management/commands
touch bulky/management/__init__.py bulky/management/commands/__init__.py
```
and create a `bulky/management/commands/get_retweeters.py` and fill it with:

```python
from optparse import make_option

from django.core.management.base import BaseCommand, CommandError
from twitter import *

from bulky.models import Retweeter

OAUTH_TOKEN = 'my'
OAUTH_SECRET = 'twitterap'
CONSUMER_KEY = 'credentials'
CONSUMER_SECRET = 'here'


class Command(BaseCommand):
    option_list = BaseCommand.option_list + (
        make_option('-c', '--count', action='store',
                    help='Limit to count retweets in each api call, 100 is the twitter max'),
    )
    args = '<tweet_id ...>'
    help = 'Populate Retweeters table for given tweets'

    def handle(self, *args, **options):
        api = Twitter(auth=OAuth(OAUTH_TOKEN, OAUTH_SECRET,
                                 CONSUMER_KEY, CONSUMER_SECRET))
        retweeters = []
        retweet_args = {'count': options['count']} if options['count'] else {}
        for tweet_id in args:
            tweet = api.statuses.show(id=tweet_id, trim_user=1)
            for retweet in api.statuses.retweets(id=tweet_id, trim_user=1, **retweet_args):
                retweeters.append(Retweeter(tweet=tweet_id,
                                            retweeter=retweet['user']['id'],
                                            original_author=tweet['user']['id']))
        Retweeter.objects.bulk_create(retweeters)
        self.stdout.write("{} records added".format(len(retweeters)))
```
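For reference, the command leans on the shape of the Twitter API responses: each retweet carries the retweeting `user`'s id, while the original status carries the author's. With fabricated payloads (the real responses have many more fields), the mapping looks like:

```python
# Fabricated stand-ins for api.statuses.show(...) and api.statuses.retweets(...)
tweet = {"id": 21947795900469248, "user": {"id": 819797}}
retweets = [
    {"id": 1, "user": {"id": 16282333}},
    {"id": 2, "user": {"id": 424242}},
]

# One record per retweet: (tweet id, retweeting user, original author)
records = [
    {
        "tweet": tweet["id"],
        "retweeter": rt["user"]["id"],
        "original_author": tweet["user"]["id"],
    }
    for rt in retweets
]
print(records[0]["retweeter"])        # 16282333
print(records[0]["original_author"])  # 819797
```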
Several notes at this point:

- Replace `OAUTH_TOKEN`, `OAUTH_SECRET`, `CONSUMER_KEY`, `CONSUMER_SECRET` in `get_retweeters.py` with your own Twitter API credentials.
- Note the `--count` option, which caps how many retweets each API call returns. This option will simplify testing, as we'll see below.

That's all the code we need; time to test.
Ok, let's pull in 1 retweet:

```shell
python manage.py get_retweeters --count 1 21947795900469248
```

and check the result in `python manage.py shell`:

```
>>> from bulky.models import Retweeter
>>> Retweeter.objects.all()[:]
[<Retweeter: Retweeter(21947795900469248,16282333,819797)>]
^D
```
Now, let's poll for more retweets:
```shell
python manage.py get_retweeters --count 2 21947795900469248
```
OOPS! We get:

```
IntegrityError: duplicate key value violates unique constraint "bulky_retweeter_tweet_retweeter_key"
DETAIL:  Key (tweet, retweeter)=(21947795900469248, 16282333) already exists.
```
...which we were kind of expecting; that's the whole point of this article. Ok, so let's fix it!
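That all-or-nothing behavior is the database transaction doing its job, and it's easy to reproduce outside Django. In `sqlite3` (standing in for Postgres, with an invented table), one duplicate in a multi-row insert rolls back every row in the statement's transaction:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE retweeter (tweet INTEGER, rt INTEGER, UNIQUE (tweet, rt))")
conn.execute("INSERT INTO retweeter VALUES (1, 100)")
conn.commit()

try:
    with conn:  # transaction: commits on success, rolls back on exception
        conn.executemany(
            "INSERT INTO retweeter VALUES (?, ?)",
            [(1, 101), (1, 100), (1, 102)],  # middle record is a duplicate
        )
except sqlite3.IntegrityError as exc:
    print(exc)

# the non-conflicting (1, 101) was rolled back along with the rest
print(conn.execute("SELECT COUNT(*) FROM retweeter").fetchone()[0])  # 1
```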
Here is our new `models.py`, with the secret sauce:

```python
from django.db import models
from django.db import connection, transaction


# Create your models here.
class RetweeterManager(models.Manager):

    def bulk_create_new(self, recs):
        """
        bulk create recs, skipping key conflicts that would raise an IntegrityError

        return value: int count of recs written
        """
        if not recs:
            return 0

        cursor = connection.cursor()

        # lock and empty tmp table
        sql = """
        BEGIN;
        LOCK TABLE bulky_retweetertmp IN EXCLUSIVE MODE;
        TRUNCATE TABLE bulky_retweetertmp;
        """
        cursor.execute(sql)

        # write to tmp table
        RetweeterTmp.objects.bulk_create(recs)

        # copy over only the rows with no key conflict in the real table
        sql = """
        BEGIN;
        LOCK TABLE bulky_retweeter IN EXCLUSIVE MODE;
        INSERT INTO bulky_retweeter
        SELECT * FROM bulky_retweetertmp
        WHERE NOT EXISTS (
            SELECT 1 FROM bulky_retweeter
            WHERE bulky_retweetertmp.tweet = bulky_retweeter.tweet
            AND bulky_retweetertmp.retweeter = bulky_retweeter.retweeter
        );
        """
        cursor.execute(sql)
        transaction.commit_unless_managed()

        try:
            # statusmessage is of form 'INSERT 0 1'
            return int(cursor.cursor.cursor.statusmessage.split(' ').pop())
        except (IndexError, ValueError):
            raise Exception("Unexpected statusmessage from INSERT")


class RetweeterBase(models.Model):
    tweet = models.BigIntegerField(blank=False)
    retweeter = models.BigIntegerField(blank=False, db_index=True)
    original_author = models.BigIntegerField(blank=False, db_index=True)

    class Meta:
        unique_together = ('tweet', 'retweeter')
        abstract = True


class RetweeterTmp(RetweeterBase):
    pass


class Retweeter(RetweeterBase):
    objects = RetweeterManager()

    def __unicode__(self):
        return "Retweeter({},{},{})".format(self.tweet, self.retweeter, self.original_author)
```
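One line worth a second look: the triple-nested `cursor.cursor.cursor` reaches through Django's wrappers to psycopg2's raw cursor, whose `statusmessage` is the Postgres command tag, e.g. `'INSERT 0 1'`, where the trailing number is the row count. The parsing step, isolated as a plain function:

```python
def rows_written(statusmessage):
    """Parse the row count off a Postgres command tag like 'INSERT 0 1'."""
    try:
        return int(statusmessage.split(' ').pop())
    except (IndexError, ValueError):
        raise ValueError("Unexpected statusmessage: %r" % statusmessage)

print(rows_written("INSERT 0 1"))    # 1
print(rows_written("INSERT 0 137"))  # 137
```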
What are we doing here?

- `RetweeterTmp` acts as a staging table: each batch is bulk-inserted there first, then copied into `bulky_retweeter` with a single `INSERT ... SELECT ... WHERE NOT EXISTS`, so key conflicts are filtered out inside the database instead of raising an `IntegrityError`.
- We set `abstract = True` to use Meta inheritance and avoid multi-table inheritance. Put simply, children of `RetweeterBase` will get its columns without any foreign key relations between base and child.

We need to update the last 2 lines of `get_retweeters.py` to use the new bulk create:
```python
n = Retweeter.objects.bulk_create_new(retweeters)
self.stdout.write("{} of {} records added".format(n, len(retweeters)))
```
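The staging-table trick itself isn't Django- or Postgres-specific. Here's the same dance in plain `sqlite3` with invented table names (SQLite has no `LOCK TABLE`, so those statements are dropped): fill a tmp table, then copy across only the conflict-free rows in one statement:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE retweeter (tweet INTEGER, rt INTEGER, UNIQUE (tweet, rt));
CREATE TABLE retweeter_tmp (tweet INTEGER, rt INTEGER);
INSERT INTO retweeter VALUES (1, 100);  -- row from a previous run
""")

batch = [(1, 100), (1, 101), (2, 100)]  # first record conflicts

conn.execute("DELETE FROM retweeter_tmp")  # empty the staging table
conn.executemany("INSERT INTO retweeter_tmp VALUES (?, ?)", batch)

# single statement: copy only the rows with no key conflict
cur = conn.execute("""
    INSERT INTO retweeter
    SELECT * FROM retweeter_tmp
    WHERE NOT EXISTS (
        SELECT 1 FROM retweeter
        WHERE retweeter.tweet = retweeter_tmp.tweet
          AND retweeter.rt = retweeter_tmp.rt
    )
""")
conn.commit()
print(cur.rowcount)  # 2 -- rows actually copied, like the 'INSERT 0 2' status
```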
Let's use that fabfile to wipe and recreate the database:

```shell
fab get_settings localhost dropdb createdb
python manage.py syncdb --noinput
```
and try again:

```
(bulk_create)rod@rod-ublap:~/pyves/bulk_create/project$ python manage.py get_retweeters --count 1 21947795900469248
1 of 1 records added
(bulk_create)rod@rod-ublap:~/pyves/bulk_create/project$ python manage.py get_retweeters --count 2 21947795900469248
1 of 2 records added
(bulk_create)rod@rod-ublap:~/pyves/bulk_create/project$ python manage.py get_retweeters --count 4 21947795900469248
2 of 4 records added
```
Yay!
Really digging into the performance of this `bulk_create_new` is beyond the scope here; I'm confident it beats the "n inserts" fallback. A worthy follow-up would be to query-log the operation and see how fast or slow that insert-with-nested-select really is. A project for another day!