Openwall GNU/*/Linux - a small security-enhanced Linux distro for servers
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Date: Tue, 4 Feb 2014 12:04:01 +0200
From: Henri Salo <henri@...v.fi>
To: oss-security@...ts.openwall.com
Subject: Re: CVE request: python-gnupg before 0.3.5 shell
 injection

On Tue, Feb 04, 2014 at 10:35:46AM +0100, Hanno Böck wrote:
> python-gnupg 0.3.5 lists in the changelog:
> "Added improved shell quoting to guard against shell injection."
> 
> Sounds like a severe security issue, but further info is lacking.

Diff attached. The new function shell_quote() appears to be the main change: it
quotes arguments passed to the shell to guard against unsafe input.

+# We use the test below because it works for Jython as well as CPython
+if os.path.__name__ == 'ntpath':
+    # On Windows, we don't need shell quoting, other than worrying about
+    # paths with spaces in them.
+    def shell_quote(s):
+        return '"%s"' % s
+else:
+    # Section copied from sarge
+
+    # This regex determines which shell input needs quoting
+    # because it may be unsafe
+    UNSAFE = re.compile(r'[^\w%+,./:=@-]')
+
+    def shell_quote(s):
+        """
+        Quote text so that it is safe for Posix command shells.
+
+        For example, "*.py" would be converted to "'*.py'". If the text is
+        considered safe it is returned unquoted.
+
+        :param s: The value to quote
+        :type s: str (or unicode on 2.x)
+        :return: A safe version of the input, from the point of view of Posix
+                 command shells
+        :rtype: The passed-in type
+        """
+        if not isinstance(s, string_types):
+            raise TypeError('Expected string type, got %s' % type(s))
+        if not s:
+            result = "''"
+        elif len(s) >= 2 and (s[0], s[-1]) == ("'", "'"):
+            result = '"%s"' % s.replace('"', r'\"')
+        elif not UNSAFE.search(s):
+            result = s
+        else:
+            result = "'%s'" % s.replace("'", "'\"'\"'")
+        return result
+
+    # end of sarge code

---
Henri Salo

diff --git a/PKG-INFO b/PKG-INFO
index 3514535..5e15564 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,12 +1,12 @@
 Metadata-Version: 1.0
 Name: python-gnupg
-Version: 0.3.4
+Version: 0.3.5
 Summary: A wrapper for the Gnu Privacy Guard (GPG or GnuPG)
 Home-page: http://packages.python.org/python-gnupg/index.html
 Author: Vinay Sajip
 Author-email: vinay_sajip@...-dove.com
 License: Copyright (C) 2008-2013 by Vinay Sajip. All Rights Reserved. See LICENSE for license.
-Download-URL: http://python-gnupg.googlecode.com/files/python-gnupg-0.3.4.tar.gz
+Download-URL: http://python-gnupg.googlecode.com/files/python-gnupg-0.3.5.tar.gz
 Description: This module allows easy access to GnuPG's key management, encryption and signature functionality from Python programs. It is intended for use with Python 2.4 or greater.
 Platform: No particular restrictions
 Classifier: Development Status :: 5 - Production/Stable
diff --git a/gnupg.py b/gnupg.py
index e5060af..b19f3cf 100644
--- a/gnupg.py
+++ b/gnupg.py
@@ -33,9 +33,9 @@ A unittest harness (test_gnupg.py) has also been added.
 """
 import locale
 
-__version__ = "0.3.4"
+__version__ = "0.3.5"
 __author__ = "Vinay Sajip"
-__date__  = "$05-Jun-2013 09:48:54$"
+__date__  = "$30-Aug-2013 18:10:36$"
 
 try:
     from io import StringIO
@@ -73,6 +73,47 @@ logger = logging.getLogger(__name__)
 if not logger.handlers:
     logger.addHandler(NullHandler())
 
+# We use the test below because it works for Jython as well as CPython
+if os.path.__name__ == 'ntpath':
+    # On Windows, we don't need shell quoting, other than worrying about
+    # paths with spaces in them.
+    def shell_quote(s):
+        return '"%s"' % s
+else:
+    # Section copied from sarge
+
+    # This regex determines which shell input needs quoting
+    # because it may be unsafe
+    UNSAFE = re.compile(r'[^\w%+,./:=@-]')
+
+    def shell_quote(s):
+        """
+        Quote text so that it is safe for Posix command shells.
+
+        For example, "*.py" would be converted to "'*.py'". If the text is
+        considered safe it is returned unquoted.
+
+        :param s: The value to quote
+        :type s: str (or unicode on 2.x)
+        :return: A safe version of the input, from the point of view of Posix
+                 command shells
+        :rtype: The passed-in type
+        """
+        if not isinstance(s, string_types):
+            raise TypeError('Expected string type, got %s' % type(s))
+        if not s:
+            result = "''"
+        elif len(s) >= 2 and (s[0], s[-1]) == ("'", "'"):
+            result = '"%s"' % s.replace('"', r'\"')
+        elif not UNSAFE.search(s):
+            result = s
+        else:
+            result = "'%s'" % s.replace("'", "'\"'\"'")
+        return result
+
+    # end of sarge code
+
+
 def _copy_data(instream, outstream):
     # Copy one stream to another
     sent = 0
@@ -326,6 +367,13 @@ BASIC_ESCAPES = {
     r'\0': '\0',
 }
 
+class SendResult(object):
+    def __init__(self, gpg):
+        self.gpg = gpg
+
+    def handle_status(self, key, value):
+        logger.debug('SendResult: %s: %s', key, value)
+
 class ListKeys(list):
     ''' Handle status messages for --list-keys.
 
@@ -384,6 +432,40 @@ class ListKeys(list):
     def handle_status(self, key, value):
         pass
 
+class SearchKeys(list):
+    ''' Handle status messages for --search-keys.
+
+        Handle pub and uid (relating the latter to the former).
+
+        Don't care about the rest
+    '''
+    def __init__(self, gpg):
+        self.gpg = gpg
+        self.curkey = None
+        self.fingerprints = []
+        self.uids = []
+
+    def pub(self, args):
+        vars = ("""
+            type keyid algo length date expires
+        """).split()
+        self.curkey = {}
+        for i in range(len(vars)):
+            self.curkey[vars[i]] = args[i]
+        self.curkey['uids'] = []
+        self.append(self.curkey)
+
+    def uid(self, args):
+        uid = args[1]
+        uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
+        for k, v in BASIC_ESCAPES.items():
+            uid = uid.replace(k, v)
+        self.curkey['uids'].append(uid)
+        self.uids.append(uid)
+
+    def handle_status(self, key, value):
+        pass
+
 class Crypt(Verify):
     "Handle status messages for --encrypt and --decrypt"
     def __init__(self, gpg):
@@ -528,7 +610,9 @@ class GPG(object):
         'delete': DeleteResult,
         'generate': GenKey,
         'import': ImportResult,
+        'send': SendResult,
         'list': ListKeys,
+        'search': SearchKeys,
         'sign': Sign,
         'verify': Verify,
     }
@@ -571,6 +655,10 @@ class GPG(object):
         self.encoding = locale.getpreferredencoding()
         if self.encoding is None: # This happens on Jython!
             self.encoding = sys.stdin.encoding
+        if self.encoding is None:
+            logger.warning('No encoding found via locale.getpreferredencoding '
+                           'or sys.stdin.encoding, defaulting to utf-8.')
+            self.encoding = 'utf-8'
         if gnupghome and not os.path.isdir(self.gnupghome):
             os.makedirs(self.gnupghome,0x1C0)
         p = self._open_subprocess(["--version"])
@@ -594,14 +682,14 @@ class GPG(object):
         """
         cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
         if self.gnupghome:
-            cmd.append('--homedir "%s"' % self.gnupghome)
+            cmd.append('--homedir %s' % shell_quote(self.gnupghome))
         if self.keyring:
             cmd.append('--no-default-keyring')
             for fn in self.keyring:
-                cmd.append('--keyring "%s"' % fn)
+                cmd.append('--keyring %s' % shell_quote(fn))
         if self.secret_keyring:
             for fn in self.secret_keyring:
-                cmd.append('--secret-keyring "%s"' % fn)
+                cmd.append('--secret-keyring %s' % shell_quote(fn))
         if passphrase:
             cmd.append('--batch --passphrase-fd 0')
         if self.use_agent:
@@ -735,7 +823,7 @@ class GPG(object):
         elif clearsign:
             args.append("--clearsign")
         if keyid:
-            args.append('--default-key "%s"' % keyid)
+            args.append('--default-key %s' % shell_quote(keyid))
         result = self.result_map['sign'](self)
         #We could use _handle_io here except for the fact that if the
         #passphrase is bad, gpg bails and you can't write the message.
@@ -787,8 +875,8 @@ class GPG(object):
             logger.debug('Wrote to temp file: %r', s)
             os.write(fd, s)
             os.close(fd)
-            args.append(fn)
-            args.append('"%s"' % data_filename)
+            args.append(shell_quote(fn))
+            args.append(shell_quote(data_filename))
             try:
                 p = self._open_subprocess(args)
                 self._collect_output(p, result, stdin=p.stdin)
@@ -856,9 +944,6 @@ class GPG(object):
     def recv_keys(self, keyserver, *keyids):
         """Import a key from a keyserver
 
-        The doctest assertion is skipped in Jython because security permissions
-        may prevent the recv_keys from succeeding.
-
         >>> import shutil
         >>> shutil.rmtree("keys")
         >>> gpg = GPG(gnupghome="keys")
@@ -871,20 +956,39 @@ class GPG(object):
         logger.debug('recv_keys: %r', keyids)
         data = _make_binary_stream("", self.encoding)
         #data = ""
-        args = ['--keyserver', keyserver, '--recv-keys']
-        args.extend(keyids)
+        args = ['--keyserver', shell_quote(keyserver), '--recv-keys']
+        args.extend([shell_quote(k) for k in keyids])
         self._handle_io(args, data, result, binary=True)
         logger.debug('recv_keys result: %r', result.__dict__)
         data.close()
         return result
 
+    def send_keys(self, keyserver, *keyids):
+        """Send a key to a keyserver.
+
+        Note: it's not practical to test this function without sending
+        arbitrary data to live keyservers.
+        """
+        result = self.result_map['send'](self)
+        logger.debug('send_keys: %r', keyids)
+        data = _make_binary_stream('', self.encoding)
+        #data = ""
+        args = ['--keyserver', shell_quote(keyserver), '--send-keys']
+        args.extend([shell_quote(k) for k in keyids])
+        self._handle_io(args, data, result, binary=True)
+        logger.debug('send_keys result: %r', result.__dict__)
+        data.close()
+        return result
+
     def delete_keys(self, fingerprints, secret=False):
         which='key'
         if secret:
             which='secret-key'
         if _is_sequence(fingerprints):
-            fingerprints = ' '.join(fingerprints)
-        args = ['--batch --delete-%s "%s"' % (which, fingerprints)]
+            fingerprints = ' '.join([shell_quote(s) for s in fingerprints])
+        else:
+            fingerprints = shell_quote(fingerprints)
+        args = ['--batch --delete-%s %s' % (which, fingerprints)]
         result = self.result_map['delete'](self)
         p = self._open_subprocess(args)
         self._collect_output(p, result, stdin=p.stdin)
@@ -896,8 +1000,10 @@ class GPG(object):
         if secret:
             which='-secret-key'
         if _is_sequence(keyids):
-            keyids = ' '.join(['"%s"' % k for k in keyids])
-        args = ["--armor --export%s %s" % (which, keyids)]
+            keyids = ' '.join([shell_quote(k) for k in keyids])
+        else:
+            keyids = shell_quote(keyids)
+        args = ['--armor --export%s %s' % (which, keyids)]
         p = self._open_subprocess(args)
         # gpg --export produces no status-fd output; stdout will be
         # empty in case of failure
@@ -954,6 +1060,46 @@ class GPG(object):
                 getattr(result, keyword)(L)
         return result
 
+    def search_keys(self, query, keyserver='pgp.mit.edu'):
+        """ search keyserver by query (using --search-keys option)
+
+        >>> import shutil
+        >>> shutil.rmtree('keys')
+        >>> gpg = GPG(gnupghome='keys')
+        >>> os.chmod('keys', 0x1C0)
+        >>> result = gpg.search_keys('<vinay_sajip@...mail.com>')
+        >>> assert result
+        >>> keyserver = 'keyserver.ubuntu.com'
+        >>> result = gpg.search_keys('<vinay_sajip@...mail.com>', keyserver)
+        >>> assert result
+
+        """
+
+        args = ['--fixed-list-mode', '--fingerprint', '--with-colons',
+                '--keyserver', shell_quote(keyserver), '--search-keys',
+                shell_quote(query)]
+        p = self._open_subprocess(args)
+
+        # Get the response information
+        result = self.result_map['search'](self)
+        self._collect_output(p, result, stdin=p.stdin)
+        lines = result.data.decode(self.encoding,
+                                   self.decode_errors).splitlines()
+        valid_keywords = ['pub', 'uid']
+        for line in lines:
+            if self.verbose:
+                print(line)
+            logger.debug('line: %r', line.rstrip())
+            if not line:    # sometimes get blank lines on Windows
+                continue
+            L = line.strip().split(':')
+            if not L:
+                continue
+            keyword = L[0]
+            if keyword in valid_keywords:
+                getattr(result, keyword)(L)
+        return result
+
     def gen_key(self, input):
         """Generate a key; you might use gen_key_input() to create the
         control input.
@@ -983,9 +1129,8 @@ class GPG(object):
             if str(val).strip():    # skip empty strings
                 parms[key] = val
         parms.setdefault('Key-Type','RSA')
-        parms.setdefault('Key-Length',1024)
+        parms.setdefault('Key-Length',2048)
         parms.setdefault('Name-Real', "Autogenerated Key")
-        parms.setdefault('Name-Comment', "Generated by gnupg.py")
         try:
             logname = os.environ['LOGNAME']
         except KeyError:
@@ -1030,21 +1175,26 @@ class GPG(object):
         "Encrypt the message read from the file-like object 'file'"
         args = ['--encrypt']
         if symmetric:
+            # can't be False or None - could be True or a cipher algo value
+            # such as AES256
             args = ['--symmetric']
+            if symmetric is not True:
+                args.extend(['--cipher-algo', shell_quote(symmetric)])
+            # else use the default, currently CAST5
         else:
             args = ['--encrypt']
             if not _is_sequence(recipients):
                 recipients = (recipients,)
             for recipient in recipients:
-                args.append('--recipient "%s"' % recipient)
-        if armor:   # create ascii-armored output - set to False for binary output
+                args.append('--recipient %s' % shell_quote(recipient))
+        if armor:   # create ascii-armored output - False for binary output
             args.append('--armor')
         if output:  # write the output to a file with the specified name
             if os.path.exists(output):
                 os.remove(output) # to avoid overwrite confirmation message
-            args.append('--output "%s"' % output)
+            args.append('--output %s' % shell_quote(output))
         if sign:
-            args.append('--sign --default-key "%s"' % sign)
+            args.append('--sign --default-key %s' % shell_quote(sign))
         if always_trust:
             args.append("--always-trust")
         result = self.result_map['crypt'](self)
@@ -1110,7 +1260,7 @@ class GPG(object):
         if output:  # write the output to a file with the specified name
             if os.path.exists(output):
                 os.remove(output) # to avoid overwrite confirmation message
-            args.append('--output "%s"' % output)
+            args.append('--output %s' % shell_quote(output))
         if always_trust:
             args.append("--always-trust")
         result = self.result_map['crypt'](self)
diff --git a/test_gnupg.py b/test_gnupg.py
index e8ab042..a720ca4 100644
--- a/test_gnupg.py
+++ b/test_gnupg.py
@@ -16,7 +16,7 @@ import unittest
 import gnupg
 
 __author__ = "Vinay Sajip"
-__date__  = "$11-Mar-2013 23:48:26$"
+__date__  = "$30-Aug-2013 18:10:57$"
 
 ALL_TESTS = True
 
@@ -184,6 +184,7 @@ class GPGTestCase(unittest.TestCase):
                               '(dummy comment) <test.name@...mple.com>')
 
     def test_key_generation_with_escapes(self):
+        "Test that key generation handles escape characters"
         cmd = self.gpg.gen_key_input(name_comment='Funny chars: '
                                                   '\\r\\n\\f\\v\\0\\b',
                                      name_real='Test Name',
@@ -201,15 +202,14 @@ class GPGTestCase(unittest.TestCase):
     def test_key_generation_with_empty_value(self):
         "Test that key generation handles empty values"
         params = {
-            'key_type': 'RSA',
-            'key_length': 1024,
-            'name_comment': ' ', # Not added, so default will appear
+            'key_type': ' ',
+            'key_length': 2048,
         }
         cmd = self.gpg.gen_key_input(**params)
-        self.assertTrue('\nName-Comment: Generated by gnupg.py\n' in cmd)
-        params['name_comment'] = 'A'
+        self.assertTrue('Key-Type: RSA\n' in cmd)
+        params['key_type'] = 'DSA'
         cmd = self.gpg.gen_key_input(**params)
-        self.assertTrue('\nName-Comment: A\n' in cmd)
+        self.assertTrue('Key-Type: DSA\n' in cmd)
         
     def test_list_keys_after_generation(self):
         "Test that after key generation, the generated key is available"
@@ -265,6 +265,12 @@ class GPGTestCase(unittest.TestCase):
         edata = str(gpg.encrypt(data, None, passphrase='bbrown', symmetric=True))
         ddata = gpg.decrypt(edata, passphrase='bbrown')
         self.assertEqual(data, str(ddata))
+        # Test symmetric encryption with non-default cipher
+        data = "chippy was here"
+        edata = str(gpg.encrypt(data, None, passphrase='bbrown',
+                    symmetric='AES256'))
+        ddata = gpg.decrypt(edata, passphrase='bbrown')
+        self.assertEqual(data, str(ddata))
 
     def test_import_and_export(self):
         "Test that key import and export works"
@@ -446,6 +452,15 @@ class GPGTestCase(unittest.TestCase):
                     os.remove(fn)
         logger.debug("test_file_encryption_and_decryption ends")
 
+    def test_search_keys(self):
+        "Test that searching for keys works"
+        r = self.gpg.search_keys('<vinay_sajip@...mail.com>')
+        self.assertTrue(r)
+        self.assertTrue('Vinay Sajip <vinay_sajip@...mail.com>' in r[0]['uids'])
+        r = self.gpg.search_keys('92905378')
+        self.assertTrue(r)
+        self.assertTrue('Vinay Sajip <vinay_sajip@...mail.com>' in r[0]['uids'])
+
 
 TEST_GROUPS = {
     'sign' : set(['test_signature_verification']),
@@ -456,7 +471,8 @@ TEST_GROUPS = {
                  'test_key_generation_with_invalid_key_type',
                  'test_key_generation_with_escapes',
                  'test_key_generation_with_empty_value',
-                 'test_key_generation_with_colons']),
+                 'test_key_generation_with_colons',
+                 'test_search_keys']),
     'import' : set(['test_import_only']),
     'basic' : set(['test_environment', 'test_list_keys_initial',
                    'test_nogpg', 'test_make_args']),


[ CONTENT OF TYPE application/pgp-signature SKIPPED ]

Powered by blists - more mailing lists

Your e-mail address:

Please check out the Open Source Software Security Wiki, which is the counterpart to this mailing list.

Powered by Openwall GNU/*/Linux - Powered by OpenVZ