diff --git a/PKG-INFO b/PKG-INFO index 3514535..5e15564 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,12 +1,12 @@ Metadata-Version: 1.0 Name: python-gnupg -Version: 0.3.4 +Version: 0.3.5 Summary: A wrapper for the Gnu Privacy Guard (GPG or GnuPG) Home-page: http://packages.python.org/python-gnupg/index.html Author: Vinay Sajip Author-email: vinay_sajip@red-dove.com License: Copyright (C) 2008-2013 by Vinay Sajip. All Rights Reserved. See LICENSE for license. -Download-URL: http://python-gnupg.googlecode.com/files/python-gnupg-0.3.4.tar.gz +Download-URL: http://python-gnupg.googlecode.com/files/python-gnupg-0.3.5.tar.gz Description: This module allows easy access to GnuPG's key management, encryption and signature functionality from Python programs. It is intended for use with Python 2.4 or greater. Platform: No particular restrictions Classifier: Development Status :: 5 - Production/Stable diff --git a/gnupg.py b/gnupg.py index e5060af..b19f3cf 100644 --- a/gnupg.py +++ b/gnupg.py @@ -33,9 +33,9 @@ A unittest harness (test_gnupg.py) has also been added. """ import locale -__version__ = "0.3.4" +__version__ = "0.3.5" __author__ = "Vinay Sajip" -__date__ = "$05-Jun-2013 09:48:54$" +__date__ = "$30-Aug-2013 18:10:36$" try: from io import StringIO @@ -73,6 +73,47 @@ logger = logging.getLogger(__name__) if not logger.handlers: logger.addHandler(NullHandler()) +# We use the test below because it works for Jython as well as CPython +if os.path.__name__ == 'ntpath': + # On Windows, we don't need shell quoting, other than worrying about + # paths with spaces in them. + def shell_quote(s): + return '"%s"' % s +else: + # Section copied from sarge + + # This regex determines which shell input needs quoting + # because it may be unsafe + UNSAFE = re.compile(r'[^\w%+,./:=@-]') + + def shell_quote(s): + """ + Quote text so that it is safe for Posix command shells. + + For example, "*.py" would be converted to "'*.py'". If the text is + considered safe it is returned unquoted. + + :param s: The value to quote + :type s: str (or unicode on 2.x) + :return: A safe version of the input, from the point of view of Posix + command shells + :rtype: The passed-in type + """ + if not isinstance(s, string_types): + raise TypeError('Expected string type, got %s' % type(s)) + if not s: + result = "''" + elif len(s) >= 2 and (s[0], s[-1]) == ("'", "'"): + result = '"%s"' % s.replace('"', r'\"') + elif not UNSAFE.search(s): + result = s + else: + result = "'%s'" % s.replace("'", "'\"'\"'") + return result + + # end of sarge code + + def _copy_data(instream, outstream): # Copy one stream to another sent = 0 @@ -326,6 +367,13 @@ BASIC_ESCAPES = { r'\0': '\0', } +class SendResult(object): + def __init__(self, gpg): + self.gpg = gpg + + def handle_status(self, key, value): + logger.debug('SendResult: %s: %s', key, value) + class ListKeys(list): ''' Handle status messages for --list-keys. @@ -384,6 +432,40 @@ class ListKeys(list): def handle_status(self, key, value): pass +class SearchKeys(list): + ''' Handle status messages for --search-keys. + + Handle pub and uid (relating the latter to the former). + + Don't care about the rest + ''' + def __init__(self, gpg): + self.gpg = gpg + self.curkey = None + self.fingerprints = [] + self.uids = [] + + def pub(self, args): + vars = (""" + type keyid algo length date expires + """).split() + self.curkey = {} + for i in range(len(vars)): + self.curkey[vars[i]] = args[i] + self.curkey['uids'] = [] + self.append(self.curkey) + + def uid(self, args): + uid = args[1] + uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) + for k, v in BASIC_ESCAPES.items(): + uid = uid.replace(k, v) + self.curkey['uids'].append(uid) + self.uids.append(uid) + + def handle_status(self, key, value): + pass + class Crypt(Verify): "Handle status messages for --encrypt and --decrypt" def __init__(self, gpg): @@ -528,7 +610,9 @@ class GPG(object): 'delete': DeleteResult, 'generate': GenKey, 'import': ImportResult, + 'send': SendResult, 'list': ListKeys, + 'search': SearchKeys, 'sign': Sign, 'verify': Verify, } @@ -571,6 +655,10 @@ class GPG(object): self.encoding = locale.getpreferredencoding() if self.encoding is None: # This happens on Jython! self.encoding = sys.stdin.encoding + if self.encoding is None: + logger.warning('No encoding found via locale.getpreferredencoding ' + 'or sys.stdin.encoding, defaulting to utf-8.') + self.encoding = 'utf-8' if gnupghome and not os.path.isdir(self.gnupghome): os.makedirs(self.gnupghome,0x1C0) p = self._open_subprocess(["--version"]) @@ -594,14 +682,14 @@ class GPG(object): """ cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] if self.gnupghome: - cmd.append('--homedir "%s"' % self.gnupghome) + cmd.append('--homedir %s' % shell_quote(self.gnupghome)) if self.keyring: cmd.append('--no-default-keyring') for fn in self.keyring: - cmd.append('--keyring "%s"' % fn) + cmd.append('--keyring %s' % shell_quote(fn)) if self.secret_keyring: for fn in self.secret_keyring: - cmd.append('--secret-keyring "%s"' % fn) + cmd.append('--secret-keyring %s' % shell_quote(fn)) if passphrase: cmd.append('--batch --passphrase-fd 0') if self.use_agent: @@ -735,7 +823,7 @@ class GPG(object): elif clearsign: args.append("--clearsign") if keyid: - args.append('--default-key "%s"' % keyid) + args.append('--default-key %s' % shell_quote(keyid)) result = self.result_map['sign'](self) #We could use _handle_io here except for the fact that if the #passphrase is bad, gpg bails and you can't write the message. @@ -787,8 +875,8 @@ class GPG(object): logger.debug('Wrote to temp file: %r', s) os.write(fd, s) os.close(fd) - args.append(fn) - args.append('"%s"' % data_filename) + args.append(shell_quote(fn)) + args.append(shell_quote(data_filename)) try: p = self._open_subprocess(args) self._collect_output(p, result, stdin=p.stdin) @@ -856,9 +944,6 @@ class GPG(object): def recv_keys(self, keyserver, *keyids): """Import a key from a keyserver - The doctest assertion is skipped in Jython because security permissions - may prevent the recv_keys from succeeding. - >>> import shutil >>> shutil.rmtree("keys") >>> gpg = GPG(gnupghome="keys") @@ -871,20 +956,39 @@ class GPG(object): logger.debug('recv_keys: %r', keyids) data = _make_binary_stream("", self.encoding) #data = "" - args = ['--keyserver', keyserver, '--recv-keys'] - args.extend(keyids) + args = ['--keyserver', shell_quote(keyserver), '--recv-keys'] + args.extend([shell_quote(k) for k in keyids]) self._handle_io(args, data, result, binary=True) logger.debug('recv_keys result: %r', result.__dict__) data.close() return result + def send_keys(self, keyserver, *keyids): + """Send a key to a keyserver. + + Note: it's not practical to test this function without sending + arbitrary data to live keyservers. + """ + result = self.result_map['send'](self) + logger.debug('send_keys: %r', keyids) + data = _make_binary_stream('', self.encoding) + #data = "" + args = ['--keyserver', shell_quote(keyserver), '--send-keys'] + args.extend([shell_quote(k) for k in keyids]) + self._handle_io(args, data, result, binary=True) + logger.debug('send_keys result: %r', result.__dict__) + data.close() + return result + def delete_keys(self, fingerprints, secret=False): which='key' if secret: which='secret-key' if _is_sequence(fingerprints): - fingerprints = ' '.join(fingerprints) - args = ['--batch --delete-%s "%s"' % (which, fingerprints)] + fingerprints = ' '.join([shell_quote(s) for s in fingerprints]) + else: + fingerprints = shell_quote(fingerprints) + args = ['--batch --delete-%s %s' % (which, fingerprints)] result = self.result_map['delete'](self) p = self._open_subprocess(args) self._collect_output(p, result, stdin=p.stdin) @@ -896,8 +1000,10 @@ class GPG(object): if secret: which='-secret-key' if _is_sequence(keyids): - keyids = ' '.join(['"%s"' % k for k in keyids]) - args = ["--armor --export%s %s" % (which, keyids)] + keyids = ' '.join([shell_quote(k) for k in keyids]) + else: + keyids = shell_quote(keyids) + args = ['--armor --export%s %s' % (which, keyids)] p = self._open_subprocess(args) # gpg --export produces no status-fd output; stdout will be # empty in case of failure @@ -954,6 +1060,46 @@ class GPG(object): getattr(result, keyword)(L) return result + def search_keys(self, query, keyserver='pgp.mit.edu'): + """ search keyserver by query (using --search-keys option) + + >>> import shutil + >>> shutil.rmtree('keys') + >>> gpg = GPG(gnupghome='keys') + >>> os.chmod('keys', 0x1C0) + >>> result = gpg.search_keys('') + >>> assert result + >>> keyserver = 'keyserver.ubuntu.com' + >>> result = gpg.search_keys('', keyserver) + >>> assert result + + """ + + args = ['--fixed-list-mode', '--fingerprint', '--with-colons', + '--keyserver', shell_quote(keyserver), '--search-keys', + shell_quote(query)] + p = self._open_subprocess(args) + + # Get the response information + result = self.result_map['search'](self) + self._collect_output(p, result, stdin=p.stdin) + lines = result.data.decode(self.encoding, + self.decode_errors).splitlines() + valid_keywords = ['pub', 'uid'] + for line in lines: + if self.verbose: + print(line) + logger.debug('line: %r', line.rstrip()) + if not line: # sometimes get blank lines on Windows + continue + L = line.strip().split(':') + if not L: + continue + keyword = L[0] + if keyword in valid_keywords: + getattr(result, keyword)(L) + return result + def gen_key(self, input): """Generate a key; you might use gen_key_input() to create the control input. @@ -983,9 +1129,8 @@ class GPG(object): if str(val).strip(): # skip empty strings parms[key] = val parms.setdefault('Key-Type','RSA') - parms.setdefault('Key-Length',1024) + parms.setdefault('Key-Length',2048) parms.setdefault('Name-Real', "Autogenerated Key") - parms.setdefault('Name-Comment', "Generated by gnupg.py") try: logname = os.environ['LOGNAME'] except KeyError: @@ -1030,21 +1175,26 @@ class GPG(object): "Encrypt the message read from the file-like object 'file'" args = ['--encrypt'] if symmetric: + # can't be False or None - could be True or a cipher algo value + # such as AES256 args = ['--symmetric'] + if symmetric is not True: + args.extend(['--cipher-algo', shell_quote(symmetric)]) + # else use the default, currently CAST5 else: args = ['--encrypt'] if not _is_sequence(recipients): recipients = (recipients,) for recipient in recipients: - args.append('--recipient "%s"' % recipient) - if armor: # create ascii-armored output - set to False for binary output + args.append('--recipient %s' % shell_quote(recipient)) + if armor: # create ascii-armored output - False for binary output args.append('--armor') if output: # write the output to a file with the specified name if os.path.exists(output): os.remove(output) # to avoid overwrite confirmation message - args.append('--output "%s"' % output) + args.append('--output %s' % shell_quote(output)) if sign: - args.append('--sign --default-key "%s"' % sign) + args.append('--sign --default-key %s' % shell_quote(sign)) if always_trust: args.append("--always-trust") result = self.result_map['crypt'](self) @@ -1110,7 +1260,7 @@ class GPG(object): if output: # write the output to a file with the specified name if os.path.exists(output): os.remove(output) # to avoid overwrite confirmation message - args.append('--output "%s"' % output) + args.append('--output %s' % shell_quote(output)) if always_trust: args.append("--always-trust") result = self.result_map['crypt'](self) diff --git a/test_gnupg.py b/test_gnupg.py index e8ab042..a720ca4 100644 --- a/test_gnupg.py +++ b/test_gnupg.py @@ -16,7 +16,7 @@ import unittest import gnupg __author__ = "Vinay Sajip" -__date__ = "$11-Mar-2013 23:48:26$" +__date__ = "$30-Aug-2013 18:10:57$" ALL_TESTS = True @@ -184,6 +184,7 @@ class GPGTestCase(unittest.TestCase): '(dummy comment) ') def test_key_generation_with_escapes(self): + "Test that key generation handles escape characters" cmd = self.gpg.gen_key_input(name_comment='Funny chars: ' '\\r\\n\\f\\v\\0\\b', name_real='Test Name', @@ -201,15 +202,14 @@ class GPGTestCase(unittest.TestCase): def test_key_generation_with_empty_value(self): "Test that key generation handles empty values" params = { - 'key_type': 'RSA', - 'key_length': 1024, - 'name_comment': ' ', # Not added, so default will appear + 'key_type': ' ', + 'key_length': 2048, } cmd = self.gpg.gen_key_input(**params) - self.assertTrue('\nName-Comment: Generated by gnupg.py\n' in cmd) - params['name_comment'] = 'A' + self.assertTrue('Key-Type: RSA\n' in cmd) + params['key_type'] = 'DSA' cmd = self.gpg.gen_key_input(**params) - self.assertTrue('\nName-Comment: A\n' in cmd) + self.assertTrue('Key-Type: DSA\n' in cmd) def test_list_keys_after_generation(self): "Test that after key generation, the generated key is available" @@ -265,6 +265,12 @@ class GPGTestCase(unittest.TestCase): edata = str(gpg.encrypt(data, None, passphrase='bbrown', symmetric=True)) ddata = gpg.decrypt(edata, passphrase='bbrown') self.assertEqual(data, str(ddata)) + # Test symmetric encryption with non-default cipher + data = "chippy was here" + edata = str(gpg.encrypt(data, None, passphrase='bbrown', + symmetric='AES256')) + ddata = gpg.decrypt(edata, passphrase='bbrown') + self.assertEqual(data, str(ddata)) def test_import_and_export(self): "Test that key import and export works" @@ -446,6 +452,15 @@ class GPGTestCase(unittest.TestCase): os.remove(fn) logger.debug("test_file_encryption_and_decryption ends") + def test_search_keys(self): + "Test that searching for keys works" + r = self.gpg.search_keys('') + self.assertTrue(r) + self.assertTrue('Vinay Sajip ' in r[0]['uids']) + r = self.gpg.search_keys('92905378') + self.assertTrue(r) + self.assertTrue('Vinay Sajip ' in r[0]['uids']) + TEST_GROUPS = { 'sign' : set(['test_signature_verification']), @@ -456,7 +471,8 @@ TEST_GROUPS = { 'test_key_generation_with_invalid_key_type', 'test_key_generation_with_escapes', 'test_key_generation_with_empty_value', - 'test_key_generation_with_colons']), + 'test_key_generation_with_colons', + 'test_search_keys']), 'import' : set(['test_import_only']), 'basic' : set(['test_environment', 'test_list_keys_initial', 'test_nogpg', 'test_make_args']),