Saturday, December 3, 2011

Importing data from Oracle to Google App Engine Datastore with a custom bulkloader connector

I'm working on a project where we need to upload data from Oracle databases to Datastore very frequently.
The datastore bulkloader bundled in the App Engine Python SDK works very well for importing and exporting data. It currently offers three connector implementations: csv, xml and simpletext.
The CSV connector was working smoothly for us, but I wanted to avoid generating CSV files and bulkload directly from Oracle instead.
After some study I decided to write a new connector: the oracle_connector.
The only prerequisite for using this connector is having the cx_Oracle Python module properly installed.
Here are some details about my environment:
  • Ubuntu 11.10 - 64 bits
  • Python 2.7.2+
  • cx_Oracle 5.0.4 unicode
  • Oracle Instant Client 11.2
  • Google App Engine Python SDK 1.6.0
Some design decisions

I wanted to specify a SQL query for each kind/entity in the YAML config file and then map the selected column names or aliases to properties via the external_name attribute. I didn't find a way to use a custom connector option such as "sql_query", so I reused the existing columns option to carry the query.
I also wanted the database connection properties to live outside the connector implementation. To achieve that, I store them in an external file whose name is passed to the appcfg.py script through the "--filename" command-line parameter.
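As an illustration of this second decision, here is a minimal sketch of reading connection properties from an external Python file. It uses runpy.run_path from the standard library, which is an alternative to the __import__ call the connector itself uses. The helper name load_db_settings and the REQUIRED tuple are assumptions made for this example.

```python
import runpy

# Names the connector expects to find in the settings file.
REQUIRED = ('host', 'port', 'uid', 'pwd', 'service', 'cursor_arraysize')


def load_db_settings(filename):
    """Execute a Python settings file and return its variables as a dict.

    Hypothetical helper: runpy.run_path accepts any file path, so the
    settings file does not need to be importable from sys.path.
    """
    settings = runpy.run_path(filename)
    missing = [name for name in REQUIRED if name not in settings]
    if missing:
        raise ValueError('Missing settings in %s: %s'
                         % (filename, ', '.join(missing)))
    # Keep only the expected names, dropping module globals like __name__.
    return dict((name, settings[name]) for name in REQUIRED)
```

Failing early on a missing name gives a clearer error than an AttributeError raised in the middle of an upload.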

The actual implementation

It consists of three files:
  • bulkloader.yaml - imports and uses the oracle_connector and maps queries to entities
  • oracle_connector.py - connector implementation based on cx_Oracle (doesn't work for exports)
  • db_settings.py - defines the connection property variables used by oracle_connector
Invoking the bulkloader is very simple:

$APPENGINE_HOME/appcfg.py upload_data --config_file=bulkloader.yaml --kind=Table --filename=db_settings.py --email=user@gmail.com --url=http://app-id.appspot.com/remote_api

The bulkloader.yaml below shows how to import and use the oracle_connector. Note the connector_options attribute, whose columns option carries the SQL query.

python_preamble:
- import: base64
- import: re
- import: oracle_connector
- import: google.appengine.ext.bulkload.transform
- import: google.appengine.ext.bulkload.bulkloader_wizard
- import: google.appengine.ext.db
- import: google.appengine.api.datastore
- import: google.appengine.api.users

transformers:

- kind: Table
  connector: oracle_connector.OracleConnector.create_from_options
  connector_options:
    columns: "select TABLE_NAME, TABLESPACE_NAME, LAST_ANALYZED from user_tables"
  property_map:
    - property: __key__
      external_name: TABLE_NAME

    - property: tablespace
      external_name: TABLESPACE_NAME

    - property: last_analyzed
      external_name: LAST_ANALYZED

There are some known issues here: 1) external_name values must be uppercase, because Oracle reports unquoted column names in uppercase, and 2) I wasn't able to return numeric columns (I had to wrap them in Oracle's to_char function) due to some problem in my cx_Oracle installation.
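One possible workaround for the first issue is to normalize the column names in the connector before each record is yielded. The sketch below is not part of the connector shown in this post. The function name record_from_row and the lowercase convention are assumptions, and description stands for a DB-API cursor.description sequence.

```python
def record_from_row(description, row, lowercase=True):
    """Build a {column_name: value} dict from one DB-API result row.

    description: sequence of tuples whose first item is the column
      name, as in cursor.description.
    lowercase: if True, fold names to lowercase so that external_name
      could be written in lowercase in bulkloader.yaml.
    """
    names = [column[0] for column in description]
    if lowercase:
        names = [name.lower() for name in names]
    return dict(zip(names, row))


# Hand-written data shaped like the result of the query on user_tables.
description = [('TABLE_NAME',), ('TABLESPACE_NAME',)]
record = record_from_row(description, ('EMP', 'USERS'))
```

With lowercase=False the keys stay exactly as the database reported them, which matches the current behaviour of the connector.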

Below is the OracleConnector class, which implements connector_interface.ConnectorInterface. This was my first piece of Python code. Let me know if I can improve it!

#!/usr/bin/env python
"""A bulkloader connector to read data from Oracle selects."""

from google.appengine.ext.bulkload import connector_interface
from google.appengine.ext.bulkload import bulkloader_errors
import cx_Oracle
import os.path

class OracleConnector(connector_interface.ConnectorInterface):

  @classmethod
  def create_from_options(cls, options, name):
    """Factory using an options dictionary.

    Args:
      options: Dictionary of options:
        columns: sql query to perform, each selected column becomes a column
      name: The name of this transformer, for use in error messages.

    Returns:
      OracleConnector object described by the specified options.

    Raises:
      InvalidConfiguration: If the config is invalid.
    """
    columns = options.get('columns', None)
    if not columns:
      raise bulkloader_errors.InvalidConfiguration(
          'Sql query must be specified in the columns '
          'configuration option. (In transformer name %s.)' % name)

    return cls(columns)

  def __init__(self, sql_query):
    """Initializer.

    Args:
      sql_query: (required) select query which will be sent to database.
        The returned columns/aliases will be used as the connector's
        column names.
    """
    self.sql_query = unicode(sql_query)

  def generate_import_record(self, filename, bulkload_state):
    """Generator, yields dicts for nodes found as described in the options.

    Args:
      filename: py script containing oracle database connection properties:
        host, port, uid, pwd, service and cursor_arraysize.
      bulkload_state: Passed bulkload_state.

    Yields:
      Neutral dict, one per row returned by the sql query
    """
    # Import the settings file as a module; it must be importable from
    # the current directory (or elsewhere on sys.path).
    dbprops = __import__(os.path.splitext(filename)[0])
    dsn_tns = cx_Oracle.makedsn(dbprops.host, dbprops.port, dbprops.service)
    connection = cx_Oracle.connect(dbprops.uid, dbprops.pwd, dsn_tns)
    try:
      cursor = connection.cursor()
      cursor.arraysize = dbprops.cursor_arraysize
      # The query must run before cursor.description is populated.
      cursor.execute(self.sql_query)
      field_names = [column[0] for column in cursor.description]
      for row in cursor.fetchall():
        # Map each column name (or alias) to its value in this row.
        yield dict(zip(field_names, row))
    finally:
      connection.close()

And finally the contents of db_settings.py:

cursor_arraysize = 50
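Besides cursor_arraysize, the connector reads host, port, uid, pwd and service from this file. A complete file might look like the sketch below. Every value other than cursor_arraysize is a made-up placeholder, not a setting from my environment.

```python
# db_settings.py (sketch; all connection values are placeholders)
host = 'localhost'      # placeholder: database server hostname
port = 1521             # placeholder: 1521 is the usual Oracle listener port
service = 'XE'          # placeholder: third argument to cx_Oracle.makedsn
uid = 'scott'           # placeholder: database user name
pwd = 'tiger'           # placeholder: database password
cursor_arraysize = 50   # assigned to cursor.arraysize by the connector
```

Since this file holds a password, it is probably wise to keep it out of version control.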

Get the code

You can download the code here:



This post shows an alternative connector implementation for importing data directly from an Oracle database.
This approach could be easily extended to support other RDBMS such as MySQL or Postgres.
We still have to run some tests to check how it behaves under different loads and data types, but so far it seems promising.
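To illustrate the point about other databases: the row-to-dict loop does not depend on anything Oracle-specific, since it only needs a DB-API 2.0 cursor. The sketch below uses sqlite3 from the standard library as a stand-in for another RDBMS. The function name generate_records and the sample table are assumptions for illustration, and a real connector would still have to subclass connector_interface.ConnectorInterface.

```python
import sqlite3


def generate_records(connection, sql_query):
    """Yield one {column_name: value} dict per row returned by sql_query.

    Works with any DB-API 2.0 connection: only cursor(), execute(),
    description and row iteration are used.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(sql_query)
        field_names = [column[0] for column in cursor.description]
        for row in cursor:
            yield dict(zip(field_names, row))
    finally:
        cursor.close()


# Stand-in database: an in-memory SQLite table with made-up rows.
conn = sqlite3.connect(':memory:')
conn.execute('create table tables (table_name text, tablespace text)')
conn.execute("insert into tables values ('EMP', 'USERS')")
conn.execute("insert into tables values ('DEPT', 'SYSTEM')")

records = list(generate_records(
    conn, 'select table_name, tablespace from tables order by table_name'))
```

Only the code that opens the connection would change per database; the generator itself stays the same.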