What triggered this was discovering that Project Place, the document/project repository we use, has an API, so I thought I'd do a little digging. It would be cool not just to produce a report from the files, but to fetch the files automatically first.
I should state that the people at Project Place were great, and didn't give me any grief for being a total n00b at this kind of thing.
Initially, using a browser-based client to run examples, it all looked pretty simple…
I decided to stick with Python rather than add another layer of unnecessary learning. The first hurdle was OAuth. I gave up for a few days, but it kept bugging me, so I read more, understood more and managed to get through the OAuth process. The following code is pretty much the same as the examples available online. There's a slight tweak to disable SSL certificate validation. It's probably not the right thing to do, and you may not need it (I didn't on OS X on ADSL, but it worked around a problem on Windows on a corporate network through a firewall). The other difference is that the OAuth properties are read from and written to Java-style properties files.
```python
import urlparse
import local_oauth2 as oauth
import json
from Properties import Properties

consumer_properties = Properties('consumer.properties')

request_token_url = 'https://api.projectplace.com/initiate'
access_token_url = 'https://api.projectplace.com/token'
authorize_url = 'https://api.projectplace.com/authorize'

consumer = oauth.Consumer(consumer_properties.property('Key'),
                          consumer_properties.property('Secret'))
client = oauth.Client(consumer=consumer,
                      disable_ssl_certificate_validation=True)

# Step 1: Get a request token. This is a temporary token that is used for
# having the user authorize an access token and to sign the request to obtain
# said access token.
resp, content = client.request(request_token_url, "GET")
#print 'Response:',resp
#print 'Content:',content
if resp['status'] != '200':
    raise Exception("Invalid response %s." % resp['status'])

request_token = dict(urlparse.parse_qsl(content))

#print "Request Token:"
#print " - oauth_token = %s" % request_token['oauth_token']
#print " - oauth_token_secret = %s" % request_token['oauth_token_secret']
#print

# Step 2: Redirect to the provider. Since this is a CLI script we do not
# redirect. In a web application you would redirect the user to the URL
# below.
print "Go to the following link in your browser:"
print "%s?oauth_token=%s" % (authorize_url, request_token['oauth_token'])
print

# After the user has granted access to you, the consumer, the provider will
# redirect you to whatever URL you have told them to redirect to. You can
# usually define this in the oauth_callback argument as well.
accepted = 'n'
while accepted.lower() != 'y':
    accepted = raw_input('Have you authorized me? (y/n) ')
oauth_verifier_url = raw_input('What is the URL that you were redirected to? ')
oauth_verifier = oauth_verifier_url.split("&oauth_verifier=")[-1]
#print "oauth_verifier:%s"%oauth_verifier

# Step 3: Once the consumer has redirected the user back to the oauth_callback
# URL you can request the access token the user has approved. You use the
# request token to sign this request. After this is done you throw away the
# request token and use the access token returned. You should store this
# access token somewhere safe, like a database, for future use.
token = oauth.Token(request_token['oauth_token'],
                    request_token['oauth_token_secret'])
token.set_verifier(oauth_verifier)
client = oauth.Client(consumer=consumer, token=token,
                      disable_ssl_certificate_validation=True)

resp, content = client.request(access_token_url, "POST")
access_token = dict(urlparse.parse_qsl(content))
resp, content = client.request(request_token_url, "GET")
#print 'Response:',resp
#print 'Content:',content
#print 'Access Token:',access_token

#print "Access Token:"
#print " - oauth_token = %s" % access_token['oauth_token']
#print " - oauth_token_secret = %s" % access_token['oauth_token_secret']
#print
#print "You may now access protected resources using user_access.properties."
#print

user_access_properties = file('user_access.properties', 'w')
user_access_properties.write('Key : %s\n' % access_token['oauth_token'])
user_access_properties.write('Secret : %s' % access_token['oauth_token_secret'])
user_access_properties.close()
```
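The `Properties` class imported at the top of the script isn't shown in this post. A minimal stand-in might look like the sketch below, assuming only what the script itself relies on: a constructor taking a filename, a `property(name)` lookup, and `Name : value` lines like the ones written to user_access.properties. It is a guess at the shape rather than the real module, and it should run on Python 2 or 3.

```python
# Hypothetical stand-in for the Properties module used by the script.
# The real module is not shown, so this only mirrors the two calls the
# script makes: Properties(filename) and .property(name).
class Properties(object):

    def __init__(self, filename):
        self._values = {}
        with open(filename) as properties_file:
            for line in properties_file:
                line = line.strip()
                # Skip blank lines and Java-style comment lines.
                if not line or line.startswith('#') or line.startswith('!'):
                    continue
                # Split on the first ':' or '=', whichever comes first.
                positions = [line.find(sep) for sep in ':=' if sep in line]
                if not positions:
                    continue
                split_at = min(positions)
                name = line[:split_at].strip()
                value = line[split_at + 1:].strip()
                self._values[name] = value

    def property(self, name):
        # Raises KeyError if the property is missing from the file.
        return self._values[name]
```

With a file containing `Key : abc123`, `Properties(filename).property('Key')` returns `'abc123'`.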
local_oauth2 is pretty much the same as python-oauth2, but with the addition of the disable_ssl_certificate_validation parameter...
```python
def __init__(self, consumer, token=None, cache=None, timeout=None,
             proxy_info=None, disable_ssl_certificate_validation=False):

    if consumer is not None and not isinstance(consumer, Consumer):
        raise ValueError("Invalid consumer.")

    if token is not None and not isinstance(token, Token):
        raise ValueError("Invalid token.")

    self.consumer = consumer
    self.token = token
    self.method = SignatureMethod_HMAC_SHA1()

    httplib2.Http.__init__(self, cache=cache, timeout=timeout,
                           proxy_info=proxy_info,
                           disable_ssl_certificate_validation=disable_ssl_certificate_validation)
```
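For a sense of what switching validation off amounts to, here is a small sketch using only the standard library `ssl` module rather than httplib2. It contrasts a verifying context, a verifying context that also trusts an extra CA file (often the gentler fix behind a corporate firewall), and a non-verifying one. The `make_context` helper and the `ca_bundle` path are made up for illustration; nothing here is taken from local_oauth2.

```python
import ssl


def make_context(verify=True, ca_bundle=None):
    """Build an ssl.SSLContext.

    ca_bundle is a hypothetical path to a PEM file, for example the
    certificate of a corporate proxy that re-signs HTTPS traffic.
    """
    context = ssl.create_default_context()
    if ca_bundle:
        # Trust an extra CA in addition to the system defaults.
        context.load_verify_locations(cafile=ca_bundle)
    if not verify:
        # Hostname checking has to be switched off before CERT_NONE is
        # accepted; this is the "anything goes" mode.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return context


strict = make_context()
lax = make_context(verify=False)
print(strict.verify_mode == ssl.CERT_REQUIRED)
print(lax.verify_mode == ssl.CERT_NONE)
```

Trusting the firewall's own certificate keeps validation on for everything else, which is why it is usually preferable to disabling checks outright.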
The Project Place class, which is still a work in progress, and certainly not fully tested, looks like this:
```python
#!/usr/bin/env python
# encoding: utf-8
"""
TalkToProjectPlace.py

Created by Hywel Thomas on 2012-07-19.
Copyright (c) 2012 Jupiterlink Limited. All rights reserved.
"""

#import sys
#import os
import local_oauth2 as oauth
import json


class BadValue(Exception):
    def __init__(self, message):
        self.message = message
    def __str__(self):
        return self.message


class BadParameterCombination(Exception):
    def __init__(self, message):
        self.message = message
    def __str__(self):
        return self.message


class ContainerNotFound(Exception):
    def __init__(self, message):
        self.message = message
    def __str__(self):
        return self.message


class NotImplemented(Exception):
    def __init__(self, message):
        self.message = message
    def __str__(self):
        return self.message


class ProjectPlace(object):

    def __init__(self, consumer_key, consumer_secret,
                 access_token_key, access_token_secret,
                 format='json', proxy_info=None,
                 disable_ssl_certificate_validation=False):
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token_key = access_token_key
        self.access_token_secret = access_token_secret
        self.baseURL = "https://api.projectplace.com/1/"
        self.format = format
        consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
        access_token = oauth.Token(key=access_token_key, secret=access_token_secret)
        self.client = oauth.Client(consumer=consumer,
                                   token=access_token,
                                   proxy_info=proxy_info,
                                   disable_ssl_certificate_validation=disable_ssl_certificate_validation)

    def request(self, request, **parameters):
        full_request = u'%s%s.%s' % (self.baseURL, request, self.format)
        if len(parameters) > 0:
            # The query string starts with '?' (this was '&', which would
            # have made the parameters part of the path).
            full_request = "%s?%s" % (full_request,
                                      "&".join(['%s=%s' % (parameter, parameters[parameter])
                                                for parameter in parameters]))
        resp, content = self.client.request(uri=full_request, method='GET')
        if resp['status'] == '200':
            return content
        else:
            print '\n\n'
            print content
            return resp

    def request_binary(self, request):
        fullRequest = u'%s%s' % (self.baseURL, request)
        resp, content = self.client.request(fullRequest)
        if resp['status'] == '200':
            return content
        else:
            print '\n\n'
            return resp

    # USER methods

    def user_profile(self, user_id='me'):
        return self.request('user/%s/profile' % user_id)

    def user_coworkers(self, user_id='me'):
        return self.request('user/%s/coworkers' % user_id)

    def user_favorite_coworkers(self, user_id='me'):
        return self.request('user/%s/favorite-coworkers' % user_id)

    def user_favourite_coworkers(self, user_id='me'):
        return self.user_favorite_coworkers(user_id)

    def user_projects(self, user_id='me'):
        return self.request('user/%s/projects' % user_id)

    def user_favorite_projects(self, user_id='me'):
        return self.request('user/%s/favorite-projects' % user_id)

    def user_favourite_projects(self, user_id='me'):
        return self.user_favorite_projects(user_id)

    def user_recent_documents(self, user_id='me'):
        return self.request('user/%s/recent-documents' % user_id)

    def user_avatar(self, user_id):
        # TODO - think that this request should not go through
        # the authenticated service. Get a 401 error.
        return self.request_binary('avatar/%s/%s' % (user_id, self.access_token_key))

    def user_assignments(self, user_id='me'):
        return self.request('user/%s/assignments' % user_id)

    def latest_user_conversations(self, user_id='me', complete=None, count=None):
        request = 'user/%s/conversations' % user_id
        parameters = {}
        if complete:
            parameters['complete'] = complete
        if count:
            parameters['count'] = count
        return self.request(request, **parameters)

    def user_document_feed(self, user_id='me'):
        return self.request('user/%s/document-feed' % user_id)

    def project_members_list(self, project_id):
        # Currently getting a 403
        return self.request('project/%s/members' % project_id)

    def document_comments(self, document_id):
        #todo: figure out how to POST
        return self.request('document/%s/comments' % document_id)

    def document_properties(self, document_id):
        #todo: figure out how to POST
        return self.request('document/%s/properties' % document_id)

    def document_touch(self, document_id):
        #todo: figure out how to POST
        return self.request('document/%s/touch' % document_id)

    def document_upload(self, document_container_id, document, **kwargs):
        #todo: figure out how to POST
        # (A bare call to document_container_contents() used to sit here;
        # without self it raised NameError before reaching the line below.)
        raise NotImplemented('document_upload to Project Place has not yet been implemented')

    def document_download(self, document_id, destination_filename=None, version=None):
        request = 'document/%s' % document_id
        parameters = {}
        if version:
            parameters['version'] = version
        document = self.request(request, **parameters)
        if destination_filename:
            destination_file = open(destination_filename, 'wb')
            destination_file.write(document)
            destination_file.close()
        return document

    def document_versions(self, document_id):
        return self.request('document/%s/versions' % document_id)

    def document_version_properties(self, document_id, version_id):
        #TODO POST
        return self.request('document/%s/versions/%s/properties' % (document_id, version_id))

    def document_container_contents(self, document_container_id=None, project_id=None):
        if document_container_id:
            return self.request('document-container/%s/contents' % document_container_id)
        elif project_id:
            return self.request('project/%s/documents' % project_id)
        else:
            raise BadValue('document_container_contents requires a document-container-id or project-id parameter')

    def project_conversations(self, project_id, older_than=None, newer_than=None):
        request = 'project/%s/conversations' % project_id
        parameters = {}
        if older_than and not newer_than:
            parameters['older_than'] = older_than
        if newer_than and not older_than:
            parameters['newer_than'] = newer_than
        return self.request(request, **parameters)

    def project_logotype(self, project_id):
        # TODO - think that this request should not go through
        # the authenticated service. Get a 401 error.
        return self.request_binary('project-logotype/%s/%s' % (project_id, self.access_token_key))

    #---------
    # Higher order methods

    def project_id(self, project_name, user_id='me'):
        # Returns None if no project with that name is found.
        projects = json.loads(self.user_projects(user_id))
        for project in projects:
            if project['name'] == project_name:
                return project['id']

    def container_id_at_path(self, project_name, path, user_id='me'):
        # path is a list of container names, starting below the project root.
        #print 'in container_id_at_path'
        project_id = self.project_id(project_name=project_name, user_id=user_id)
        #print "Project-ID:",project_id
        containers = self.containers(json.loads(self.document_container_contents(project_id=project_id)))
        for target_container_name in path:
            next_container = None
            for container in containers:
                if container['name'] == target_container_name:
                    next_container = container['id']
            if next_container:
                containers = self.containers(json.loads(self.document_container_contents(document_container_id=next_container)))
            else:
                raise ContainerNotFound('A container called "%s" was not found' % target_container_name)
        return next_container

    def containers(self, document_container_contents):
        return document_container_contents["containers"]

    def documents(self, document_container_contents):
        return document_container_contents['documents']

    def container_structure(self, document_container_id=None, container_name=None,
                            project_name=None, user_id="me"):
        """
        Supply either a document_container_id if starting from a known
        container, or the project_name (and optionally a user_id), if
        starting from the project root container.

        The method returns a nested list matching the container structure.
        This does not include files.
        Each node is a dictionary of 'name', 'id' and 'containers'.
        """
        """the 'containers' returned in document_container_contents looks like:
        {u'container_count': 0,
         u'name': u'f2',
         u'versioned': True,
         u'color': u'yellow',
         u'id': 707070707,
         u'document_count': 0}
        we want to add the document_container_contents here in a new field
        called 'contents'. This is done recursively to give us the whole
        nested structure.

        It's probably wise not to call this against a large structure
        (i.e. use container_id_at_path first)
        """
        if document_container_id and project_name:
            raise BadParameterCombination('Only one of document_container_id and project_name should be supplied to the ProjectPlace.container_structure method, not both')
        elif not document_container_id and not project_name:
            raise BadParameterCombination('At least one of document_container_id and project_name should be supplied to the ProjectPlace.container_structure method.')
        elif project_name and not document_container_id:
            project_id = self.project_id(project_name=project_name, user_id=user_id)
            container_structure = json.loads(self.document_container_contents(project_id=project_id))
            container_structure['name'] = project_name
        elif container_name:  # document_container_id and not project_name:
            container_structure = json.loads(self.document_container_contents(document_container_id=document_container_id))
            container_structure['name'] = container_name
        else:
            raise BadParameterCombination('A container_name must be supplied to the ProjectPlace.container_structure method when document_container_id is the parameter')

        for container in container_structure['containers']:
            container[u'contents'] = self.container_structure(document_container_id=container[u'id'],
                                                              container_name=container[u'name'],
                                                              user_id=user_id)
        return container_structure
```
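The nested result of `container_structure` can be walked to list every folder path. The sketch below assumes the shape that method builds, where each entry in 'containers' carries a 'name' and a nested 'contents' dictionary. The `container_paths` helper and the sample data are made up for illustration and are not real Project Place output. It should run on Python 2 or 3.

```python
def container_paths(structure, parent_path=''):
    """Yield 'a/b/c' style paths for a structure shaped like the result of
    ProjectPlace.container_structure (shape assumed, not verified)."""
    if parent_path:
        path = '%s/%s' % (parent_path, structure['name'])
    else:
        path = structure['name']
    yield path
    for container in structure.get('containers', []):
        # 'contents' is the field added by container_structure's recursion.
        for sub_path in container_paths(container['contents'], path):
            yield sub_path


# Made-up sample in the assumed shape.
sample = {
    'name': 'My Project',
    'containers': [
        {'name': 'f1', 'id': 1,
         'contents': {'name': 'f1',
                      'containers': [
                          {'name': 'f2', 'id': 2,
                           'contents': {'name': 'f2', 'containers': []}},
                      ]}},
        {'name': 'f3', 'id': 3,
         'contents': {'name': 'f3', 'containers': []}},
    ],
}

for path in container_paths(sample):
    print(path)
```

Each path is yielded parent first, so the project root comes out before any of its folders.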
I've not had any luck with POST, mainly due to not having an in-depth understanding of HTTP and the oauth package. Hints would be most welcome!
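One possible direction, offered as an untested sketch: python-oauth2's `Client.request` accepts `method`, `body` and `headers` arguments, and as far as I can tell it includes a form-encoded body in the OAuth signature when the Content-Type is `application/x-www-form-urlencoded`. Assuming local_oauth2 keeps that behaviour, a POST helper could look something like this. The `post` helper, the `text` parameter name and the `FakeClient` used to exercise it are all made up; the real parameter names would need checking against the Project Place API documentation.

```python
try:
    from urllib import urlencode          # Python 2
except ImportError:
    from urllib.parse import urlencode    # Python 3

BASE_URL = 'https://api.projectplace.com/1/'


def post(client, request, fmt='json', **parameters):
    """POST form-encoded parameters through an oauth.Client-like object
    and return its (response, content) pair unchanged."""
    uri = '%s%s.%s' % (BASE_URL, request, fmt)
    # urlencode escapes the values, unlike a hand-built '%s=%s' join.
    body = urlencode(sorted(parameters.items()))
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    return client.request(uri, method='POST', body=body, headers=headers)


class FakeClient(object):
    """Stand-in for oauth.Client so the sketch runs without a network."""

    def request(self, uri, method='GET', body='', headers=None):
        self.last_call = (uri, method, body, headers)
        return {'status': '200'}, '{}'


fake = FakeClient()
resp, content = post(fake, 'document/12345/comments', text='Looks good to me')
print(fake.last_call[1])
print(fake.last_call[2])
```

In the class above, the same call would go through `self.client` rather than a fake, with the status check handled the way `request` already does for GET.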