#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
| This file is part of the web2py Web Framework
| Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu>
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
File operations
---------------
"""
import storage
import os
import re
import tarfile
import glob
import time
import datetime
import logging
from http import HTTP
from gzip import open as gzopen
from recfile import generate
# Public API: the names exported by a star-import of this module.
# Helpers defined in this file but not listed here (e.g. parse_semantic,
# parse_legacy, set_session, copystream, create_welcome_w2p) are only
# reachable through an explicit import.
__all__ = [
'parse_version',
'read_file',
'write_file',
'readlines_file',
'up',
'abspath',
'mktree',
'listdir',
'recursive_unlink',
'cleanpath',
'tar',
'untar',
'tar_compiled',
'get_session',
'check_credentials',
'w2p_pack',
'w2p_unpack',
'w2p_pack_plugin',
'w2p_unpack_plugin',
'fix_newlines',
'make_fake_file_like_object',
]
def parse_semantic(version="Version 1.99.0-rc.1+timestamp.2011.09.19.08.23.26"):
    """Parses a version string according to http://semver.org/ rules

    Only the last whitespace-separated token of `version` is examined. The
    build part (``+...``) is required for the token to match; the
    pre-release part (``-...``) is optional.

    Args:
        version(str): the SemVer string

    Returns:
        tuple: Major, Minor, Patch, Release, Build Date.
        Release is '' when absent. Build Date is a `datetime.datetime` when
        the build part starts with ``timestamp``, otherwise the raw build
        string. Returns None when `version` is empty or does not match.
    """
    tokens = version.strip().split()
    if not tokens:
        # empty / whitespace-only input: nothing to parse
        return None
    # raw string: the pattern contains backslash escapes meant for `re`
    re_version = re.compile(
        r'(\d+)\.(\d+)\.(\d+)(\-(?P<pre>[^\s+]*))?(\+(?P<build>\S*))')
    m = re_version.match(tokens[-1])
    if not m:
        return None
    a, b, c = int(m.group(1)), int(m.group(2)), int(m.group(3))
    pre_release = m.group('pre') or ''
    build = m.group('build') or ''
    if build.startswith('timestamp'):
        # "timestamp.YYYY.MM.DD.HH.MM.SS" -> datetime
        build = datetime.datetime.strptime(
            build.split('.', 1)[1], '%Y.%m.%d.%H.%M.%S')
    return (a, b, c, pre_release, build)
def parse_legacy(version="Version 1.99.0 (2011-09-19 08:23:26)"):
    """Parses "legacy" version string

    The expected shape is ``<label> X.Y.Z (YYYY-MM-DD HH:MM:SS) [type]``.

    Args:
        version(str): the version string

    Returns:
        tuple: Major, Minor, Patch, Release, Build Date.
        Release defaults to 'dev' when no trailing type word is present.
        Returns None when `version` does not match the legacy format (same
        convention as `parse_semantic`).
    """
    re_version = re.compile(
        r'[^\d]+ (\d+)\.(\d+)\.(\d+)\s*\((?P<datetime>.+?)\)\s*(?P<type>[a-z]+)?')
    m = re_version.match(version)
    if not m:
        # avoid an AttributeError on None for unparsable input
        return None
    a, b, c = int(m.group(1)), int(m.group(2)), int(m.group(3))
    pre_release = m.group('type') or 'dev'
    build = datetime.datetime.strptime(m.group('datetime'), '%Y-%m-%d %H:%M:%S')
    return (a, b, c, pre_release, build)
def parse_version(version):
    """Parses `version` as SemVer first and falls back on the legacy format
    when the SemVer parser yields nothing.
    """
    return parse_semantic(version) or parse_legacy(version)
def read_file(filename, mode='r'):
    """Opens `filename` with `mode`, reads it entirely and returns the
    content. The file is always closed before returning, even on error.
    """
    handle = open(filename, mode)
    try:
        content = handle.read()
    finally:
        handle.close()
    return content
def write_file(filename, value, mode='w'):
    """Opens `filename` with `mode` and writes `value` to it, returning
    whatever the underlying `write()` call returns. The file is always
    closed before returning, even on error.
    """
    handle = open(filename, mode)
    try:
        result = handle.write(value)
    finally:
        handle.close()
    return result
def readlines_file(filename, mode='r'):
    """Returns the content of `filename` (as read by `read_file()`) split on
    newline characters.
    """
    content = read_file(filename, mode)
    return content.split('\n')
def mktree(path):
    """Creates every missing directory leading up to `path`.

    Only the directory part of `path` is created; the last component itself
    is left alone.
    """
    parent, leaf = os.path.split(path)
    if not parent:
        # bare name: no directory part to create
        return
    if leaf:
        # make sure the ancestors exist before creating `parent`
        mktree(parent)
    if not os.path.exists(parent):
        os.mkdir(parent)
def listdir(path,
            expression='^.+$',
            drop=True,
            add_dirs=False,
            sort=True,
            maxnum=None,
            exclude_content_from=None
            ):
    """
    Like `os.listdir()` but recursive, and you can specify a regex pattern
    to filter files. Hidden files and folders (names starting with a dot)
    are always skipped.

    Args:
        path(str): folder to walk
        expression(str): regex matched (with `re.match`) against file names
        drop(bool): if `True` the leading `path` is stripped from the
            returned items, otherwise items keep the `path` prefix
        add_dirs(bool): if `True` the visited folders are returned too
        sort(bool): if `True` the result is sorted
        maxnum(int): stop collecting once this many items were found
        exclude_content_from(list): folders whose files must be skipped;
            each entry is compared both with the folder path as walked and
            with the folder path relative to `path`

    Returns:
        list: the collected paths
    """
    if exclude_content_from is None:
        exclude_content_from = []
    if path[-1:] != os.path.sep:
        path = path + os.path.sep
    prefix = len(path)
    n = prefix if drop else 0
    regex = re.compile(expression)
    items = []
    for (root, dirs, files) in os.walk(path, topdown=True):
        # prune hidden folders in place so os.walk does not descend into them
        dirs[:] = [name for name in dirs if not name.startswith('.')]
        if add_dirs:
            items.append(root[n:])
        # accept relative names (e.g. 'cache') as well as full walk paths;
        # comparing only the full path never matched relative entries
        excluded = (root in exclude_content_from or
                    root[prefix:] in exclude_content_from)
        if not excluded:
            for name in sorted(files):
                if maxnum and len(items) >= maxnum:
                    break
                if regex.match(name) and not name.startswith('.'):
                    items.append(os.path.join(root, name)[n:])
        if maxnum and len(items) >= maxnum:
            # stop the whole walk, not just the current folder
            break
    if sort:
        return sorted(items)
    return items
def recursive_unlink(f):
    """Deletes `f`. If it's a folder, also its contents will be deleted.

    Symbolic links are removed as links and never followed, so the content
    of a linked folder is left untouched and dangling links are removed too.
    Paths that do not exist are silently ignored.
    """
    if os.path.islink(f):
        # must come first: isdir()/isfile() follow links
        os.unlink(f)
    elif os.path.isdir(f):
        for s in os.listdir(f):
            recursive_unlink(os.path.join(f, s))
        os.rmdir(f)
    elif os.path.isfile(f):
        os.unlink(f)
def cleanpath(path):
    """Turns any expression/path into a valid filename.

    Every dot except the last one becomes an underscore, then each run of
    characters that are neither word characters nor dots is collapsed into a
    single underscore.
    """
    stem, dot, extension = path.rpartition('.')
    if dot:
        candidate = stem.replace('.', '_') + '.' + extension
    else:
        # no dot at all: nothing to protect as an extension
        candidate = path
    return re.sub(r'[^\w\.]+', '_', candidate)
def _extractall(filename, path='.', members=None):
    """Extracts the (uncompressed) tar archive `filename` into `path`.

    Args:
        filename(str): path to the tar archive
        path(str): destination folder
        members: optional subset of members, passed on to
            `TarFile.extractall`

    Returns:
        whatever `TarFile.extractall` returns
    """
    # NOTE(review): extractall() trusts member names; if archives can come
    # from untrusted sources they may write outside `path` - confirm callers.
    archive = tarfile.TarFile(filename, 'r')
    try:
        return archive.extractall(path, members)
    finally:
        # close the archive even when extraction fails
        archive.close()
def tar(file, dir, expression='^.+$',
        filenames=None, exclude_content_from=None):
    """Creates the tar archive `file` out of the content of `dir`.

    When `filenames` is given, exactly those entries (relative to `dir`) are
    archived; otherwise the entries come from `listdir()` filtered with
    `expression` and `exclude_content_from`. Entries are added
    non-recursively.
    """
    archive = tarfile.TarFile(file, 'w')
    try:
        members = filenames
        if members is None:
            members = listdir(dir, expression, add_dirs=True,
                              exclude_content_from=exclude_content_from)
        for member in members:
            archive.add(os.path.join(dir, member), member, False)
    finally:
        archive.close()
def untar(file, dir):
    """Extracts the tar archive `file` into the folder `dir`
    """
    _extractall(file, dir)
def w2p_pack(filename, path, compiled=False, filenames=None):
    """Packs a web2py application.

    A temporary ``<filename>.tar`` is built first, then gzipped into
    `filename`; the temporary tar is removed afterwards, also on failure.

    Args:
        filename(str): path to the resulting archive
        path(str): path to the application
        compiled(bool): if `True` packs the compiled version
        filenames(list): adds filenames to the archive
    """
    filename = abspath(filename)
    path = abspath(path)
    tarname = filename + '.tar'
    chunk_size = 10 ** 6  # stream the tar instead of loading it in memory
    try:
        if compiled:
            tar_compiled(tarname, path, r'^[\w\.\-]+$',
                         exclude_content_from=['cache', 'sessions', 'errors'])
        else:
            tar(tarname, path, r'^[\w\.\-]+$', filenames=filenames,
                exclude_content_from=['cache', 'sessions', 'errors'])
        w2pfp = gzopen(filename, 'wb')
        try:
            tarfp = open(tarname, 'rb')
            try:
                while True:
                    chunk = tarfp.read(chunk_size)
                    if not chunk:
                        break
                    w2pfp.write(chunk)
            finally:
                tarfp.close()
        finally:
            w2pfp.close()
    finally:
        # never leave the intermediate tar behind
        if os.path.exists(tarname):
            os.unlink(tarname)
def create_welcome_w2p():
    """(Re)creates ``welcome.w2p`` from ``applications/welcome``.

    Runs when ``welcome.w2p`` is missing or a ``NEWINSTALL`` marker file
    exists; the marker is removed after a successful pack. Failures are
    logged (with traceback) and not propagated.
    """
    if not os.path.exists('welcome.w2p') or os.path.exists('NEWINSTALL'):
        try:
            w2p_pack('welcome.w2p', 'applications/welcome')
            # the marker may be absent when only welcome.w2p was missing;
            # unlinking it blindly would report a bogus packing failure
            if os.path.exists('NEWINSTALL'):
                os.unlink('NEWINSTALL')
            logging.info("New installation: created welcome.w2p file")
        except Exception:
            logging.exception("New installation error: unable to create welcome.w2p file")
def w2p_unpack(filename, path, delete_tar=True):
    """Unpacks a web2py package into `path`.

    ``.w2p`` and ``.gz`` files are first gunzipped into a sibling ``.tar``
    file; any other file is treated as a tar archive itself.

    Args:
        filename(str): path to the package; the literal 'welcome.w2p'
            triggers `create_welcome_w2p()` first
        path(str): destination folder
        delete_tar(bool): if `True` the tar file that was extracted is
            deleted afterwards (this is `filename` itself when it is not a
            ``.w2p``/``.gz`` file)
    """
    if filename == 'welcome.w2p':
        create_welcome_w2p()
    filename = abspath(filename)
    path = abspath(path)
    chunk_size = 10 ** 6  # stream the archive instead of loading it in memory
    if filename.endswith('.w2p') or filename.endswith('.gz'):
        if filename.endswith('.w2p'):
            tarname = filename[:-4] + '.tar'
        else:
            tarname = filename[:-3] + '.tar'
        fgzipped = gzopen(filename, 'rb')
        try:
            tarfp = open(tarname, 'wb')
            try:
                while True:
                    chunk = fgzipped.read(chunk_size)
                    if not chunk:
                        break
                    tarfp.write(chunk)
            finally:
                tarfp.close()
        finally:
            fgzipped.close()
    else:
        tarname = filename
    untar(tarname, path)
    if delete_tar:
        os.unlink(tarname)
def w2p_pack_plugin(filename, path, plugin_name):
    """Packs the given plugin into a gzipped tar (w2p) file.

    Will match files at::

        <path>/*/plugin_[name].*
        <path>/*/plugin_[name]/*

    Raises:
        Exception: if `filename` does not end with
            ``web2py.plugin.<plugin_name>.w2p``
    """
    filename = abspath(filename)
    path = abspath(path)
    expected_suffix = 'web2py.plugin.%s.w2p' % plugin_name
    if not filename.endswith(expected_suffix):
        raise Exception("Not a web2py plugin name")
    plugin_tarball = tarfile.open(filename, 'w:gz')
    try:
        # strip trailing slashes so archive names can be cut off cleanly
        app_dir = path
        while app_dir[-1] == '/':
            app_dir = app_dir[:-1]
        skip = len(app_dir) + 1
        patterns = ('*/plugin_%s.*' % plugin_name,
                    '*/plugin_%s/*' % plugin_name)
        matches = []
        for pattern in patterns:
            matches.extend(glob.glob(os.path.join(app_dir, pattern)))
        for match in matches:
            plugin_tarball.add(match, arcname=match[skip:])
    finally:
        plugin_tarball.close()
