Package couchdbkit :: Module utils
[hide private]
[frames] | no frames]

Source Code for Module couchdbkit.utils

  1  # -*- coding: utf-8 - 
  2  # 
  3  # This file is part of couchdbkit released under the MIT license.  
  4  # See the NOTICE for more information. 
  5   
  6   
  7  """ 
  8  Mostly utility functions couchdbkit uses internally that don't 
  9  really belong anywhere else in the modules. 
 10  """ 
 11  from __future__ import with_statement 
 12   
 13  import codecs 
 14  import string 
 15  from hashlib import md5 
 16  import os 
 17  import re 
 18  import sys 
 19  import urllib 
 20   
 21   
 22  try: 
 23      import simplejson as json 
 24  except ImportError: 
 25      try: 
 26          import json  
 27      except ImportError: 
 28          raise ImportError("""simplejson isn't installed 
 29   
 30  Install it with the command: 
 31   
 32      pip install simplejson 
 33  """) 
 34    
 35   
 36  # backport relpath from python2.6 
 37  if not hasattr(os.path, 'relpath'): 
 38      if os.name == "nt": 
39 - def splitunc(p):
40 if p[1:2] == ':': 41 return '', p # Drive letter present 42 firstTwo = p[0:2] 43 if firstTwo == '//' or firstTwo == '\\\\': 44 # is a UNC path: 45 # vvvvvvvvvvvvvvvvvvvv equivalent to drive letter 46 # \\machine\mountpoint\directories... 47 # directory ^^^^^^^^^^^^^^^ 48 normp = os.path.normcase(p) 49 index = normp.find('\\', 2) 50 if index == -1: 51 ##raise RuntimeError, 'illegal UNC path: "' + p + '"' 52 return ("", p) 53 index = normp.find('\\', index + 1) 54 if index == -1: 55 index = len(p) 56 return p[:index], p[index:] 57 return '', p
58
59 - def relpath(path, start=os.path.curdir):
60 """Return a relative version of a path""" 61 62 if not path: 63 raise ValueError("no path specified") 64 start_list = os.path.abspath(start).split(os.path.sep) 65 path_list = os.path.abspath(path).split(os.path.sep) 66 if start_list[0].lower() != path_list[0].lower(): 67 unc_path, rest = splitunc(path) 68 unc_start, rest = splitunc(start) 69 if bool(unc_path) ^ bool(unc_start): 70 raise ValueError("Cannot mix UNC and non-UNC paths (%s and %s)" 71 % (path, start)) 72 else: 73 raise ValueError("path is on drive %s, start on drive %s" 74 % (path_list[0], start_list[0])) 75 # Work out how much of the filepath is shared by start and path. 76 for i in range(min(len(start_list), len(path_list))): 77 if start_list[i].lower() != path_list[i].lower(): 78 break 79 else: 80 i += 1 81 82 rel_list = [os.path.pardir] * (len(start_list)-i) + path_list[i:] 83 if not rel_list: 84 return os.path.curdir 85 return os.path.join(*rel_list)
86 else:
87 - def relpath(path, start=os.path.curdir):
88 """Return a relative version of a path""" 89 90 if not path: 91 raise ValueError("no path specified") 92 93 start_list = os.path.abspath(start).split(os.path.sep) 94 path_list = os.path.abspath(path).split(os.path.sep) 95 96 # Work out how much of the filepath is shared by start and path. 97 i = len(os.path.commonprefix([start_list, path_list])) 98 99 rel_list = [os.path.pardir] * (len(start_list)-i) + path_list[i:] 100 if not rel_list: 101 return os.path.curdir 102 return os.path.join(*rel_list)
103 else: 104 relpath = os.path.relpath 105
def split_path(path):
    """ split a path into the list of its components

    Relative paths give their plain components ('a/b' -> ['a', 'b']).
    For absolute paths the root is kept as the first component
    ('/a/b' -> ['/', 'a', 'b']).

    :attr path: string, path to split

    :return: list of strings
    """
    parts = []
    while True:
        head, tail = os.path.split(path)
        if head == path:
            # os.path.split() made no progress: path is a root such as
            # "/" and would be returned unchanged forever. Keep it as
            # the leading component and stop.
            parts.insert(0, head)
            break
        parts.insert(0, tail)
        if not head:
            break
        path = head
    return parts
