Date: Thu, 17 Oct 2013 09:28:41 -0700
From: Dwayne Litzenberger <dlitz@...tz.net>
To: pycrypto@...ts.dlitz.net, python-crypto@...hon.org,
	oss-security@...ts.openwall.com
Subject: CVE-2013-1445 python-crypto: PRNG not correctly reseeded in some
 situations

In PyCrypto before v2.6.1, the Crypto.Random pseudo-random number
generator (PRNG) exhibits a race condition that may cause it to generate
the same 'random' output in multiple processes that are forked from each
other.  Depending on the application, this could reveal sensitive
information or cryptographic keys to remote attackers.

An application may be affected if, within 100 milliseconds, it performs
the following steps (which may be summarized as "read-fork-read-read"):

1. Read from the Crypto.Random PRNG, causing an internal reseed;
2. Fork the process and invoke Crypto.Random.atfork() in the child;
3. Read from the Crypto.Random PRNG again, in at least two different
     processes (parent and child, or multiple children).

Only applications that invoke Crypto.Random.atfork() and perform the
above steps are affected by this issue.  Other applications are
unaffected.

Note: Some PyCrypto functions, such as key generation and PKCS#1-related
functions, implicitly read from the Crypto.Random PRNG.

== Technical details ==

Crypto.Random uses Fortuna[1] to generate random numbers.  The flow of
entropy looks something like this:

      /dev/urandom  -\
                      +-> "accumulator" --> "generator" --> output
      other sources -/   (entropy pools)     (AES-CTR)

- The "accumulator" maintains several pools that collect entropy from
    the environment.

- The "generator" is a deterministic PRNG that is reseeded by the
    accumulator.  Reseeding normally occurs during each request for random
    numbers, but never more than once every 100 ms (the "minimum reseed
    interval").

When a process is forked, the parent's state is duplicated in the child.
In order to continue using the PRNG, the child process must invoke
Crypto.Random.atfork(), which collects new entropy from /dev/urandom and
adds it to the accumulator.  When new PRNG output is subsequently
requested, some of the new entropy in the accumulator is used to reseed
the generator, causing the output of the child to diverge from its
parent.

However, in previous versions of PyCrypto, Crypto.Random.atfork() did
not explicitly reset the child's rate-limiter, so if the child requested
PRNG output before the minimum reseed interval of 100 ms had elapsed, it
would generate its output using state inherited from its parent.

This created a race condition between the parent process and its forked
children that could cause them to produce identical PRNG output for the
duration of the 100 ms minimum reseed interval.
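
The interaction between the rate-limiter and fork can be sketched with a toy
model.  This is not PyCrypto's code: the class ToyPRNG, its methods, and the
helper read_fork_read_read() are hypothetical names; SHA-256 stands in for
the real pools and the AES-CTR generator; the "fork" is simulated with
copy.deepcopy(); and time is passed in explicitly so that the result is
deterministic.

```python
import copy
import hashlib
import os


class ToyPRNG:
    """Toy accumulator + generator with a minimum reseed interval."""

    MIN_RESEED_INTERVAL = 0.1  # seconds

    def __init__(self):
        self.pool = b""           # entropy not yet mixed into the key
        self.key = b"\x00" * 32   # generator state
        self.counter = 0
        self.last_reseed = None   # time of last reseed; None means "never"

    def add_entropy(self, data):
        self.pool += data

    def forget_last_reseed(self):
        self.last_reseed = None

    def read(self, n, now):
        # Reseed from the pool unless the rate-limiter forbids it.
        rate_limited = (self.last_reseed is not None and
                        now - self.last_reseed < self.MIN_RESEED_INTERVAL)
        if self.pool and not rate_limited:
            self.key = hashlib.sha256(self.key + self.pool).digest()
            self.pool = b""
            self.last_reseed = now
        out = b""
        while len(out) < n:
            self.counter += 1
            block = self.key + self.counter.to_bytes(16, "big")
            out += hashlib.sha256(block).digest()
        return out[:n]


def read_fork_read_read(reset_rate_limiter):
    parent = ToyPRNG()
    parent.add_entropy(os.urandom(32))
    parent.read(1, now=0.00)             # 1. read; reseeds at t = 0
    child = copy.deepcopy(parent)        # 2. "fork": child inherits state
    child.add_entropy(os.urandom(32))    #    atfork() adds fresh entropy
    if reset_rate_limiter:
        child.forget_last_reseed()       #    models the fixed behaviour
    # 3. both processes read 10 ms after the parent's reseed
    return parent.read(8, now=0.01), child.read(8, now=0.01)
```

In this model, read_fork_read_read(False) returns two identical values,
because the child's fresh entropy stays in the pool until the interval
expires, while read_fork_read_read(True) returns two different values.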

== Demonstration ==

Here is some sample code that illustrates the problem:

      from binascii import hexlify
      import multiprocessing, pprint, time
      import Crypto.Random

      def task_main(arg):
          a = Crypto.Random.get_random_bytes(8)
          time.sleep(0.1)
          b = Crypto.Random.get_random_bytes(8)
          rdy, ack = arg
          rdy.set()
          ack.wait()
          return "%s,%s" % (hexlify(a).decode(),
                            hexlify(b).decode())

      n_procs = 4
      manager = multiprocessing.Manager()
      rdys = [manager.Event() for i in range(n_procs)]
      acks = [manager.Event() for i in range(n_procs)]
      Crypto.Random.get_random_bytes(1)
      pool = multiprocessing.Pool(processes=n_procs,
                                  initializer=Crypto.Random.atfork)
      res_async = pool.map_async(task_main, zip(rdys, acks))
      pool.close()
      [rdy.wait() for rdy in rdys]
      [ack.set() for ack in acks]
      res = res_async.get()
      pprint.pprint(sorted(res))
      pool.join()

Every value in the output should be distinct, but all four processes
returned the same first value (the one read before the 100 ms sleep):

      ['c607803ae01aa8c0,2e4de6457a304b34',
       'c607803ae01aa8c0,af80d08942b4c987',
       'c607803ae01aa8c0,b0e4c0853de927c4',
       'c607803ae01aa8c0,f0362585b3fceba4']

== Solution ==

The solution is to upgrade to PyCrypto v2.6.1 or later, which properly
resets the rate-limiter when Crypto.Random.atfork() is invoked in the
child.
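
As a rough aid for auditing deployments, a version string can be compared
against 2.6.1 as sketched below.  The helper names parse_version() and
needs_upgrade() are hypothetical, and only the leading numeric components
are considered, so pre-release suffixes are ignored.  A version number alone
cannot tell whether a distributor has backported the fix to an older
release, so treat a positive result as a prompt to check, not as proof.

```python
import re

FIXED_VERSION = (2, 6, 1)


def parse_version(version):
    """Return the leading numeric components of a version string,
    padded to three elements, e.g. "2.6" -> (2, 6, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError("unrecognized version string: %r" % (version,))
    parts = tuple(int(p) for p in match.group(0).split("."))
    return parts + (0,) * (3 - len(parts))


def needs_upgrade(version):
    """True if the version predates the release containing the fix."""
    return parse_version(version) < FIXED_VERSION
```

With PyCrypto installed, the string to check would typically come from
Crypto.__version__, for example needs_upgrade(Crypto.__version__).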

== Files ==

PyCrypto v2.6.1 may be downloaded from the PyCrypto website[2], from 
PyPI, or using your operating system's package manager or ports tree.  

The official tarball has the following SHA256 sums:

f2ce1e989b272cfcb677616763e0a2e7ec659effa67a88aa92b3a65528f60a3c *pycrypto-2.6.1.tar.gz
c2ab0516cc55321e6543ae75e2aa6f6e56e97432870f32a7799f3b89f467dc1b *pycrypto-2.6.1.tar.gz.asc
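
A downloaded tarball can be checked against these sums with a few lines of
Python.  This is a minimal sketch using only the standard library; the
function names sha256_of_file() and matches_checksum() are illustrative, and
the file is assumed to be in the current directory.  Checking the OpenPGP
signature in the .asc file is a separate step that this does not cover.

```python
import hashlib


def sha256_of_file(path, chunk_size=65536):
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_checksum(path, expected_hex):
    """Compare a file's SHA-256 digest with an expected hex string."""
    return sha256_of_file(path) == expected_hex.strip().lower()
```

For example, matches_checksum("pycrypto-2.6.1.tar.gz", "f2ce1e98...") with
the full sum from the list above should return True for an intact download.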

The git repository is here: https://github.com/dlitz/pycrypto/
The v2.6.1 tag id is: ebb470d3f0982702e3e9b7fb9ebdaeed95903aaf
The v2.6.1 commit id is: 7fd528d03b5eae58eef6fd219af5d9ac9c83fa50

For informational purposes, patches against pycrypto v2.6 and v2.1.0 are 
attached.  Distributors patching older versions of the library, please 
remember to run the test suite before releasing a modified package:

    # From the source tree
    python setup.py build test

    # After installation
    python -m Crypto.SelfTest.__init__

== Thanks ==

Thanks to Yves-Alexis Perez and Sebastian Ramacher for helping to 
coordinate the release of this fix.

== References ==

[1] N. Ferguson and B. Schneier, _Practical Cryptography_,
      Indianapolis: Wiley, 2003, pp. 155-184.

[2] https://www.dlitz.net/software/pycrypto/ or http://www.pycrypto.org/

-- 
Dwayne C. Litzenberger <dlitz@...tz.net>
    OpenPGP: 19E1 1FE8 B3CF F273 ED17  4A24 928C EC13 39C2 5CF7

From 19dcf7b15d61b7dc1a125a367151de40df6ef175 Mon Sep 17 00:00:00 2001
From: Dwayne Litzenberger <dlitz@...tz.net>
Date: Mon, 14 Oct 2013 14:37:35 -0700
Subject: [PATCH 1/4] Random: Make Crypto.Random.atfork() set last_reseed=None
 (CVE-2013-1445)

== Summary ==

In PyCrypto before v2.6.1, the Crypto.Random pseudo-random number
generator (PRNG) exhibits a race condition that may cause it to generate
the same 'random' output in multiple processes that are forked from each
other.  Depending on the application, this could reveal sensitive
information or cryptographic keys to remote attackers.

An application may be affected if, within 100 milliseconds, it performs
the following steps (which may be summarized as "read-fork-read-read"):

1. Read from the Crypto.Random PRNG, causing an internal reseed;
2. Fork the process and invoke Crypto.Random.atfork() in the child;
3. Read from the Crypto.Random PRNG again, in at least two different
   processes (parent and child, or multiple children).

Only applications that invoke Crypto.Random.atfork() and perform the
above steps are affected by this issue.  Other applications are
unaffected.

Note: Some PyCrypto functions, such as key generation and PKCS#1-related
functions, implicitly read from the Crypto.Random PRNG.

== Technical details ==

Crypto.Random uses Fortuna[1] to generate random numbers.  The flow of
entropy looks something like this:

    /dev/urandom  -\
                    +-> "accumulator" --> "generator" --> output
    other sources -/   (entropy pools)     (AES-CTR)

- The "accumulator" maintains several pools that collect entropy from
  the environment.

- The "generator" is a deterministic PRNG that is reseeded by the
  accumulator.  Reseeding normally occurs during each request for random
  numbers, but never more than once every 100 ms (the "minimum reseed
  interval").

When a process is forked, the parent's state is duplicated in the child.
In order to continue using the PRNG, the child process must invoke
Crypto.Random.atfork(), which collects new entropy from /dev/urandom and
adds it to the accumulator.  When new PRNG output is subsequently
requested, some of the new entropy in the accumulator is used to reseed
the generator, causing the output of the child to diverge from its
parent.

However, in previous versions of PyCrypto, Crypto.Random.atfork() did
not explicitly reset the child's rate-limiter, so if the child requested
PRNG output before the minimum reseed interval of 100 ms had elapsed, it
would generate its output using state inherited from its parent.

This created a race condition between the parent process and its forked
children that could cause them to produce identical PRNG output for the
duration of the 100 ms minimum reseed interval.

== Demonstration ==

Here is some sample code that illustrates the problem:

    from binascii import hexlify
    import multiprocessing, pprint, time
    import Crypto.Random

    def task_main(arg):
        a = Crypto.Random.get_random_bytes(8)
        time.sleep(0.1)
        b = Crypto.Random.get_random_bytes(8)
        rdy, ack = arg
        rdy.set()
        ack.wait()
        return "%s,%s" % (hexlify(a).decode(),
                          hexlify(b).decode())

    n_procs = 4
    manager = multiprocessing.Manager()
    rdys = [manager.Event() for i in range(n_procs)]
    acks = [manager.Event() for i in range(n_procs)]
    Crypto.Random.get_random_bytes(1)
    pool = multiprocessing.Pool(processes=n_procs,
                                initializer=Crypto.Random.atfork)
    res_async = pool.map_async(task_main, zip(rdys, acks))
    pool.close()
    [rdy.wait() for rdy in rdys]
    [ack.set() for ack in acks]
    res = res_async.get()
    pprint.pprint(sorted(res))
    pool.join()

The output should be random, but it looked like this:

    ['c607803ae01aa8c0,2e4de6457a304b34',
     'c607803ae01aa8c0,af80d08942b4c987',
     'c607803ae01aa8c0,b0e4c0853de927c4',
     'c607803ae01aa8c0,f0362585b3fceba4']

== Solution ==

The solution is to upgrade to PyCrypto v2.6.1 or later, which properly
resets the rate-limiter when Crypto.Random.atfork() is invoked in the
child.

== References ==

[1] N. Ferguson and B. Schneier, _Practical Cryptography_,
    Indianapolis: Wiley, 2003, pp. 155-184.
---
 lib/Crypto/Random/Fortuna/FortunaAccumulator.py    |    9 ++
 lib/Crypto/Random/_UserFriendlyRNG.py              |   15 ++
 lib/Crypto/SelfTest/Random/__init__.py             |    1 +
 .../SelfTest/Random/test__UserFriendlyRNG.py       |  171 ++++++++++++++++++++
 4 files changed, 196 insertions(+)
 create mode 100644 lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py

diff --git a/lib/Crypto/Random/Fortuna/FortunaAccumulator.py b/lib/Crypto/Random/Fortuna/FortunaAccumulator.py
index 5ebbe2b..1ec6f3c 100644
--- a/lib/Crypto/Random/Fortuna/FortunaAccumulator.py
+++ b/lib/Crypto/Random/Fortuna/FortunaAccumulator.py
@@ -109,6 +109,15 @@ class FortunaAccumulator(object):
         self.pools = [FortunaPool() for i in range(32)]     # 32 pools
         assert(self.pools[0] is not self.pools[1])
 
+    def _forget_last_reseed(self):
+        # This is not part of the standard Fortuna definition, and using this
+        # function frequently can weaken Fortuna's ability to resist a state
+        # compromise extension attack, but we need this in order to properly
+        # implement Crypto.Random.atfork().  Otherwise, forked child processes
+        # might continue to use their parent's PRNG state for up to 100ms in
+        # some cases. (e.g. CVE-2013-1445)
+        self.last_reseed = None
+
     def random_data(self, bytes):
         current_time = time.time()
         if (self.last_reseed is not None and self.last_reseed > current_time): # Avoid float comparison to None to make Py3k happy
diff --git a/lib/Crypto/Random/_UserFriendlyRNG.py b/lib/Crypto/Random/_UserFriendlyRNG.py
index c2a2eae..957e006 100644
--- a/lib/Crypto/Random/_UserFriendlyRNG.py
+++ b/lib/Crypto/Random/_UserFriendlyRNG.py
@@ -90,9 +90,24 @@ class _UserFriendlyRNG(object):
         """Initialize the random number generator and seed it with entropy from
         the operating system.
         """
+
+        # Save the pid (helps ensure that Crypto.Random.atfork() gets called)
         self._pid = os.getpid()
+
+        # Collect entropy from the operating system and feed it to
+        # FortunaAccumulator
         self._ec.reinit()
 
+        # Override FortunaAccumulator's 100ms minimum re-seed interval.  This
+        # is necessary to avoid a race condition between this function and
+        # self.read(), which can otherwise cause forked child processes to
+        # produce identical output.  (e.g. CVE-2013-1445)
+        #
+        # Note that if this function can be called frequently by an attacker,
+        # (and if the bits from OSRNG are insufficiently random) it will weaken
+        # Fortuna's ability to resist a state compromise extension attack.
+        self._fa._forget_last_reseed()
+
     def close(self):
         self.closed = True
         self._osrng = None
diff --git a/lib/Crypto/SelfTest/Random/__init__.py b/lib/Crypto/SelfTest/Random/__init__.py
index 48d84ff..f972bf0 100644
--- a/lib/Crypto/SelfTest/Random/__init__.py
+++ b/lib/Crypto/SelfTest/Random/__init__.py
@@ -32,6 +32,7 @@ def get_tests(config={}):
     from Crypto.SelfTest.Random import OSRNG;               tests += OSRNG.get_tests(config=config)
     from Crypto.SelfTest.Random import test_random;         tests += test_random.get_tests(config=config)
     from Crypto.SelfTest.Random import test_rpoolcompat;    tests += test_rpoolcompat.get_tests(config=config)
+    from Crypto.SelfTest.Random import test__UserFriendlyRNG; tests += test__UserFriendlyRNG.get_tests(config=config)
     return tests
 
 if __name__ == '__main__':
diff --git a/lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py b/lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py
new file mode 100644
index 0000000..771a663
--- /dev/null
+++ b/lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py
@@ -0,0 +1,171 @@
+# -*- coding: utf-8 -*-
+# Self-tests for the user-friendly Crypto.Random interface
+#
+# Written in 2013 by Dwayne C. Litzenberger <dlitz@...tz.net>
+#
+# ===================================================================
+# The contents of this file are dedicated to the public domain.  To
+# the extent that dedication to the public domain is not available,
+# everyone is granted a worldwide, perpetual, royalty-free,
+# non-exclusive license to exercise all rights associated with the
+# contents of this file for any purpose whatsoever.
+# No rights are reserved.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# ===================================================================
+
+"""Self-test suite for generic Crypto.Random stuff """
+
+from __future__ import nested_scopes
+
+__revision__ = "$Id$"
+
+import binascii
+import pprint
+import unittest
+import os
+import time
+import sys
+if sys.version_info[0] == 2 and sys.version_info[1] == 1:
+    from Crypto.Util.py21compat import *
+from Crypto.Util.py3compat import *
+
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
+
+import Crypto.Random._UserFriendlyRNG
+import Crypto.Random.random
+
+class RNGForkTest(unittest.TestCase):
+
+    def _get_reseed_count(self):
+        """
+        Get `FortunaAccumulator.reseed_count`, the global count of the
+        number of times that the PRNG has been reseeded.
+        """
+        rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
+        rng_singleton._lock.acquire()
+        try:
+            return rng_singleton._fa.reseed_count
+        finally:
+            rng_singleton._lock.release()
+
+    def runTest(self):
+        # Regression test for CVE-2013-1445.  We had a bug where, under the
+        # right conditions, two processes might see the same random sequence.
+
+        if sys.platform.startswith('win'):  # windows can't fork
+            assert not hasattr(os, 'fork')    # ... right?
+            return
+
+        # Wait 150 ms so that we don't trigger the rate-limit prematurely.
+        time.sleep(0.15)
+
+        reseed_count_before = self._get_reseed_count()
+
+        # One or both of these calls together should trigger a reseed right here.
+        Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
+        Crypto.Random.get_random_bytes(1)
+
+        reseed_count_after = self._get_reseed_count()
+        self.assertNotEqual(reseed_count_before, reseed_count_after)  # sanity check: test should reseed parent before forking
+
+        rfiles = []
+        for i in range(10):
+            rfd, wfd = os.pipe()
+            if os.fork() == 0:
+                # child
+                os.close(rfd)
+                f = os.fdopen(wfd, "wb")
+
+                Crypto.Random.atfork()
+
+                data = Crypto.Random.get_random_bytes(16)
+
+                f.write(data)
+                f.close()
+                os._exit(0)
+            # parent
+            os.close(wfd)
+            rfiles.append(os.fdopen(rfd, "rb"))
+
+        results = []
+        results_dict = {}
+        for f in rfiles:
+            data = binascii.hexlify(f.read())
+            results.append(data)
+            results_dict[data] = 1
+            f.close()
+
+        if len(results) != len(results_dict.keys()):
+            raise AssertionError("RNG output duplicated across fork():\n%s" %
+                                 (pprint.pformat(results)))
+
+
+# For RNGMultiprocessingForkTest
+def _task_main(q):
+    a = Crypto.Random.get_random_bytes(16)
+    time.sleep(0.1)     # wait 100 ms
+    b = Crypto.Random.get_random_bytes(16)
+    q.put(binascii.b2a_hex(a))
+    q.put(binascii.b2a_hex(b))
+    q.put(None)      # Wait for acknowledgment
+
+
+class RNGMultiprocessingForkTest(unittest.TestCase):
+
+    def runTest(self):
+        # Another regression test for CVE-2013-1445.  This is basically the
+        # same as RNGForkTest, but less compatible with old versions of Python,
+        # and a little easier to read.
+
+        n_procs = 5
+        manager = multiprocessing.Manager()
+        queues = [manager.Queue(1) for i in range(n_procs)]
+
+        # Reseed the pool
+        time.sleep(0.15)
+        Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
+        Crypto.Random.get_random_bytes(1)
+
+        # Start the child processes
+        pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random.atfork)
+        map_result = pool.map_async(_task_main, queues)
+
+        # Get the results, ensuring that no pool processes are reused.
+        aa = [queues[i].get(30) for i in range(n_procs)]
+        bb = [queues[i].get(30) for i in range(n_procs)]
+        res = list(zip(aa, bb))
+
+        # Shut down the pool
+        map_result.get(30)
+        pool.close()
+        pool.join()
+
+        # Check that the results are unique
+        if len(set(aa)) != len(aa) or len(set(res)) != len(res):
+            raise AssertionError("RNG output duplicated across fork():\n%s" %
+                                 (pprint.pformat(res),))
+
+
+def get_tests(config={}):
+    tests = []
+    tests += [RNGForkTest()]
+    if multiprocessing is not None:
+        tests += [RNGMultiprocessingForkTest()]
+    return tests
+
+if __name__ == '__main__':
+    suite = lambda: unittest.TestSuite(get_tests())
+    unittest.main(defaultTest='suite')
+
+# vim:set ts=4 sw=4 sts=4 expandtab:
-- 
1.7.10.4


Description: Fix CVE-2013-1445
 In PyCrypto before v2.6.1, the Crypto.Random pseudo-random number generator
 (PRNG) exhibits a race condition that may cause it to generate the same
 'random' output in multiple processes that are forked from each other.
 Depending on the application, this could reveal sensitive information or
 cryptographic keys to remote attackers.
 .
 An application may be affected if, within 100 milliseconds, it performs the
 following steps:
 .
  1. Read from the Crypto.Random PRNG, causing an internal reseed;
  2. Fork the process and invoke Crypto.Random.atfork() in the child;
  3. Read from the Crypto.Random PRNG again, in at least two different
     processes (parent and child, or multiple children).
 .
 Only applications that invoke Crypto.Random.atfork() and perform the above
 steps are affected by this issue. Other applications are unaffected.
 .
 Note: Some PyCrypto functions, such as key generation and PKCS#1-related
 functions, implicitly read from the Crypto.Random PRNG.
Origin: upstream,
 https://github.com/dlitz/pycrypto/commit/19dcf7b15d61b7dc1a125a367151de40df6ef175
Last-Update: 2013-10-17

Index: python-crypto-2.1.0/lib/Crypto/Random/Fortuna/FortunaAccumulator.py
===================================================================
--- python-crypto-2.1.0.orig/lib/Crypto/Random/Fortuna/FortunaAccumulator.py	2013-10-15 08:37:01.000000000 -0700
+++ python-crypto-2.1.0/lib/Crypto/Random/Fortuna/FortunaAccumulator.py	2013-10-15 08:37:29.000000000 -0700
@@ -103,6 +103,15 @@
         self.pools = [FortunaPool() for i in range(32)]     # 32 pools
         assert(self.pools[0] is not self.pools[1])
 
+    def _forget_last_reseed(self):
+        # This is not part of the standard Fortuna definition, and using this
+        # function frequently can weaken Fortuna's ability to resist a state
+        # compromise extension attack, but we need this in order to properly
+        # implement Crypto.Random.atfork().  Otherwise, forked child processes
+        # might continue to use their parent's PRNG state for up to 100ms in
+        # some cases. (e.g. CVE-2013-1445)
+        self.last_reseed = None
+
     def random_data(self, bytes):
         current_time = time.time()
         if self.last_reseed > current_time:
Index: python-crypto-2.1.0/lib/Crypto/Random/_UserFriendlyRNG.py
===================================================================
--- python-crypto-2.1.0.orig/lib/Crypto/Random/_UserFriendlyRNG.py	2013-10-15 08:37:01.000000000 -0700
+++ python-crypto-2.1.0/lib/Crypto/Random/_UserFriendlyRNG.py	2013-10-15 08:37:29.000000000 -0700
@@ -88,9 +88,24 @@
         """Initialize the random number generator and seed it with entropy from
         the operating system.
         """
+
+        # Save the pid (helps ensure that Crypto.Random.atfork() gets called)
         self._pid = os.getpid()
+
+        # Collect entropy from the operating system and feed it to
+        # FortunaAccumulator
         self._ec.reinit()
 
+        # Override FortunaAccumulator's 100ms minimum re-seed interval.  This
+        # is necessary to avoid a race condition between this function and
+        # self.read(), which can otherwise cause forked child processes to
+        # produce identical output.  (e.g. CVE-2013-1445)
+        #
+        # Note that if this function can be called frequently by an attacker,
+        # (and if the bits from OSRNG are insufficiently random) it will weaken
+        # Fortuna's ability to resist a state compromise extension attack.
+        self._fa._forget_last_reseed()
+
     def close(self):
         self.closed = True
         self._osrng = None
Index: python-crypto-2.1.0/lib/Crypto/SelfTest/Random/__init__.py
===================================================================
--- python-crypto-2.1.0.orig/lib/Crypto/SelfTest/Random/__init__.py	2013-10-15 08:37:01.000000000 -0700
+++ python-crypto-2.1.0/lib/Crypto/SelfTest/Random/__init__.py	2013-10-15 08:37:29.000000000 -0700
@@ -32,6 +32,7 @@
     import OSRNG;               tests += OSRNG.get_tests(config=config)
     import test_random;         tests += test_random.get_tests(config=config)
     import test_rpoolcompat;    tests += test_rpoolcompat.get_tests(config=config)
+    import test__UserFriendlyRNG; tests += test__UserFriendlyRNG.get_tests(config=config)
     return tests
 
 if __name__ == '__main__':
Index: python-crypto-2.1.0/lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ python-crypto-2.1.0/lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py	2013-10-15 08:38:29.000000000 -0700
@@ -0,0 +1,168 @@
+# -*- coding: utf-8 -*-
+# Self-tests for the user-friendly Crypto.Random interface
+#
+# Written in 2013 by Dwayne C. Litzenberger <dlitz@...tz.net>
+#
+# ===================================================================
+# The contents of this file are dedicated to the public domain.  To
+# the extent that dedication to the public domain is not available,
+# everyone is granted a worldwide, perpetual, royalty-free,
+# non-exclusive license to exercise all rights associated with the
+# contents of this file for any purpose whatsoever.
+# No rights are reserved.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# ===================================================================
+
+"""Self-test suite for generic Crypto.Random stuff """
+
+from __future__ import nested_scopes
+
+__revision__ = "$Id$"
+
+import binascii
+import pprint
+import unittest
+import os
+import time
+import sys
+
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
+
+import Crypto.Random._UserFriendlyRNG
+import Crypto.Random.random
+
+class RNGForkTest(unittest.TestCase):
+
+    def _get_reseed_count(self):
+        """
+        Get `FortunaAccumulator.reseed_count`, the global count of the
+        number of times that the PRNG has been reseeded.
+        """
+        rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
+        rng_singleton._lock.acquire()
+        try:
+            return rng_singleton._fa.reseed_count
+        finally:
+            rng_singleton._lock.release()
+
+    def runTest(self):
+        # Regression test for CVE-2013-1445.  We had a bug where, under the
+        # right conditions, two processes might see the same random sequence.
+
+        if sys.platform.startswith('win'):  # windows can't fork
+            assert not hasattr(os, 'fork')    # ... right?
+            return
+
+        # Wait 150 ms so that we don't trigger the rate-limit prematurely.
+        time.sleep(0.15)
+
+        reseed_count_before = self._get_reseed_count()
+
+        # One or both of these calls together should trigger a reseed right here.
+        Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
+        Crypto.Random.get_random_bytes(1)
+
+        reseed_count_after = self._get_reseed_count()
+        self.assertNotEqual(reseed_count_before, reseed_count_after)  # sanity check: test should reseed parent before forking
+
+        rfiles = []
+        for i in range(10):
+            rfd, wfd = os.pipe()
+            if os.fork() == 0:
+                # child
+                os.close(rfd)
+                f = os.fdopen(wfd, "wb")
+
+                Crypto.Random.atfork()
+
+                data = Crypto.Random.get_random_bytes(16)
+
+                f.write(data)
+                f.close()
+                os._exit(0)
+            # parent
+            os.close(wfd)
+            rfiles.append(os.fdopen(rfd, "rb"))
+
+        results = []
+        results_dict = {}
+        for f in rfiles:
+            data = binascii.hexlify(f.read())
+            results.append(data)
+            results_dict[data] = 1
+            f.close()
+
+        if len(results) != len(results_dict.keys()):
+            raise AssertionError("RNG output duplicated across fork():\n%s" %
+                                 (pprint.pformat(results)))
+
+
+# For RNGMultiprocessingForkTest
+def _task_main(q):
+    a = Crypto.Random.get_random_bytes(16)
+    time.sleep(0.1)     # wait 100 ms
+    b = Crypto.Random.get_random_bytes(16)
+    q.put(binascii.b2a_hex(a))
+    q.put(binascii.b2a_hex(b))
+    q.put(None)      # Wait for acknowledgment
+
+
+class RNGMultiprocessingForkTest(unittest.TestCase):
+
+    def runTest(self):
+        # Another regression test for CVE-2013-1445.  This is basically the
+        # same as RNGForkTest, but less compatible with old versions of Python,
+        # and a little easier to read.
+
+        n_procs = 5
+        manager = multiprocessing.Manager()
+        queues = [manager.Queue(1) for i in range(n_procs)]
+
+        # Reseed the pool
+        time.sleep(0.15)
+        Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
+        Crypto.Random.get_random_bytes(1)
+
+        # Start the child processes
+        pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random.atfork)
+        map_result = pool.map_async(_task_main, queues)
+
+        # Get the results, ensuring that no pool processes are reused.
+        aa = [queues[i].get(30) for i in range(n_procs)]
+        bb = [queues[i].get(30) for i in range(n_procs)]
+        res = list(zip(aa, bb))
+
+        # Shut down the pool
+        map_result.get(30)
+        pool.close()
+        pool.join()
+
+        # Check that the results are unique
+        if len(set(aa)) != len(aa) or len(set(res)) != len(res):
+            raise AssertionError("RNG output duplicated across fork():\n%s" %
+                                 (pprint.pformat(res),))
+
+
+def get_tests(config={}):
+    tests = []
+    tests += [RNGForkTest()]
+    if multiprocessing is not None:
+        tests += [RNGMultiprocessingForkTest()]
+    return tests
+
+if __name__ == '__main__':
+    suite = lambda: unittest.TestSuite(get_tests())
+    unittest.main(defaultTest='suite')
+
+# vim:set ts=4 sw=4 sts=4 expandtab:

