Repository URL to install this package:
|
Version:
2.1.0.jo1 ▾
|
#!/usr/bin/python
#
# Copyright Google 2007-2008, all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import unittest
import getpass
import gdata.spreadsheet.text_db
import gdata.spreadsheet.service
__author__ = 'api.jscudder (Jeffrey Scudder)'
username = ''
password = ''
class FactoryTest(unittest.TestCase):
def setUp(self):
self.client = gdata.spreadsheet.text_db.DatabaseClient()
def testBadCredentials(self):
try:
self.client.SetCredentials('foo', 'bar')
self.fail()
except gdata.spreadsheet.text_db.Error, e:
pass
def testCreateGetAndDeleteDatabase(self):
db_title = 'google_spreadsheets_db unit test 1'
self.client.SetCredentials(username, password)
db = self.client.CreateDatabase(db_title)
# Test finding the database using the name
time.sleep(5)
db_list = self.client.GetDatabases(name=db_title)
self.assert_(len(db_list) >= 1)
if len(db_list) >= 1:
self.assert_(db_list[0].entry.title.text == db_title)
# Test finding the database using the spreadsheet key
db_list = self.client.GetDatabases(spreadsheet_key=db.spreadsheet_key)
self.assert_(len(db_list) == 1)
self.assert_(db_list[0].entry.title.text == db_title)
# Delete the test spreadsheet
time.sleep(10)
db.Delete()
class DatabaseTest(unittest.TestCase):
def setUp(self):
client = gdata.spreadsheet.text_db.DatabaseClient(username, password)
self.db = client.CreateDatabase('google_spreadsheets_db unit test 2')
def tearDown(self):
time.sleep(10)
self.db.Delete()
def testCreateGetAndDeleteTable(self):
table = self.db.CreateTable('test1', ['1','2','3'])
# Try to get the new table using the worksheet id.
table_list = self.db.GetTables(worksheet_id=table.worksheet_id)
self.assert_(len(table_list) == 1)
self.assert_(table_list[0].entry.title.text, 'test1')
# Try to get the table using the name
table_list = self.db.GetTables(name='test1')
self.assert_(len(table_list) == 1)
self.assert_(table_list[0].entry.title.text, 'test1')
# Delete the table
table.Delete()
class TableTest(unittest.TestCase):
def setUp(self):
client = gdata.spreadsheet.text_db.DatabaseClient(username, password)
self.db = client.CreateDatabase('google_spreadsheets_db unit test 3')
self.table = self.db.CreateTable('test1', ['a','b','c_d','a', 'd:e'])
def tearDown(self):
time.sleep(10)
self.db.Delete()
def testCreateGetAndDeleteRecord(self):
new_record = self.table.AddRecord({'a':'test1', 'b':'test2', 'cd':'test3', 'a_2':'test4', 'de':'test5'})
# Test getting record by line number.
record = self.table.GetRecord(row_number=1)
self.assert_(record is not None)
self.assert_(record.content['a'] == 'test1')
self.assert_(record.content['b'] == 'test2')
self.assert_(record.content['cd'] == 'test3')
self.assert_(record.content['a_2'] == 'test4')
# Test getting record using the id.
record_list = self.table.GetRecord(row_id=new_record.row_id)
self.assert_(record is not None)
# Delete the record.
time.sleep(10)
new_record.Delete()
def testPushPullSyncing(self):
# Get two copies of the same row.
first_copy = self.table.AddRecord({'a':'1', 'b':'2', 'cd':'3', 'a_2':'4', 'de':'5'})
second_copy = self.table.GetRecord(first_copy.row_id)
# Make changes in the first copy
first_copy.content['a'] = '7'
first_copy.content['b'] = '9'
# Try to get the changes before they've been committed
second_copy.Pull()
self.assert_(second_copy.content['a'] == '1')
self.assert_(second_copy.content['b'] == '2')
# Commit the changes, the content should now be different
first_copy.Push()
second_copy.Pull()
self.assert_(second_copy.content['a'] == '7')
self.assert_(second_copy.content['b'] == '9')
# Make changes to the second copy, push, then try to push changes from
# the first copy.
first_copy.content['a'] = '10'
second_copy.content['a'] = '15'
first_copy.Push()
try:
second_copy.Push()
# The second update should raise and exception due to a 409 conflict.
self.fail()
except gdata.spreadsheet.service.RequestError:
pass
except Exception, error:
#TODO: Why won't the except RequestError catch this?
pass
def testFindRecords(self):
# Add lots of test records:
self.table.AddRecord({'a':'1', 'b':'2', 'cd':'3', 'a_2':'4', 'de':'5'})
self.table.AddRecord({'a':'hi', 'b':'2', 'cd':'20', 'a_2':'4', 'de':'5'})
self.table.AddRecord({'a':'2', 'b':'2', 'cd':'3'})
self.table.AddRecord({'a':'2', 'b':'2', 'cd':'15', 'de':'7'})
self.table.AddRecord({'a':'hi hi hi', 'b':'2', 'cd':'15', 'de':'7'})
self.table.AddRecord({'a':'"5"', 'b':'5', 'cd':'15', 'de':'7'})
self.table.AddRecord({'a':'5', 'b':'5', 'cd':'15', 'de':'7'})
time.sleep(10)
matches = self.table.FindRecords('a == 1')
self.assert_(len(matches) == 1)
self.assert_(matches[0].content['a'] == '1')
self.assert_(matches[0].content['b'] == '2')
matches = self.table.FindRecords('a > 1 && cd < 20')
self.assert_(len(matches) == 4)
matches = self.table.FindRecords('cd < de')
self.assert_(len(matches) == 7)
matches = self.table.FindRecords('a == b')
self.assert_(len(matches) == 0)
matches = self.table.FindRecords('a == 5')
self.assert_(len(matches) == 1)
def testIterateResultSet(self):
# Populate the table with test data.
self.table.AddRecord({'a':'1', 'b':'2', 'cd':'3', 'a_2':'4', 'de':'5'})
self.table.AddRecord({'a':'hi', 'b':'2', 'cd':'20', 'a_2':'4', 'de':'5'})
self.table.AddRecord({'a':'2', 'b':'2', 'cd':'3'})
self.table.AddRecord({'a':'2', 'b':'2', 'cd':'15', 'de':'7'})
self.table.AddRecord({'a':'hi hi hi', 'b':'2', 'cd':'15', 'de':'7'})
self.table.AddRecord({'a':'"5"', 'b':'5', 'cd':'15', 'de':'7'})
self.table.AddRecord({'a':'5', 'b':'5', 'cd':'15', 'de':'7'})
# Get the first two rows.
records = self.table.GetRecords(1, 2)
self.assert_(len(records) == 2)
self.assert_(records[0].content['a'] == '1')
self.assert_(records[1].content['a'] == 'hi')
# Then get the next two rows.
next_records = records.GetNext()
self.assert_(len(next_records) == 2)
self.assert_(next_records[0].content['a'] == '2')
self.assert_(next_records[0].content['cd'] == '3')
self.assert_(next_records[1].content['cd'] == '15')
self.assert_(next_records[1].content['de'] == '7')
def testLookupFieldsOnPreexistingTable(self):
existing_table = self.db.GetTables(name='test1')[0]
existing_table.LookupFields()
self.assertEquals(existing_table.fields, ['a', 'b', 'cd', 'a_2', 'de'])
if __name__ == '__main__':
if not username:
username = raw_input('Spreadsheets API | Text DB Tests\n'
'Please enter your username: ')
if not password:
password = getpass.getpass()
unittest.main()