Commit a31e53ce authored by CompileNix's avatar CompileNix
Browse files

add shell command injection for remote cluster actions

parent bf068aea
Loading
Loading
Loading
Loading
+52 −43
Original line number Diff line number Diff line
#!/usr/bin/python3 -u
import sys, argparse, subprocess, json, random, traceback, re, time, signal

parser = argparse.ArgumentParser(description='tool to synchronize ceph rbd images between two clusters', usage='python3 main.py --source rbd/image_name --destination rbd_backup/backup_test_destination')
parser = argparse.ArgumentParser(description='tool to synchronize ceph rbd images between two clusters, using ssh', usage='python3 main.py --source username@server:rbd/image_name --destination rbd_backup/backup_test_destination')

parser.add_argument('-v', '--verbose', action="store_true", dest='verbose', default=False, help='print verbose output')
parser.add_argument('-vv', '--debug', action="store_true", dest='debug', default=False, help='print debug output')
parser.add_argument('-s', '--source', action="store", dest='source', help='the source ceph rbd image', type=str, required=True)
parser.add_argument('-d', '--destination', action="store", dest='destination', help='the destination ceph rbd image', type=str, required=True)
parser.add_argument('-s', '--source', action="store", dest='source', help='the source ceph rbd image: username@server:rbd/image_name', type=str, required=True)
parser.add_argument('-d', '--destination', action="store", dest='destination', help='the destination ceph rbd image: rbd_backup/backup_test_destination', type=str, required=True)
parser.add_argument('-p', '--snapshot-prefix', action="store", dest='snapshotPrefix', help='', required=True, default='backup_snapshot_')
parser.add_argument('-w', '--whole-object', action="store_true", dest='wholeObject', help='do not diff for intra-object deltas. Dramatically improves diff performance but may result in larger delta backup', required=False, default=True)
parser.add_argument('-healty', '--wait-until-healthy', action="store_true", dest='waitHealthy', help='wait until cluster is healthy', required=False, default=True)
parser.add_argument('-no-scrub', '--no-scrubbing', action="store_true", dest='noScrubbing', help='wait for scrubbing to finnish and disable scrubbing (does re-enable scrubbing automatically). This implies --wait-until-healthy', required=False, default=False)
parser.add_argument('-p', '--snapshot-prefix', action="store", dest='snapshotPrefix', help='', required=False, default='backup_snapshot_')

args = parser.parse_args()

@@ -71,45 +71,47 @@ def cephRbdObjectToPath(object):
    return '/'.join([object['pool'], object['image']])

# ----------------------------------------------------------------------------
source = cephRbdPathToObject(args.source)
# username@server:rbd/image_name
remoteConnectionCommand = args.source.split(':')[0]
source = cephRbdPathToObject(args.source.split(':')[1])
destination = cephRbdPathToObject(args.destination)
# ----------------------------------------------------------------------------

def getCephRbdImages(pool: str):
    return execParseJson('rbd -p ' + pool + ' ls --format json')
def getCephRbdImages(pool: str, commandInject: str = ''):
    return execParseJson(commandInject + 'rbd -p ' + pool + ' ls --format json')

def cephRbdImageExists(pool: str, image: str):
    return image in getCephRbdImages(pool)
def cephRbdImageExists(pool: str, image: str, commandInject: str = ''):
    return image in getCephRbdImages(pool, commandInject)

def getCephSnapshots(pool: str, image: str):
    return execParseJson('rbd -p ' + pool + ' snap ls --format json ' + image)
def getCephSnapshots(pool: str, image: str, commandInject: str = ''):
    return execParseJson(commandInject + 'rbd -p ' + pool + ' snap ls --format json ' + image)

def countPreviousCephRbdSnapsots(pool: str, image: str):
def countPreviousCephRbdSnapsots(pool: str, image: str, commandInject: str = ''):
    logMessage('get ceph snapshot count for image ' + image, LOGLEVEL_INFO)
    count = 0
    for snapshot in getCephSnapshots(pool, image):
    for snapshot in getCephSnapshots(pool, image, commandInject):
        if (snapshot['name'].startswith(SNAPSHOT_PREFIX, 0, len(SNAPSHOT_PREFIX))):
            count += 1

    return count