def w2p_unpack_plugin(filename, path, delete_tar=True):
    """Unpacks a plugin package into `path` through `w2p_unpack()`.

    Raises:
        Exception: if the base name of `filename` does not start with
            ``web2py.plugin.``
    """
    filename = abspath(filename)
    path = abspath(path)
    basename = os.path.basename(filename)
    if not basename.startswith('web2py.plugin.'):
        raise Exception("Not a web2py plugin")
    w2p_unpack(filename, path, delete_tar)
def tar_compiled(file, dir, expression='^.+$',
                 exclude_content_from=None):
    """Used to tar a compiled application.

    Symbolic links are skipped. Regular files whose relative path starts
    with ``models``, ``views``, ``controllers`` or ``modules`` are not stored
    in the tar file, unless they are ``.pyc`` files.

    Args:
        file(str): path of the tar archive to create
        dir(str): application folder to pack
        expression(str): regex passed to `listdir()` to select files
        exclude_content_from(list): passed to `listdir()`
    """
    source_folders = ('models', 'views', 'controllers', 'modules')
    archive = tarfile.TarFile(file, 'w')
    try:
        for name in listdir(dir, expression, add_dirs=True,
                            exclude_content_from=exclude_content_from):
            filename = os.path.join(dir, name)
            if os.path.islink(filename):
                continue
            if (os.path.isfile(filename) and not name.endswith('.pyc')
                    and name.startswith(source_folders)):
                # source file of a compiled app: leave it out
                continue
            archive.add(filename, name, False)
    finally:
        # close the archive even if listing or adding fails
        archive.close()