try:
    from urllib import unquote as _unquote  # Python 2
except ImportError:
    from urllib.parse import unquote as _unquote  # Python 3

# The hyphen is placed last in the character class so that it is a
# literal "-". Written as "+-/" it forms a range that also matches
# "," and ".".
VALID_DB_NAME = re.compile(r'^[a-z][a-z0-9_$()+/-]*$')
SPECIAL_DBS = ("_users", "_replicator",)


def validate_dbname(name):
    """ validate dbname

    Names listed in SPECIAL_DBS are always accepted. Any other name is
    URL-unquoted and must then match VALID_DB_NAME.

    :attr name: string, database name (may be URL-quoted)

    :return: True when the name is valid

    Raises ValueError when the name is invalid.
    """
    if name in SPECIAL_DBS:
        return True
    if not VALID_DB_NAME.match(_unquote(name)):
        raise ValueError("Invalid db name: '%s'" % name)
    return True
124
def to_bytestring(s):
    """ convert an unicode string to an utf-8 encoded bytestring

    :attr s: any object

    :return: the utf-8 encoding of s when s is an unicode string,
    otherwise s itself, unchanged
    """
    # type(u"") is the unicode text type of the running interpreter,
    # which avoids relying on builtin names that only exist in Python 2.
    text_type = type(u"")
    if isinstance(s, text_type):
        return s.encode('utf-8')
    return s
133
def read_file(fname, utf8=True, force_read=False):
    """ return the content of a file

    :attr fname: string, path of the file
    :attr utf8: boolean, default is True. If True the content is
    decoded as utf-8, if False the raw content is returned
    :attr force_read: boolean, default is False. If True and the
    content can't be decoded as utf-8, the raw content is returned
    instead of raising

    :return: the file content
    """
    if not utf8:
        with open(fname, 'rb') as raw_file:
            return raw_file.read()

    try:
        with codecs.open(fname, 'rb', "utf-8") as text_file:
            return text_file.read()
    except UnicodeError:
        if not force_read:
            raise
        # Decoding failed and the caller asked for the content anyway:
        # read the file again without decoding.
        return read_file(fname, utf8=False)
149
def sign_file(file_path):
    """ compute the md5 signature of a file content

    :attr file_path: string, path of file

    :return: string, md5 hexdigest, or '' when file_path is not an
    existing regular file
    """
    if not os.path.isfile(file_path):
        return ''
    raw = read_file(file_path, force_read=True)
    digest = md5(to_bytestring(raw))
    return digest.hexdigest()
161
def write_content(fname, content):
    """ write content in a file

    The file is opened in binary mode and overwritten; content goes
    through to_bytestring() before being written.

    :attr fname: string,filename
    :attr content: string
    """
    # The with block closes the file even when the write fails.
    with open(fname, 'wb') as f:
        f.write(to_bytestring(content))
171
def write_json(filename, content):
    """ dump content as json and store the result in a file

    :attr filename: string
    :attr content: object to serialize with json.dumps

    """
    serialized = json.dumps(content)
    write_content(filename, serialized)
180
def read_json(filename, use_environment=False):
    """ read a json file and deserialize

    :attr filename: string
    :attr use_environment: boolean, default is False. If
    True, replace environment variable by their value in file
    content

    :return: dict or list. An empty dict is returned when the file
    doesn't exist.

    Any other IOError is propagated. ValueError is propagated when the
    content isn't valid json, after a message is written on stderr.
    """
    import errno

    try:
        data = read_file(filename, force_read=True)
    except IOError:
        # sys.exc_info() is used instead of binding the exception in
        # the except clause so the same code parses on Python 2 and 3.
        err = sys.exc_info()[1]
        if getattr(err, 'errno', None) == errno.ENOENT:
            # A missing file is treated as an empty document.
            return {}
        raise

    if use_environment:
        # $NAME / ${NAME} placeholders are replaced from os.environ.
        data = string.Template(data).substitute(os.environ)

    try:
        data = json.loads(data)
    except ValueError:
        sys.stderr.write("Json is invalid, can't load %s\n" % filename)
        raise
    return data
207