def previousCephRbdSnapsotName(pool: str, image: str):
    logMessage('get ceph snapshot name for image ' + image, LOGLEVEL_INFO)
    for snapshot in getCephSnapshots(pool, image):
def previousCephRbdSnapsotName(pool: str, image: str, commandInject: str = ''):
    logMessage('get ceph snapshot name for image ' + commandInject + pool + '/' + image, LOGLEVEL_INFO)
    for snapshot in getCephSnapshots(pool, image, commandInject):
        if (snapshot['name'].startswith(SNAPSHOT_PREFIX, 0, len(SNAPSHOT_PREFIX))):
            return snapshot['name']
    raise RuntimeError('cannot determine ceph snapshot name, aborting!')

def getBackupMode(source, destination):
    sourceExists = cephRbdImageExists(source['pool'], source['image'])
def getBackupMode(source, destination, commandInject: str = ''):
    sourceExists = cephRbdImageExists(source['pool'], source['image'], commandInject)
    if (not sourceExists):
        raise RuntimeError('invalid arguments, source image does not exist ' + cephRbdObjectToPath(source))

    destinationExists = cephRbdImageExists(destination['pool'], destination['image'])
    destinationExists = cephRbdImageExists(destination['pool'], destination['image'], commandInject)
    if (not destinationExists):
        raise RuntimeError('invalid arguments, destination image does not exist ' + cephRbdObjectToPath(destination))

    sourcePreviousSnapshotCount = countPreviousCephRbdSnapsots(source['pool'], source['image'])
    sourcePreviousSnapshotCount = countPreviousCephRbdSnapsots(source['pool'], source['image'], commandInject)

    if (sourcePreviousSnapshotCount > 1):
        raise RuntimeError('inconsistent state, more than one snapshot for image ' + cephRbdObjectToPath(source))
@@ -123,63 +125,70 @@ def getBackupMode(source, destination):
    if (sourcePreviousSnapshotCount == 0 and not destinationExists):
        return {'mode': BACKUPMODE_INITIAL}
    else:
        return {'mode': BACKUPMODE_INCREMENTAL, 'base_snapshot': previousCephRbdSnapsotName(source['pool'], source['image'])}
        return {'mode': BACKUPMODE_INCREMENTAL, 'base_snapshot': previousCephRbdSnapsotName(source['pool'], source['image'], commandInject)}

def createCephRbdSnapshot(pool: str, image: str):
    logMessage('creating ceph snapshot for image ' + pool + '/' + image, LOGLEVEL_INFO)
def createCephRbdSnapshot(pool: str, image: str, commandInject: str = ''):
    logMessage('creating ceph snapshot for image ' + commandInject + pool + '/' + image, LOGLEVEL_INFO)
    name = SNAPSHOT_PREFIX + ''.join([random.choice('0123456789abcdef') for _ in range(16)])
    logMessage('exec command "rbd -p ' + pool + ' snap create ' + image + '@' + name + '"', LOGLEVEL_INFO)
    logMessage('exec command "' + commandInject + 'rbd -p ' + pool + ' snap create ' + image + '@' + name + '"', LOGLEVEL_INFO)
    if commandInject != '':
        code = subprocess.call(commandInject.strip().split(' ') + ['rbd', '-p', pool, 'snap', 'create', image + '@' + name])
    else:
        code = subprocess.call(['rbd', '-p', pool, 'snap', 'create', image + '@' + name])
    if (code != 0):
        raise RuntimeError('error creating ceph snapshot code: ' + str(code))
    logMessage('ceph snapshot created ' + name, LOGLEVEL_INFO)
    return name

def removeCephRbdSnapshot(pool: str, image: str, snapshot: str):
    execRaw('rbd -p ' + pool + ' snap rm ' + image + '@' + snapshot)
def removeCephRbdSnapshot(pool: str, image: str, snapshot: str, commandInject: str = ''):
    execRaw(commandInject + 'rbd -p ' + pool + ' snap rm ' + image + '@' + snapshot)

def getCephRbdProperties(pool: str, image: str):
    return execParseJson('rbd -p ' + pool + ' --format json info ' + image)
def getCephRbdProperties(pool: str, image: str, commandInject: str = ''):
    return execParseJson('rbd -p ' + pool + ' --format json info ' + image, commandInject)