def up(path):
    """Returns the parent folder of `path` after normalizing it
    """
    normalized = os.path.normpath(path)
    return os.path.dirname(normalized)
def get_session(request, other_application='admin'):
    """Loads the session that the current client holds in `other_application`.

    The session id is read from the ``session_id_<other_application>``
    cookie and looked up in the ``sessions`` folder of that application.

    Args:
        request: the current request (its `application`, `cookies` and
            `folder` attributes are used)
        other_application(str): name of the application owning the session

    Returns:
        the stored session, or an empty `storage.Storage()` when it cannot
        be loaded for any reason (missing cookie, missing file, ...)

    Raises:
        KeyError: if `other_application` is the requesting application
    """
    if request.application == other_application:
        raise KeyError
    try:
        # NOTE(review): session_id comes straight from a client cookie and is
        # joined into a filesystem path that load_storage() then reads -
        # confirm it is validated upstream.
        session_id = request.cookies['session_id_' + other_application].value
        session_filename = os.path.join(
            up(request.folder), other_application, 'sessions', session_id)
        if not os.path.exists(session_filename):
            # presumably maps to an alternate on-disk layout - see recfile
            session_filename = generate(session_filename)
        osession = storage.load_storage(session_filename)
    except Exception:
        # best effort: any failure means "no session"
        osession = storage.Storage()
    return osession
def set_session(request, session, other_application='admin'):
    """Saves `session` as the session of the current client in
    `other_application`.

    The target file is located in the ``sessions`` folder of
    `other_application`, named after the ``session_id_<other_application>``
    cookie value.

    Raises:
        KeyError: if `other_application` is the requesting application
    """
    if request.application == other_application:
        raise KeyError
    cookie_name = 'session_id_' + other_application
    session_id = request.cookies[cookie_name].value
    sessions_folder = os.path.join(
        up(request.folder), other_application, 'sessions')
    storage.save_storage(session, os.path.join(sessions_folder, session_id))
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application

    Outside GAE the session of `other_application` must be flagged as
    authorized and its `last_time` must be more recent than `expiration`
    seconds ago; on success `last_time` is refreshed and the session saved.
    On GAE the current user must be an admin; otherwise, when `gae_login` is
    set, an HTTP 200 page with a sign-in link is raised.

    Returns:
        a true value when access is granted, a false value otherwise
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        s = get_session(request, other_application)
        granted = (s.authorized and s.last_time and
                   s.last_time > oldest_allowed)
        if granted:
            s.last_time = now
            set_session(request, s, other_application)
        return granted
    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_html = '<a href="%s">Sign in with your google account</a>.' \
        % users.create_login_url(request.env.path_info)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
