Category Archives: Python

Enabling OAuth2 in python-social-auth

With a running Django setup using python-social-auth==0.1.19 for OAuth2 login, you can simply create a file within the social/backends folder:
from social.backends.oauth import BaseOAuth2

class BnetOAuth2(BaseOAuth2):
    """Battlenet OAuth2 authentication backend"""
    name = 'bnet-oauth2'
    # the endpoint settings below were missing from the original listing;
    # the Battle.net URLs are assumptions, verify them locally
    AUTHORIZATION_URL = ''
    ACCESS_TOKEN_URL = ''
    ACCESS_TOKEN_METHOD = 'POST'
    EMAIL_SUFFIX = ''  # user-defined suffix for the generated email address
    DEFAULT_SCOPE = ['wow.profile']
    EXTRA_DATA = [
        ('refresh_token', 'refresh_token', True),
        ('expires_in', 'expires'),
        ('token_type', 'token_type', True)
    ]

    def get_user_id(self, details, response):
        if self.setting('USE_UNIQUE_USER_ID', False):
            return response['email']
        return response['username']

    def get_user_details(self, response):
        return {'username': response.get('username', ''),
                'email': response.get('email', ''),
                'fullname': response.get('name', ''),
                'first_name': response.get('given_name', ''),
                'last_name': response.get('family_name', '')}

    def user_data(self, access_token, *args, **kwargs):
        # the request URLs were also missing from the original listing;
        # the account API paths below are assumptions, verify them locally
        id = self.get_json(
            params={'access_token': access_token}
        btag = self.get_json(
            params={'access_token': access_token}
        first_name, last_name = btag.split("#")
        return {"id": id,
                "username": first_name,
                "email": "%d@%s" % (id, self.EMAIL_SUFFIX),
                "family_name": last_name,
                "given_name": first_name,
                "name": btag}

Finally, activate the created backend and don't forget to add the BNet API keys to your Django settings. Now go and try the login.

AUTHENTICATION_BACKENDS = ('social.backends.bnet.BnetOAuth2',)

After login, a new user should be created, using the part of the battle tag before the # as username and first_name, and the number behind the # as last_name. The email is filled using the BNet account id and a user-defined suffix.


Supervisord at 100% load for gunicorn

Starting a gunicorn Django server from the supervisord daemon using the deprecated gunicorn_django command works, but results in 100% load on the supervisord process.

[program:gunicorn]
command=python run_gunicorn  ; the command line was missing from the original config
process_name=gunicorn         ; process_name expr (default %(program_name)s)
numprocs=1                    ; number of processes copies to start (def 1)
directory=/home/gunicorn      ; directory to cwd to before exec (def no cwd)
autostart=true                ; start at supervisord start (default: true)
autorestart=true              ; whether/when to restart (default: unexpected)
user=gunicorn                 ; setuid to this UNIX account to run the program
redirect_stderr=true          ; redirect proc stderr to stdout (default false)

After adding gunicorn to the INSTALLED_APPS in the project settings, the gunicorn server can be easily started with the python run_gunicorn command and everything works fine.
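The settings change itself is minimal; a sketch (the other entries are placeholders for whatever your project already lists):

```python
# settings.py -- adding 'gunicorn' makes ./ run_gunicorn available
INSTALLED_APPS = (
    'django.contrib.auth',  # placeholder for your existing apps
    'gunicorn',
)
```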


streaMplayer – play flash streams with mplayer

Want to watch a Flash live stream but your browser is not able to play it smoothly? Or maybe you simply hate Flash movies… Here is the solution! Catch the stream and pipe it to mplayer. Below is a small Python script that sets up the required iptables redirect and starts mplayer afterwards.

Usage: start the script; then open/reload the website that embeds the Flash player in your browser.

import time
import subprocess

retries = 3
sleep = 3

iptAdd = "sudo iptables -t nat -A OUTPUT -p tcp --dport 1935 -j REDIRECT"
iptDel = "sudo iptables -t nat -D OUTPUT -p tcp --dport 1935 -j REDIRECT"

# redirect outgoing RTMP traffic (port 1935) to the local rtmpsrv
iptc = subprocess.Popen(iptAdd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
_, stderr = iptc.communicate()

rtmpdump = None
if len(stderr) == 0:
    # rtmpsrv prints a ready-to-use rtmpdump command for each captured stream
    rtmpsrv = subprocess.Popen("rtmpsrv", stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0)
    while True:
        line = rtmpsrv.stderr.readline()
        if line.find(b"Closing connection") >= 0:
            break
    rtmpsrv.terminate()
    stdout, _ = rtmpsrv.communicate()
    rtmpdump = b"".join(stdout.split(b"\n"))

# remove the iptables redirect again
iptc = subprocess.Popen(iptDel, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
_, stderr = iptc.communicate()

if rtmpdump is None:
    print("Could not detect stream.")
else:
    rtmpdump = rtmpdump.decode("ascii")
    print("Found rtmpdump command: '%s'\n" % rtmpdump)
    # strip the "-o <file>" part and pipe the raw stream into mplayer instead
    mp = "%s | mplayer -" % rtmpdump[:rtmpdump.find(" -o \"")]
    print("Executing: '%s'\n" % mp)
    retry = 0
    while True:
        mplayer = subprocess.Popen(mp, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        _, stderr = mplayer.communicate()
        if stderr.find(b"ERROR") >= 0:
            print("Error :(")
            if retry < retries:
                retry += 1
                print("Retry (%d)" % retry)
                time.sleep(sleep)
                print("Giving up")
        else:





Faster file IO in python using cython

Reading large files in Python sometimes feels incredibly slow. Here are some approaches using Cython to minimize reading times. Simply compiling the existing Python code with Cython reduces the reading time by 23%. By introducing explicit type definitions, I could finally reach C++ reading speeds, which are 4.4x faster than the pure Python code. However, when I used the generator keyword yield to iterate over all lines from an external Python function (to avoid exhausting memory), the required runtime doubled for this approach. The used code snippets are listed below.


File: A simple Python function to read in a file line by line.

def read_file_python(filename):
    f = open(filename, "rb")
    while True:
        line = f.readline()
        if not line: break
        #yield line
    return []

File: file_io.pyx A Cython file, containing a pure Python function and a Cython-optimized function for line-wise file reading.

from libc.stdio cimport *
cdef extern from "stdio.h":
    #FILE * fopen ( const char * filename, const char * mode )
    FILE *fopen(const char *, const char *)
    #int fclose ( FILE * stream )
    int fclose(FILE *)
    #ssize_t getline(char **lineptr, size_t *n, FILE *stream);
    ssize_t getline(char **, size_t *, FILE *)
def read_file_slow(filename):
    f = open(filename, "rb")
    while True:
        line = f.readline()
        if not line: break
        #yield line
    return []
def read_file(filename):
    filename_byte_string = filename.encode("UTF-8")
    cdef char* fname = filename_byte_string
    cdef FILE* cfile
    cfile = fopen(fname, "rb")
    if cfile == NULL:
        raise FileNotFoundError(2, "No such file or directory: '%s'" % filename)
    cdef char * line = NULL
    cdef size_t l = 0
    cdef ssize_t read
    while True:
        read = getline(&line, &l, cfile)
        if read == -1: break
        #yield line
    fclose(cfile)
    return []
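The .pyx file has to be compiled before the benchmark can import it. A minimal sketch (it assumes Cython is installed and is run via python build_ext --inplace):

```python
# -- minimal build script for the file_io extension
# (assumes Cython is installed; run: python build_ext --inplace)
from distutils.core import setup
from Cython.Build import cythonize

setup(ext_modules=cythonize("file_io.pyx"))
```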

File: file_io.cpp Comparison code in C++.

#include "stdio.h"
#include <stdlib.h>

int main() {
    FILE* cfile = fopen("trajectory.pdb", "rb");
    if (cfile == NULL) return 1;

    char* line = NULL;
    size_t l = 0;
    ssize_t read;

    while (true) {
        read = getline(&line, &l, cfile);
        if (read == -1) break;
    }
    free(line);
    fclose(cfile);
    return 0;
}

File: Python code to test and benchmark all the different functions.

import timeit

count = 10
check = False

if check:
    from file_io import read_file, read_file_slow
    import hashlib
    m ="md5")
    for line in read_file_slow("trajectory.pdb"):
    h1 = m.hexdigest()
    m ="md5")
    for line in read_file("trajectory.pdb"):
    h2 = m.hexdigest()
    assert h1 == h2, "read error"
    print("read functions: ok")

t = timeit.Timer("""for line in read_file_python("trajectory.pdb"):
  pass""", """from file_io_python import read_file_python""")
t1 = t.timeit(count)
print("Python", t1, "sec")

t = timeit.Timer("""for line in read_file_slow("trajectory.pdb"):
  pass""", """from file_io import read_file_slow""")
t2 = t.timeit(count)
print("Cython", t2, "sec")

t = timeit.Timer("""for line in read_file("trajectory.pdb"):
  pass""", """from file_io import read_file""")
t3 = t.timeit(count)
print("cdef Cython", t3, "sec")

t = timeit.Timer("""s = subprocess.Popen("./a.out", shell=True)
s.wait()""", """import subprocess""")
t4 = t.timeit(count)
print("C", t4, "sec")


Decrypting RSDF files

A small code snippet to decrypt links from an RSDF container:

import base64
from Crypto.Cipher import AES

def decryptRSDF(filename):
    links = []

    with open(filename, "r") as f:
        lines = f.readlines()

    # the container is hex-encoded; decode it to the raw base64 lines
    data = bytearray.fromhex("".join(lines))
    array = data.split(b"\n")

    key = bytearray.fromhex("8C35192D964DC3182C6F84F3252239EB4A320D2500000000")
    iv = bytearray.fromhex("a3d5a33cb95ac1f5cbdb1ad25cb0a7aa")

    # ECB mode is used as the raw block cipher; the feedback is done by hand below
    aes_context =, AES.MODE_ECB)

    for line in array:
        url_in = base64.b64decode(line)
        length = len(url_in)

        if length > 0:
            url_input = bytearray(url_in)
            url_output = bytearray(length)

            # first byte: XOR with the first byte of the encrypted IV
            output_block = bytearray(aes_context.encrypt(bytes(iv)))
            url_output[0] = url_input[0] ^ output_block[0]

            # other bytes: shift the previous ciphertext byte into the register
            for n in range(1, length + 1):
                iv[:15] = iv[1:]
                iv[15] = url_input[n - 1]

                if n < length:
                    output_block = bytearray(aes_context.encrypt(bytes(iv)))
                    url_output[n] = url_input[n] ^ output_block[0]

            links.append(url_output.decode("utf-8"))  # assume UTF-8 links

    return links

PyQt4 QFileDialog freezes when qt4reactor is running

To use the twisted reactor inside a Qt gui application, I’m using the qt4reactor package:

import sys

from PyQt4 import QtGui

app = QtGui.QApplication(sys.argv)

import qt4reactor
qt4reactor.install()  # must run after QApplication exists, before the reactor import

from twisted.internet import reactor

factory = Factory()  # application-specific
gui = Gui(app)       # application-specific

reactor.runReturn()

But I realized that after the "reactor.runReturn()" line, every QFileDialog call freezes the whole program.

filename = QtGui.QFileDialog.getOpenFileName()

A solution is to use the non-native dialog instead:

filename = QtGui.QFileDialog.getOpenFileName(options=QtGui.QFileDialog.DontUseNativeDialog)

Improved python gzip reading speed

Dealing with large files of protein trajectories, I realized that some of my Python scripts are incredibly slow in comparison with C++ code. I noticed that unzipping a trajectory before reading it is faster than using the gzip module to read directly from the gzipped file ^^.

I benchmarked five different approaches to the reading speed for the following two (identical) files:

-rw-r--r-- 1 doep doep 2.4G Feb 15 16:05 traj.pdb
-rw-r--r-- 1 doep doep 609M Feb 15 15:59 traj.pdb.gz

Each runtime was measured twice using the real time of the 'time' command. Each approach reads every single line via:

while True:
    line = f.readline()
    if not line: break

The five methods are:

  1. Reading from the uncompressed file via: open()
  2. Reading from the uncompressed file using the io module:
  3. Reading from the compressed file using the gzip module:
  4. Reading from the compressed file using a small class based on the zlib module: zlib_file()
  5. Reading from the compressed file using named pipes: os.mkfifo()



Because storing/reading uncompressed files is not an option, the named pipes (os.mkfifo()) are the best/fastest solution for simply reading in files. But this approach also uses the second system CPU, so the real time is smaller than the user time (90 ± 4.5). If you need seeks etc., you should extend the zlib_file class to your needs and still gain a factor of ~2 in speedup. It is sad to see the performance of the approach, as 'zcat traj.pdb.gz > /dev/null' took only 21.165 seconds.

For uncompressed reads, the open() approach is the faster one here, but on a different machine things were different: there, was 20x faster than open(). So you should check the open() vs. speed on your machine before relying on either.

Complete code:

"""This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program.  If not, see <>."""
from __future__ import print_function
import io
import zlib
import sys
class zlib_file():
    def __init__(self, buffer_size=1024*1024*8):
        # 16+zlib.MAX_WBITS -> zlib can decompress gzip
        self.dobj = zlib.decompressobj(16+zlib.MAX_WBITS)
        self.decomp = []
        self.lines = []
        self.buffer_size = buffer_size
    def open(self, filename):
        self.fhwnd =, "rb")
        self.eof = False
    def close(self):
        self.decomp = []
        self.lines = []
    def decompress(self):
        raw =
        if not raw:
            self.eof = True
            self.decomp.insert(0, self.dobj.flush())
            self.decomp.insert(0, self.dobj.decompress(raw))
    def readline(self):
        out_str = []
        while True:
            if len(self.lines) > 0:
                return self.lines.pop() + "\n"
            elif len(self.decomp) > 0:
                out = self.decomp.pop()
                arr = out.split("\n")
                if len(arr) == 1:
                    # no newline in this chunk: collect and keep reading
                    # arr[0] completes the current line;
                    # push the trailing partial line back for later
                    if arr[-1]:
                    # complete lines in between, reversed so pop() is in order
                    self.lines = arr[-2:0:-1]
                    return "".join(out_str) + arr[0] + "\n"
                if self.eof: break
        if len(out_str) > 0:
            return "".join(out_str)
    def readlines(self):
        lines = []
        while True:
            line = self.readline()
            if not line: break
        return lines
if __name__ == "__main__":
    mode = int(sys.argv[1])
    if mode == 1:
        f = open("traj.pdb")
        while True:
            line = f.readline()
            if not line: break
    elif mode == 2:
        f ="traj.pdb")
        while True:
            line = f.readline()
            if not line: break
    elif mode == 3:
        import gzip
        gz ="traj.pdb.gz", mode="r")
        while True:
            line = gz.readline()
            if not line: break
    elif mode == 4:
        f = zlib_file()"traj.pdb.gz")
        while True:
            line = f.readline()
            if not line: break
    elif mode == 5:
        import os
        import subprocess
        tmp_fifo = "tmp_fifo"
        os.mkfifo(tmp_fifo)
        p = subprocess.Popen("gzip --stdout -d traj.pdb.gz > %s" % tmp_fifo, shell=True)
        f =, "r")
        while True:
            line = f.readline()
            if not line: break
        os.remove(tmp_fifo)