def setCephScrubbing(enable: bool):
def setCephScrubbing(enable: bool, commandInject: str = ''):
    actionName = 'enable' if enable else 'disable'
    action = 'set' if enable else 'unset'
    logMessage(actionName + ' ceph scrubbing', LOGLEVEL_INFO)
    execRaw('ceph osd ' + action + ' nodeep-scrub')
    execRaw('ceph osd ' + action + ' noscrub')
    execRaw(commandInject + 'ceph osd ' + action + ' nodeep-scrub')
    execRaw(commandInject + 'ceph osd ' + action + ' noscrub')

def waitForCephClusterHealthy():
def waitForCephClusterHealthy(commandInject: str = ''):
    logMessage('waiting for ceph cluster to become healthy', LOGLEVEL_INFO)
    while (execRaw('ceph health detail').startswith('HEALTH_ERR')):
    while (execRaw(commandInject + 'ceph health detail').startswith('HEALTH_ERR')):
        print('.', end='', file=sys.stderr)
        time.sleep(5)

def waitForCephScrubbingCompletion():
def waitForCephScrubbingCompletion(commandInject: str = ''):
    logMessage('waiting for ceph cluster to complete scrubbing', LOGLEVEL_INFO)
    pattern = re.compile("scrubbing")
    while (pattern.search(execRaw('ceph status'))):
    while (pattern.search(execRaw(commandInject + 'ceph status'))):
        print('.', end='', file=sys.stderr)
        time.sleep(5)

def cleanup(arg1 = None, arg2 = None):
def cleanup(arg1 = None, arg2 = None, commandInject: str = ''):
    logMessage('cleaning up...', LOGLEVEL_INFO)

    if (args.noScrubbing):
        setCephScrubbing(True)
        setCephScrubbing(True, commandInject)

try:
    executeOnRemoteCommand = 'ssh ' + remoteConnectionCommand + ' '
    signal.signal(signal.SIGINT, cleanup)
    signal.signal(signal.SIGTERM, cleanup)
    mode = getBackupMode(source, destination)
    mode = getBackupMode(source, destination, executeOnRemoteCommand)

    if (args.waitHealthy or args.noScrubbing):
        waitForCephClusterHealthy()
        waitForCephClusterHealthy(executeOnRemoteCommand)
    if (args.noScrubbing):
        setCephScrubbing(False)
        setCephScrubbing(False, executeOnRemoteCommand)
        waitForCephScrubbingCompletion()
        waitForCephScrubbingCompletion(executeOnRemoteCommand)

    if (mode['mode'] == BACKUPMODE_INITIAL):
        snapshot = createCephRbdSnapshot(source['pool'], source['image'])
        snapshot = createCephRbdSnapshot(source['pool'], source['image'], executeOnRemoteCommand)
        #TODO: create target image
        #createZfsVolume(args.destination, getCephRbdProperties(args.source)['size'])
        #sourcePath = mapCephRbdImage(args.source + '@' + snapshot)
@@ -192,11 +201,11 @@ try:

        logMessage('copy finished', LOGLEVEL_INFO)
        createCephRbdSnapshot(destination['pool'], destination['image'])
        removeCephRbdSnapshot(source['pool'], source['image'], snapshot)
        removeCephRbdSnapshot(source['pool'], source['image'], snapshot, executeOnRemoteCommand)

    if (mode['mode'] == BACKUPMODE_INCREMENTAL):
        snapshot1 = mode['base_snapshot']
        snapshot2 = createCephRbdSnapshot(source['pool'], source['image'])
        snapshot2 = createCephRbdSnapshot(source['pool'], source['image'], executeOnRemoteCommand)

        logMessage('beginning incremental copy from ' + cephRbdObjectToPath(source) + ' to ' + cephRbdObjectToPath(destination), LOGLEVEL_INFO)

@@ -205,7 +214,7 @@ try:

        logMessage('copy finished', LOGLEVEL_INFO)
        createCephRbdSnapshot(destination['pool'], destination['image'])
        removeCephRbdSnapshot(source['pool'], source['image'], snapshot)
        removeCephRbdSnapshot(source['pool'], source['image'], snapshot, executeOnRemoteCommand)

    logMessage(bcolors.OKGREEN + 'Done with ' + cephRbdObjectToPath(source) + ' -> ' + cephRbdObjectToPath(destination) + bcolors.ENDC, LOGLEVEL_INFO)