def fix_newlines(path):
    """Rewrites every ``.py`` and ``.html`` file found under `path` so that
    all line endings are a single newline character. Files that already
    comply are not rewritten.
    """
    # matches CR LF, a lone CR or a lone LF (the pattern embeds literal
    # newline characters)
    newline = re.compile('(\\r\n|\\r|\n)')
    for filename in listdir(path, r'.*\.(py|html)$', drop=False):
        original = read_file(filename, 'rb')
        normalized = newline.sub('\n', original)
        if normalized != original:
            write_file(filename, normalized, 'wb')
def copystream(
    src,
    dest,
    size,
    chunk_size=10 ** 5,
):
    """
    Copies up to `size` bytes from `src` to `dest`, reading at most
    `chunk_size` bytes at a time, then rewinds `dest` to its beginning.
    Copying stops early on an empty or short read.

    this is here because I think there is a bug in shutil.copyfileobj
    """
    remaining = size
    while remaining > 0:
        data = src.read(min(remaining, chunk_size))
        if len(data) > remaining:
            # the source returned more than asked for: truncate
            data = data[:remaining]
        length = len(data)
        remaining -= length
        if length == 0:
            break
        dest.write(data)
        if length < chunk_size:
            break
    dest.seek(0)
    return
def make_fake_file_like_object():
    """Returns an object exposing `write()` and `close()` methods that do
    nothing.
    """
    class LogFile(object):
        """File-like sink: discards everything written to it."""

        def write(self, value):
            return None

        def close(self):
            return None

    sink = LogFile()
    return sink
from settings import global_settings # we need to import settings here because
# settings imports fileutils too
def abspath(*relpath, **base):
    """Converts relative path to absolute path based (by default) on
    applications_parent

    The positional arguments are joined into one path. An absolute result is
    returned as is; otherwise it is anchored at
    `global_settings.gluon_parent` when the keyword `gluon` is true, else at
    `global_settings.applications_parent`.
    """
    joined = os.path.join(*relpath)
    if os.path.isabs(joined):
        return joined
    if base.get('gluon', False):
        anchor = global_settings.gluon_parent
    else:
        anchor = global_settings.applications_parent
    return os.path.join(anchor, joined)