# Package-index page header captured with this file (not part of the module):
#   Repository URL to install this package: (no value captured)
#   Version: 2.1.0.jo1
#!/usr/bin/python
#
# Copyright (C) 2008 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = 'api.jhartmann@gmail.com (Jochen Hartmann)'
import getpass
import time
import StringIO
import random
import unittest
import atom
import gdata.youtube
import gdata.youtube.service
# Identifier the tests assign to both `client.source` and `client.client_id`
# in setUp, and expect back in the 'X-Gdata-Client' request header.
YOUTUBE_TEST_CLIENT_ID = 'ytapi-pythonclientlibrary_servicetest'
class YouTubeServiceTest(unittest.TestCase):
def setUp(self):
self.client = gdata.youtube.service.YouTubeService()
self.client.email = username
self.client.password = password
self.client.source = YOUTUBE_TEST_CLIENT_ID
self.client.developer_key = developer_key
self.client.client_id = YOUTUBE_TEST_CLIENT_ID
self.client.ProgrammaticLogin()
def testRetrieveVideoFeed(self):
feed = self.client.GetYouTubeVideoFeed(
'https://gdata.youtube.com/feeds/api/standardfeeds/recently_featured');
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 0)
for entry in feed.entry:
self.assert_(entry.title.text != '')
def testRetrieveTopRatedVideoFeed(self):
feed = self.client.GetTopRatedVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveMostViewedVideoFeed(self):
feed = self.client.GetMostViewedVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveRecentlyFeaturedVideoFeed(self):
feed = self.client.GetRecentlyFeaturedVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveWatchOnMobileVideoFeed(self):
feed = self.client.GetWatchOnMobileVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveTopFavoritesVideoFeed(self):
feed = self.client.GetTopFavoritesVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveMostRecentVideoFeed(self):
feed = self.client.GetMostRecentVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveMostDiscussedVideoFeed(self):
feed = self.client.GetMostDiscussedVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveMostLinkedVideoFeed(self):
feed = self.client.GetMostLinkedVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveMostRespondedVideoFeed(self):
feed = self.client.GetMostRespondedVideoFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 10)
def testRetrieveVideoEntryByUri(self):
entry = self.client.GetYouTubeVideoEntry(
'https://gdata.youtube.com/feeds/videos/Ncakifd_16k')
self.assert_(isinstance(entry, gdata.youtube.YouTubeVideoEntry))
self.assert_(entry.title.text != '')
def testRetrieveVideoEntryByVideoId(self):
entry = self.client.GetYouTubeVideoEntry(video_id='Ncakifd_16k')
self.assert_(isinstance(entry, gdata.youtube.YouTubeVideoEntry))
self.assert_(entry.title.text != '')
def testRetrieveUserVideosbyUri(self):
feed = self.client.GetYouTubeUserFeed(
'https://gdata.youtube.com/feeds/users/gdpython/uploads')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 0)
def testRetrieveUserVideosbyUsername(self):
feed = self.client.GetYouTubeUserFeed(username='gdpython')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 0)
def testSearchWithVideoQuery(self):
query = gdata.youtube.service.YouTubeVideoQuery()
query.vq = 'google'
query.max_results = 8
feed = self.client.YouTubeQuery(query)
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assertEquals(len(feed.entry), 8)
def testDirectVideoUploadStatusUpdateAndDeletion(self):
self.assertEquals(self.client.developer_key, developer_key)
self.assertEquals(self.client.client_id, YOUTUBE_TEST_CLIENT_ID)
self.assertEquals(self.client.additional_headers['X-GData-Key'],
'key=' + developer_key)
self.assertEquals(self.client.additional_headers['X-Gdata-Client'],
YOUTUBE_TEST_CLIENT_ID)
test_video_title = 'my cool video ' + str(random.randint(1000,5000))
test_video_description = 'description ' + str(random.randint(1000,5000))
my_media_group = gdata.media.Group(
title = gdata.media.Title(text=test_video_title),
description = gdata.media.Description(description_type='plain',
text=test_video_description),
keywords = gdata.media.Keywords(text='video, foo'),
category = gdata.media.Category(
text='Autos',
scheme='http://gdata.youtube.com/schemas/2007/categories.cat',
label='Autos'),
player=None
)
self.assert_(isinstance(my_media_group, gdata.media.Group))
# Set Geo location to 37,-122 lat, long
where = gdata.geo.Where()
where.set_location((37.0,-122.0))
video_entry = gdata.youtube.YouTubeVideoEntry(media=my_media_group,
geo=where)
self.assert_(isinstance(video_entry, gdata.youtube.YouTubeVideoEntry))
new_entry = self.client.InsertVideoEntry(video_entry, video_file_location)
self.assert_(isinstance(new_entry, gdata.youtube.YouTubeVideoEntry))
self.assertEquals(new_entry.title.text, test_video_title)
self.assertEquals(new_entry.media.description.text, test_video_description)
self.assert_(new_entry.id.text)
# check upload status also
upload_status = self.client.CheckUploadStatus(new_entry)
self.assert_(upload_status[0] != '')
# test updating entry meta-data
new_video_description = 'description ' + str(random.randint(1000,5000))
new_entry.media.description.text = new_video_description
updated_entry = self.client.UpdateVideoEntry(new_entry)
self.assert_(isinstance(updated_entry, gdata.youtube.YouTubeVideoEntry))
self.assertEquals(updated_entry.media.description.text,
new_video_description)
# sleep for 10 seconds
time.sleep(10)
# test to delete the entry
value = self.client.DeleteVideoEntry(updated_entry)
if not value:
# sleep more and try again
time.sleep(20)
# test to delete the entry
value = self.client.DeleteVideoEntry(updated_entry)
self.assert_(value == True)
def testDirectVideoUploadWithDeveloperTags(self):
self.assertEquals(self.client.developer_key, developer_key)
self.assertEquals(self.client.client_id, YOUTUBE_TEST_CLIENT_ID)
self.assertEquals(self.client.additional_headers['X-GData-Key'],
'key=' + developer_key)
self.assertEquals(self.client.additional_headers['X-Gdata-Client'],
YOUTUBE_TEST_CLIENT_ID)
test_video_title = 'my cool video ' + str(random.randint(1000,5000))
test_video_description = 'description ' + str(random.randint(1000,5000))
test_developer_tag_01 = 'tag' + str(random.randint(1000,5000))
test_developer_tag_02 = 'tag' + str(random.randint(1000,5000))
test_developer_tag_03 = 'tag' + str(random.randint(1000,5000))
my_media_group = gdata.media.Group(
title = gdata.media.Title(text=test_video_title),
description = gdata.media.Description(description_type='plain',
text=test_video_description),
keywords = gdata.media.Keywords(text='video, foo'),
category = [gdata.media.Category(
text='Autos',
scheme='http://gdata.youtube.com/schemas/2007/categories.cat',
label='Autos')],
player=None
)
self.assert_(isinstance(my_media_group, gdata.media.Group))
video_entry = gdata.youtube.YouTubeVideoEntry(media=my_media_group)
original_developer_tags = [test_developer_tag_01, test_developer_tag_02,
test_developer_tag_03]
dev_tags = video_entry.AddDeveloperTags(original_developer_tags)
for dev_tag in dev_tags:
self.assert_(dev_tag.text in original_developer_tags)
self.assert_(isinstance(video_entry, gdata.youtube.YouTubeVideoEntry))
new_entry = self.client.InsertVideoEntry(video_entry, video_file_location)
self.assert_(isinstance(new_entry, gdata.youtube.YouTubeVideoEntry))
self.assertEquals(new_entry.title.text, test_video_title)
self.assertEquals(new_entry.media.description.text, test_video_description)
self.assert_(new_entry.id.text)
developer_tags_from_new_entry = new_entry.GetDeveloperTags()
for dev_tag in developer_tags_from_new_entry:
self.assert_(dev_tag.text in original_developer_tags)
self.assertEquals(len(developer_tags_from_new_entry),
len(original_developer_tags))
# sleep for 10 seconds
time.sleep(10)
# test to delete the entry
value = self.client.DeleteVideoEntry(new_entry)
if not value:
# sleep more and try again
time.sleep(20)
# test to delete the entry
value = self.client.DeleteVideoEntry(new_entry)
self.assert_(value == True)
def testBrowserBasedVideoUpload(self):
self.assertEquals(self.client.developer_key, developer_key)
self.assertEquals(self.client.client_id, YOUTUBE_TEST_CLIENT_ID)
self.assertEquals(self.client.additional_headers['X-GData-Key'],
'key=' + developer_key)
self.assertEquals(self.client.additional_headers['X-Gdata-Client'],
YOUTUBE_TEST_CLIENT_ID)
test_video_title = 'my cool video ' + str(random.randint(1000,5000))
test_video_description = 'description ' + str(random.randint(1000,5000))
my_media_group = gdata.media.Group(
title = gdata.media.Title(text=test_video_title),
description = gdata.media.Description(description_type='plain',
text=test_video_description),
keywords = gdata.media.Keywords(text='video, foo'),
category = gdata.media.Category(
text='Autos',
scheme='http://gdata.youtube.com/schemas/2007/categories.cat',
label='Autos'),
player=None
)
self.assert_(isinstance(my_media_group, gdata.media.Group))
video_entry = gdata.youtube.YouTubeVideoEntry(media=my_media_group)
self.assert_(isinstance(video_entry, gdata.youtube.YouTubeVideoEntry))
response = self.client.GetFormUploadToken(video_entry)
self.assert_(response[0].startswith(
'https://uploads.gdata.youtube.com/action/FormDataUpload/'))
self.assert_(len(response[0]) > 55)
self.assert_(len(response[1]) > 100)
def testRetrieveRelatedVideoFeedByUri(self):
feed = self.client.GetYouTubeRelatedVideoFeed(
'https://gdata.youtube.com/feeds/videos/Ncakifd_16k/related')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 0)
def testRetrieveRelatedVideoFeedById(self):
feed = self.client.GetYouTubeRelatedVideoFeed(video_id = 'Ncakifd_16k')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 0)
def testRetrieveResponseVideoFeedByUri(self):
feed = self.client.GetYouTubeVideoResponseFeed(
'https://gdata.youtube.com/feeds/videos/Ncakifd_16k/responses')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoResponseFeed))
self.assert_(len(feed.entry) > 0)
def testRetrieveResponseVideoFeedById(self):
feed = self.client.GetYouTubeVideoResponseFeed(video_id='Ncakifd_16k')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoResponseFeed))
self.assert_(len(feed.entry) > 0)
def testRetrieveVideoCommentFeedByUri(self):
feed = self.client.GetYouTubeVideoCommentFeed(
'https://gdata.youtube.com/feeds/api/videos/Ncakifd_16k/comments')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoCommentFeed))
self.assert_(len(feed.entry) > 0)
def testRetrieveVideoCommentFeedByVideoId(self):
feed = self.client.GetYouTubeVideoCommentFeed(video_id='Ncakifd_16k')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoCommentFeed))
self.assert_(len(feed.entry) > 0)
def testAddComment(self):
video_id = '9g6buYJTt_g'
video_entry = self.client.GetYouTubeVideoEntry(video_id=video_id)
random_comment_text = 'test_comment_' + str(random.randint(1000,50000))
self.client.AddComment(comment_text=random_comment_text,
video_entry=video_entry)
comment_feed = self.client.GetYouTubeVideoCommentFeed(video_id=video_id)
comment_found = False
for item in comment_feed.entry:
if (item.content.text == random_comment_text):
comment_found = True
self.assertEquals(comment_found, True)
def testAddRating(self):
video_id_to_rate = 'Ncakifd_16k'
video_entry = self.client.GetYouTubeVideoEntry(video_id=video_id_to_rate)
response = self.client.AddRating(3, video_entry)
self.assert_(isinstance(response, gdata.GDataEntry))
def testRetrievePlaylistFeedByUri(self):
feed = self.client.GetYouTubePlaylistFeed(
'https://gdata.youtube.com/feeds/users/gdpython/playlists')
self.assert_(isinstance(feed, gdata.youtube.YouTubePlaylistFeed))
self.assert_(len(feed.entry) > 0)
def testRetrievePlaylistListFeedByUsername(self):
feed = self.client.GetYouTubePlaylistFeed(username='gdpython')
self.assert_(isinstance(feed, gdata.youtube.YouTubePlaylistFeed))
self.assert_(len(feed.entry) > 0)
def testRetrievePlaylistVideoFeed(self):
feed = self.client.GetYouTubePlaylistVideoFeed(
'https://gdata.youtube.com/feeds/api/playlists/BCB3BB96DF51B505')
self.assert_(isinstance(feed, gdata.youtube.YouTubePlaylistVideoFeed))
self.assert_(len(feed.entry) > 0)
self.assert_(isinstance(feed.entry[0],
gdata.youtube.YouTubePlaylistVideoEntry))
def testAddUpdateAndDeletePlaylist(self):
test_playlist_title = 'my test playlist ' + str(random.randint(1000,3000))
test_playlist_description = 'test playlist '
response = self.client.AddPlaylist(test_playlist_title,
test_playlist_description)
self.assert_(isinstance(response, gdata.youtube.YouTubePlaylistEntry))
new_playlist_title = 'my updated playlist ' + str(random.randint(1000,4000))
new_playlist_description = 'my updated playlist '
playlist_entry_id = response.id.text.split('/')[-1]
updated_playlist = self.client.UpdatePlaylist(playlist_entry_id,
new_playlist_title,
new_playlist_description)
playlist_feed = self.client.GetYouTubePlaylistFeed()
update_successful = False
for playlist_entry in playlist_feed.entry:
if playlist_entry.title.text == new_playlist_title:
update_successful = True
break
self.assertEquals(update_successful, True)
# wait
time.sleep(10)
# delete it
playlist_uri = updated_playlist.id.text
response = self.client.DeletePlaylist(playlist_uri)
self.assertEquals(response, True)
def testAddUpdateAndDeletePrivatePlaylist(self):
test_playlist_title = 'my test playlist ' + str(random.randint(1000,3000))
test_playlist_description = 'test playlist '
response = self.client.AddPlaylist(test_playlist_title,
test_playlist_description,
playlist_private=True)
self.assert_(isinstance(response, gdata.youtube.YouTubePlaylistEntry))
new_playlist_title = 'my updated playlist ' + str(random.randint(1000,4000))
new_playlist_description = 'my updated playlist '
playlist_entry_id = response.id.text.split('/')[-1]
updated_playlist = self.client.UpdatePlaylist(playlist_entry_id,
new_playlist_title,
new_playlist_description,
playlist_private=True)
playlist_feed = self.client.GetYouTubePlaylistFeed()
update_successful = False
playlist_still_private = False
for playlist_entry in playlist_feed.entry:
if playlist_entry.title.text == new_playlist_title:
update_successful = True
if playlist_entry.private is not None:
playlist_still_private = True
self.assertEquals(update_successful, True)
self.assertEquals(playlist_still_private, True)
# wait
time.sleep(10)
# delete it
playlist_uri = updated_playlist.id.text
response = self.client.DeletePlaylist(playlist_uri)
self.assertEquals(response, True)
def testAddEditAndDeleteVideoFromPlaylist(self):
test_playlist_title = 'my test playlist ' + str(random.randint(1000,3000))
test_playlist_description = 'test playlist '
response = self.client.AddPlaylist(test_playlist_title,
test_playlist_description)
self.assert_(isinstance(response, gdata.youtube.YouTubePlaylistEntry))
custom_video_title = 'my test video on my test playlist'
custom_video_description = 'this is a test video on my test playlist'
video_id = 'Ncakifd_16k'
playlist_uri = response.feed_link[0].href
time.sleep(10)
response = self.client.AddPlaylistVideoEntryToPlaylist(
playlist_uri, video_id, custom_video_title, custom_video_description)
self.assert_(isinstance(response, gdata.youtube.YouTubePlaylistVideoEntry))
playlist_entry_id = response.id.text.split('/')[-1]
playlist_uri = response.id.text.split(playlist_entry_id)[0][:-1]
new_video_title = 'video number ' + str(random.randint(1000,3000))
new_video_description = 'test video'
time.sleep(10)
response = self.client.UpdatePlaylistVideoEntryMetaData(
playlist_uri,
playlist_entry_id,
new_video_title,
new_video_description,
1)
self.assert_(isinstance(response, gdata.youtube.YouTubePlaylistVideoEntry))
time.sleep(10)
playlist_entry_id = response.id.text.split('/')[-1]
# remove video from playlist
response = self.client.DeletePlaylistVideoEntry(playlist_uri,
playlist_entry_id)
self.assertEquals(response, True)
time.sleep(10)
# delete the playlist
response = self.client.DeletePlaylist(playlist_uri)
self.assertEquals(response, True)
def testRetrieveSubscriptionFeedByUri(self):
feed = self.client.GetYouTubeSubscriptionFeed(
'https://gdata.youtube.com/feeds/users/gdpython/subscriptions')
self.assert_(isinstance(feed, gdata.youtube.YouTubeSubscriptionFeed))
self.assert_(len(feed.entry) == 3)
subscription_to_channel_found = False
subscription_to_favorites_found = False
subscription_to_query_found = False
all_types_found = False
for entry in feed.entry:
self.assert_(isinstance(entry, gdata.youtube.YouTubeSubscriptionEntry))
subscription_type = entry.GetSubscriptionType()
if subscription_type == 'channel':
subscription_to_channel_found = True
elif subscription_type == 'favorites':
subscription_to_favorites_found = True
elif subscription_type == 'query':
subscription_to_query_found = True
if (subscription_to_channel_found and subscription_to_favorites_found and
subscription_to_query_found):
all_types_found = True
self.assertEquals(all_types_found, True)
def testRetrieveSubscriptionFeedByUsername(self):
feed = self.client.GetYouTubeSubscriptionFeed(username='gdpython')
self.assert_(isinstance(feed, gdata.youtube.YouTubeSubscriptionFeed))
self.assert_(len(feed.entry) == 3)
subscription_to_channel_found = False
subscription_to_favorites_found = False
subscription_to_query_found = False
all_types_found = False
for entry in feed.entry:
self.assert_(isinstance(entry, gdata.youtube.YouTubeSubscriptionEntry))
subscription_type = entry.GetSubscriptionType()
if subscription_type == 'channel':
subscription_to_channel_found = True
elif subscription_type == 'favorites':
subscription_to_favorites_found = True
elif subscription_type == 'query':
subscription_to_query_found = True
if (subscription_to_channel_found and subscription_to_favorites_found and
subscription_to_query_found):
all_types_found = True
self.assertEquals(all_types_found, True)
def testRetrieveUserProfileByUri(self):
user = self.client.GetYouTubeUserEntry(
'https://gdata.youtube.com/feeds/users/gdpython')
self.assert_(isinstance(user, gdata.youtube.YouTubeUserEntry))
self.assertEquals(user.location.text, 'US')
def testRetrieveUserProfileByUsername(self):
user = self.client.GetYouTubeUserEntry(username='gdpython')
self.assert_(isinstance(user, gdata.youtube.YouTubeUserEntry))
self.assertEquals(user.location.text, 'US')
def testRetrieveUserFavoritesFeed(self):
feed = self.client.GetUserFavoritesFeed(username='gdpython')
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 0)
def testRetrieveDefaultUserFavoritesFeed(self):
feed = self.client.GetUserFavoritesFeed()
self.assert_(isinstance(feed, gdata.youtube.YouTubeVideoFeed))
self.assert_(len(feed.entry) > 0)
def testAddAndDeleteVideoFromFavorites(self):
video_id = 'Ncakifd_16k'
video_entry = self.client.GetYouTubeVideoEntry(video_id=video_id)
response = self.client.AddVideoEntryToFavorites(video_entry)
self.assert_(isinstance(response, gdata.GDataEntry))
time.sleep(10)
response = self.client.DeleteVideoEntryFromFavorites(video_id)
self.assertEquals(response, True)
def testRetrieveContactFeedByUri(self):
feed = self.client.GetYouTubeContactFeed(
'https://gdata.youtube.com/feeds/users/gdpython/contacts')
self.assert_(isinstance(feed, gdata.youtube.YouTubeContactFeed))
self.assertEquals(len(feed.entry), 1)
def testRetrieveContactFeedByUsername(self):
feed = self.client.GetYouTubeContactFeed(username='gdpython')
self.assert_(isinstance(feed, gdata.youtube.YouTubeContactFeed))
self.assertEquals(len(feed.entry), 1)
if __name__ == '__main__':
  # Script entry point.  The values collected here are bound as module-level
  # names because the test case class reads `username`, `password`,
  # `developer_key` and `video_file_location` directly from module scope.
  account_warning = ('NOTE: Please run these tests only with a test account. '
                     'The tests may delete or update your data.')
  print (account_warning)

  username = raw_input('Please enter your username: ')
  # getpass prompts for the password without taking it through raw_input.
  password = getpass.getpass()
  developer_key = raw_input('Please enter your developer key: ')

  video_path_prompt = 'Please enter the absolute path to a video file: '
  video_file_location = raw_input(video_path_prompt)

  unittest.main()