add pkg2json prototype

master
Oscar Campos 2 years ago
parent a83e4cc45b
commit 8b48f4182c
  1. 141
      pkg2json/pkg2json

@ -0,0 +1,141 @@
#!/usr/bin/env python3
# pkg2json - convert a foo.pkg.tar.zst into JSON
import os
import io
import json
import gzip
import tarfile
import argparse
import zstandard as zstd
from typing import Tuple, Dict, Union
def main(input_file: str, output_dir: str) -> int:
    """Dump the metadata of the package at ``input_file`` as JSON on stdout.

    ``output_dir`` is accepted but not used by this function yet; the JSON
    document is only printed, indented by four spaces.

    Always returns 0, which the caller uses as the process exit status.
    """
    package: Dict[str, Dict[str, str]] = decompress(input_file)
    rendered = json.dumps(package, indent=4)
    print(rendered)
    return 0
def decompress(input_file: str) -> Dict[str, Dict[str, str]]:
    """Unpack ``input_file`` and collect its metadata in one dictionary.

    The file is zstandard-decompressed, the resulting tar data is searched
    for the package info and mtree payloads, and both are parsed with
    ``dictify``. The mtree payload is gunzipped first and split on the first
    single space of each line instead of the default splitter.

    The result has exactly two keys, ``'PKGINFO'`` and ``'MTREE'``.
    """
    archive: bytes = decompress_zstandard(input_file)
    raw_pkginfo, packed_mtree = decompress_tardata(archive)

    result: Dict[str, Dict[str, str]] = {}
    result['PKGINFO'] = dictify(raw_pkginfo.decode('utf8'))
    mtree_text = decompress_gzip(packed_mtree).decode('utf8')
    result['MTREE'] = dictify(mtree_text, ' ')
    return result
def decompress_zstandard(input_file: str) -> bytes:
    """Read ``input_file`` and return its zstandard-decompressed contents.

    The file is streamed through a ``ZstdDecompressor`` in 16 KiB reads.
    The pieces are joined once at the end instead of being appended to a
    growing ``bytes`` object, which would copy the accumulated data again
    on every read.

    ``input_file`` is the path of the compressed file; the return value is
    the complete decompressed payload (``b''`` when nothing is produced).
    """
    with open(input_file, 'rb') as fd:
        reader = zstd.ZstdDecompressor().stream_reader(fd)
        # iter() with a sentinel keeps calling read() and stops at the first
        # empty chunk, which marks the end of the decompressed stream.
        return b''.join(iter(lambda: reader.read(16384), b''))
def decompress_tardata(input_data: bytes) -> Tuple[bytes, bytes]:
    """Pull the ``.PKGINFO`` and ``.MTREE`` members out of in-memory tar data.

    ``input_data`` is opened as a tar archive from a memory buffer. Both
    members are looked up first and then read; a member for which
    ``extractfile`` yields ``None`` contributes ``b''``.

    Returns the pair ``(pkginfo bytes, mtree bytes)``.
    """
    with io.BytesIO(input_data) as buffer:
        with tarfile.open('input_data.tar', 'r', fileobj=buffer) as archive:
            handles = [
                archive.extractfile(member)
                for member in ('.PKGINFO', '.MTREE')
            ]
            pkginfo_data, mtree_data = (
                b'' if handle is None else handle.read()
                for handle in handles
            )
    return pkginfo_data, mtree_data
def decompress_gzip(input_data: bytes) -> bytes:
    """Return the payload obtained by gunzipping ``input_data``.

    The bytes are wrapped in an in-memory buffer and read in full through
    ``gzip.GzipFile``; the buffer is closed before the result is returned.
    """
    with io.BytesIO(input_data) as buffer:
        with gzip.GzipFile(mode='rb', fileobj=buffer) as unzipper:
            return unzipper.read()
def dictify(data: str, splitter: str = ' = ') -> Dict[str, str]:
    """Convert line-oriented ``key<splitter>value`` text into a dictionary.

    Each line is split on the first occurrence of ``splitter`` only, so the
    value may itself contain the splitter. Lines starting with ``#`` and
    blank lines are skipped.

    NOTE: when the same key appears on several lines only the last value is
    kept, because later assignments overwrite earlier ones.

    Raises ``ValueError`` if a non-blank, non-comment line does not contain
    ``splitter``.
    """
    d: Dict[str, str] = {}
    for line in data.splitlines():
        # Blank lines carry no entry; indexing line[0] on an empty line
        # would raise IndexError, so they are filtered out up front.
        if not line.strip() or line.startswith('#'):
            continue
        key, value = line.split(splitter, 1)
        d[key] = value
    return d
def validate_parameters(input_file: str, output_dir: str) -> None:
    """
    Validates that input_file exists and is a file and that output_dir exists
    and is a directory, if any of those assertions fails it stops execution

    Execution is stopped by raising ``SystemExit`` with the error message:
    the interpreter writes the message to stderr and exits with status 1.
    This replaces ``print`` followed by ``os._exit(1)``, which terminates
    the process without flushing stdio buffers and could therefore lose the
    message when the output is not a terminal.
    """
    # isfile()/isdir() already return False for paths that do not exist, so
    # no separate exists() check is needed.
    if not os.path.isfile(input_file):
        raise SystemExit(
            f'{input_file} does not exists or is not a valid file'
        )

    if not os.path.isdir(output_dir):
        raise SystemExit(
            f'{output_dir} does not exists or is not a directory'
        )
def prepare_argparse() -> Tuple[str, str]:
    """Parse the command line and return the validated arguments.

    Two positionals are accepted: ``input_file`` (required) and
    ``output_dir`` (optional, defaulting to ``'.'``). Both are passed to
    ``validate_parameters``, which stops the program when either path is
    invalid.

    Returns the pair ``(input_file, output_dir)``.
    """
    parser = argparse.ArgumentParser(
        description='Extract data from a pacman package and dump it as JSON',
    )
    parser.add_argument('input_file', action='store', help='input file')
    # nargs='?' is what makes the positional optional; without it argparse
    # treats output_dir as required and the default is never applied.
    parser.add_argument(
        'output_dir', action='store', nargs='?', default='.',
        help='output directory',
    )

    prepared_args = parser.parse_args()
    input_file, output_dir = prepared_args.input_file, prepared_args.output_dir

    # validate input file and output dir existence
    validate_parameters(input_file, output_dir)
    return input_file, output_dir
if __name__ == '__main__':
    # Exit through SystemExit rather than os._exit(): os._exit() terminates
    # the process without flushing stdio buffers, so the JSON printed by
    # main() could be lost when stdout is redirected to a file or a pipe.
    raise SystemExit(main(*prepare_argparse()))
Loading…
Cancel
Save