import os |
import datetime, time |
import json |
from PIL import Image |
from tqdm import tqdm |
import torch, torchvision |
from torchvision import transforms |
from torchvision.datasets.vision import VisionDataset |
from typing import Any, Callable, Optional, Tuple, List |
import argparse |
def calcPrescale(image_width, image_height, scaleImgforCrop = 512): |
scale = 1.0 |
if ((image_width-scaleImgforCrop)*(image_height-scaleImgforCrop) > 0): |
if abs(1-image_width/scaleImgforCrop) < abs(1-image_height/scaleImgforCrop): |
scale = image_width/scaleImgforCrop |
else: |
scale = image_height/scaleImgforCrop |
return scale |
def prescaleImage(image, scale): |
image_width = int(image.size[0]/scale) |
image_height = int(image.size[1]/scale) |
image_res = image.resize((image_width, image_height)) |
return image_res |
def preProcessImages(org_images_path): |
corruptedImgs = [] |
ccrop_size = 512 |
folder_dir,folder_name = os.path.split(org_images_path) |
cur_dir = os.getcwd() |
processed_images_path = os.path.join(cur_dir,'datasets','wider','val') |
if not os.path.isdir(processed_images_path): |
os.makedirs(processed_images_path) |
imageNames = os.listdir(org_images_path) |
for i, image in enumerate(tqdm(imageNames)): |
try: |
if(image.split('.')[1] == 'jpg'): |
imgDir = os.path.join(org_images_path,image) |
img = Image.open(imgDir) |
image_width = img.size[0] |
image_height = img.size[1] |
scale = calcPrescale(image_width, image_height,scaleImgforCrop=ccrop_size) |
img_resized = prescaleImage(img, scale) |
width, height = img_resized.size |
left = (width - ccrop_size)/2 |
top = (height - ccrop_size)/2 |
right = (width + ccrop_size)/2 |
bottom = (height + ccrop_size)/2 |
img_ccropped = img_resized.crop((left, top, right, bottom)) |
img_ccropped.save(os.path.join(processed_images_path, image)) |
except: |
print('Cannot Load: ' + image + ', check if it is corrupted.') |
corruptedImgs.append(image) |
print('') |
print('Conversion Finished') |
print('') |
if len(corruptedImgs): |
print('Something wrong with the following images and they are not processed:') |
print(corruptedImgs) |
print('Please delete these images from associated annotations') |
return |
class CocoDetection(VisionDataset): |
"""`MS Coco Detection <https://cocodataset.org/#detection-2016>`_ Dataset. |
Args: |
root (string): Root directory where images are downloaded to. |
annFile (string): Path to json annotation file. |
scaleImgforCrop (int, optional): Img and target BBs are scaled with |
constant aspect ratio st: |
if image width, image height > scaleImgforCrop image is shrinked |
until width or height becomes equal to scaleImgforCrop |
if image width, image height < scaleImgforCrop image is expanded |
until width or height becomes equal to scaleImgforCrop |
else no scaling |
transform (callable, optional): A function/transform that takes in an |
PIL image and returns a transformed version. E.g, ``transforms.ToTensor`` |
target_transform (callable, optional): A function/transform that takes in |
the target and transforms it. |
transforms (callable, optional): A function/transform that takes input |
sample and its target as entry and returns a transformed version. |
""" |
def __init__( |
self, |
root: str, |
annFile: str, |
scaleImgforCrop: int= None, |
transform: Optional[Callable] = None, |
target_transform: Optional[Callable] = None, |
transforms: Optional[Callable] = None |
): |
super().__init__(root, transforms, transform, target_transform) |
from pycocotools.coco import COCO |
self.coco = COCO(annFile) |
self.ids = list(sorted(self.coco.imgs.keys())) |
self.annFilePath = os.path.join('.',annFile) |
self.catPersonId = self.coco.getCatIds(catNms=['person'])[0] |
self.scaleImgforCrop = scaleImgforCrop |
def _load_image(self, id: int) -> Image.Image: |
path = self.coco.loadImgs(id)[0]["file_name"] |
return Image.open(os.path.join(self.root, path)).convert("RGB") |
def _load_target(self, id) -> List[Any]: |
return self.coco.loadAnns(self.coco.getAnnIds(id, iscrowd=False)) |
def __getitem__(self, index: int) -> Tuple[Any, Any, Any]: |
id = self.ids[index] |
imgID = id |
try: |
image = self._load_image(id) |
except: |
print(f'********Unable to load image with id: {imgID}********') |
print('Please check if image is corrupted, and remove it from annotations if necessary.') |
target = copy.deepcopy(self._load_target(id)) |
image_width = image.size[0] |
image_height = image.size[1] |
scale = self._calcPrescale(image_width=image_width, image_height=image_height) |
image = self._prescaleImage(image, scale) |
for i, t in enumerate(target): |
BB = t['bbox'].copy() |
scaledBB = self._prescaleBB(BB,scale) |
target[i]['bbox'] = scaledBB |
image_width = image.size[0] |
image_height = image.size[1] |
centerCropped = False |
if self.transforms is not None: |
image, target = self.transforms(image, target) |
for t in self.transforms.transform.transforms: |
if (type(t) == torchvision.transforms.transforms.CenterCrop): |
centerCropped = True |
x_scale = image.size(2) / image_width |
y_scale = image.size(1) / image_height |
bbox_arr = [] |
for idx,ann in enumerate(target): |
if ann['category_id'] == self.catPersonId: |
crop_size = image.shape[1] |
if centerCropped: |
bbox = ann['bbox'].copy() |
croppedBB = self.cropBBox(bbox, crop_size,image_height,image_width) |
else: |
croppedBB = torch.tensor(ann['bbox']) |
if not (croppedBB == None): |
bbox_arr.append(croppedBB) |
if len(bbox_arr) != 0: |
bbox_arr = torch.stack(bbox_arr) |
wh = bbox_arr[:, 2:] |
xy = bbox_arr[:, :2] |
id_tensor = torch.tensor([id]).unsqueeze(0).expand(bbox_arr.size(0), -1) |
bbox_arr = torch.cat([id_tensor, xy, wh], dim=-1) |
else: |
bbox_arr = torch.tensor(bbox_arr) |
return image, bbox_arr , imgID |
def __len__(self) -> int: |
return len(self.ids) |
def get_labels(self): |
labels = [] |
for id in self.ids: |
anns = self._load_target(id) |
person_flag = False |
for ann in anns: |
person_flag = ann['category_id'] == self.catPersonId |
if person_flag == True: |
break |
if person_flag == True: |
labels.append(1) |
else: |
labels.append(0) |
return torch.tensor(labels) |
def get_cat_person_id(self): |
return self.catPersonId |
def get_coco_api(self): |
return self.coco |
def _calcPrescale(self, image_width, image_height): |
scale = 1.0 |
if self.scaleImgforCrop != None: |
if ((image_width-self.scaleImgforCrop)*(image_height-self.scaleImgforCrop) > 0): |
if abs(1-image_width/self.scaleImgforCrop) < abs(1-image_height/self.scaleImgforCrop): |
scale = image_width/self.scaleImgforCrop |
else: |
scale = image_height/self.scaleImgforCrop |
return scale |
def _prescaleImage(self, image, scale): |
image_width = int(image.size[0]/scale) |
image_height = int(image.size[1]/scale) |
t = transforms.Resize([image_height,image_width]) |
image = t(image) |
return image |
def _prescaleBB(self, BB, scale): |
scaledbb = [round(p/scale,1) for p in BB] |
return scaledbb |
def cropBBox(self,bbox,crop_size, image_height, image_width): |
bbox_aligned = [] |
x, y, w, h = bbox[0], bbox[1], bbox[2], bbox[3] |
if image_height < crop_size: |
offset = (crop_size - image_height) // 2 |
y = y + offset |
if (y+h) > crop_size: |
offset = (y+h)-crop_size |
h = h - offset |
if image_width < crop_size: |
offset = (crop_size - image_width) // 2 |
x = x + offset |
if (x+w) > crop_size: |
offset = (x+w)-crop_size |
w = w - offset |
if image_width > crop_size: |
offset = (image_width - crop_size) // 2 |
if offset > x: |
w = w -(offset-x) |
x = 0 |
else: |
x = x - offset |
if (x+w) > crop_size: |
offset = (x+w)-crop_size |
w = w - offset |
if image_height > crop_size: |
offset = (image_height - crop_size) // 2 |
if offset > y: |
h = h -(offset-y) |
y = 0 |
else: |
y = y - offset |
if (y+h) > crop_size: |
offset = (y+h)-crop_size |
h = h - offset |
bbox_aligned.append(x) |
bbox_aligned.append(y) |
bbox_aligned.append(w) |
bbox_aligned.append(h) |
if ((w <= 0) or (h <= 0)): |
return None |
else: |
x_scale, y_scale = 1.0,1.0 |
return torch.mul(torch.tensor(bbox_aligned), torch.tensor([x_scale, y_scale, x_scale, y_scale])) |
def __round_floats(self,o): |
''' |
Used to round floats before writing to json file |
''' |
if isinstance(o, float): |
return round(o, 2) |
if isinstance(o, dict): |
return {k: self.__round_floats(v) for k, v in o.items()} |
if isinstance(o, (list, tuple)): |
return [self.__round_floats(x) for x in o] |
return o |
def createResizedAnnotJson(self,targetFileName,cropsize = 512): |
''' |
Resizes person annotations after center crop operation and saves as json file to the |
directory of original annotations with the name "targetFileName" |
''' |
t1 = time.time() |
path, annotfilename = os.path.split(self.annFilePath) |
resizedAnnotPath = os.path.join(path,targetFileName) |
print('') |
print(f'Creating Json file for resized annotations: {resizedAnnotPath}') |
with open(self.annFilePath) as json_file: |
resizedanotDict = json.load(json_file) |
origannList = resizedanotDict['annotations'] |
centerCropped = False |
if self.transforms is not None: |
for t in self.transforms.transform.transforms: |
if (type(t) == torchvision.transforms.transforms.CenterCrop): |
centerCropped = True |
resizedannList = [] |
for resizedannot in origannList: |
currentcatID = resizedannot['category_id'] |
currentBB = resizedannot['bbox'] |
currentImgID = resizedannot['image_id'] |
image_width = self.coco.loadImgs(currentImgID)[0]['width'] |
image_height = self.coco.loadImgs(currentImgID)[0]['height'] |
scale = self._calcPrescale(image_width=image_width, image_height=image_height) |
image_width = image_width / scale |
image_height = image_height / scale |
if currentcatID == self.catPersonId: |
bbox = resizedannot['bbox'].copy() |
bbox = self._prescaleBB(bbox, scale) |
if centerCropped: |
croppedBB = self.cropBBox(bbox, cropsize,image_height,image_width) |
else: |
croppedBB = torch.tensor(bbox) |
if (croppedBB != None): |
croppedBB = croppedBB.tolist() |
resizedannot['bbox'] = self.__round_floats(croppedBB) |
resizedannot['area'] = self.__round_floats(croppedBB[2]*croppedBB[3]) |
resizedannList.append(resizedannot) |
else: |
resizedannList.append(resizedannot) |
resizedanotDict['annotations'] = resizedannList |
print('Saving resized annotations to json file...') |
resizedanotDict = json.dumps(resizedanotDict) |
with open(resizedAnnotPath, 'w') as outfile: |
outfile.write(resizedanotDict) |
print(f'{resizedAnnotPath} saved.') |
t2 = time.time() |
print(f'Elapsed time: {t2-t1} seconds') |
def create_image_info(image_id, file_name, image_size, |
date_captured=datetime.datetime.utcnow().isoformat(' '), |
license_id=1, coco_url="", flickr_url=""): |
image_info = { |
"id": image_id, |
"file_name": file_name, |
"width": image_size[0], |
"height": image_size[1], |
"date_captured": date_captured, |
"license": license_id, |
"coco_url": coco_url, |
"flickr_url": flickr_url |
} |
return image_info |
def create_annotation_info(annotation_id, image_id, category_info, bounding_box): |
is_crowd = category_info['is_crowd'] |
annotation_info = { |
"id": annotation_id, |
"image_id": image_id, |
"category_id": category_info["id"], |
"iscrowd": is_crowd, |
"bbox": bounding_box |
} |
return annotation_info |
def convWidertoCOCO(annotFile, orgImageDir): |
''' |
Converts wider dataset annotations to COCO format. |
Args: |
annotFile: Original annotation file |
orgImageDir: Original Images directory |
''' |
totalImgnum = 0 |
imgID = 0 |
annID = 0 |
imgList = [] |
annList = [] |
category_info= {} |
category_info['is_crowd'] = False |
category_info['id'] = 1 |
data ={} |
data['info'] = {'description': 'Example Dataset', 'url': '', 'version': '0.1.0', 'year': 2022, 'contributor': 'ljp', 'date_created': '2019-07-18 06:56:33.567522'} |
data['categories'] = [{'id': 1, 'name': 'person', 'supercategory': 'person'}] |
data['licences'] = [{'id': 1, 'name': 'Attribution-NonCommercial-ShareAlike License', 'url': 'http://creativecommons.org/licenses/by-nc-sa/2.0/'}] |
with open(annotFile) as f: |
for _, annot_raw in enumerate(tqdm(f)): |
imgID += 1 |
annot_raw = annot_raw.split() |
imgName = annot_raw[:1][0] |
totalImgnum +=1 |
imageFullPath = os.path.join(orgImageDir,imgName) |
try: |
curImg = Image.open(imageFullPath) |
image_size = curImg.size |
BBs_str = annot_raw[1:] |
bb_raw = [int(bb) for bb in BBs_str] |
imgInf = create_image_info(image_id = imgID, file_name = imgName, image_size =image_size, |
date_captured=datetime.datetime.utcnow().isoformat(' '), |
license_id=1, coco_url="", flickr_url="") |
imgList.append(imgInf) |
bb = [] |
for i, p in enumerate(bb_raw): |
bb.append(p) |
if ((i+1)%4 == 0): |
annID += 1 |
ann = create_annotation_info(annID, imgID, category_info = category_info, bounding_box = bb) |
annList.append(ann) |
bb = [] |
except: |
print(f'Cannot create annot for {imgName}, image does not exist in given directory.') |
data['annotations'] = annList |
data['images'] = imgList |
cur_dir = os.getcwd() |
processed_annot_path = os.path.join(cur_dir,'datasets','wider','annotations') |
if not os.path.isdir(processed_annot_path): |
os.makedirs(processed_annot_path) |
orgCOCOAnnotFile = os.path.join( processed_annot_path ,'orig_annot.json') |
with open(orgCOCOAnnotFile, 'w') as fp: |
json.dump(data, fp) |
print('Annotations saved as: ' + orgCOCOAnnotFile) |
print(f'Created {annID} COCO annotations for total {totalImgnum} images') |
print('') |
return orgCOCOAnnotFile |
def main(): |
parser = argparse.ArgumentParser(description='This script converts original Wider Person' |
'Validation Dataset images to 512 x 512' |
'Then resisez the annotations accordingly, saves new images and annotations under datasets folder') |
parser.add_argument('-ip', '--wider_images_path', type=str, required = True, |
help='path of the folder containing original images') |
parser.add_argument('-af', '--wider_annotfile', type=str, required = True, |
help='full path of original annotations file e.g. ./some/path/some_annot.json') |
args = parser.parse_args() |
wider_images_path = args.wider_images_path |
wider_annotfile = args.wider_annotfile |
print('') |
print('Prescaling and Center-cropping original images to 512 x 512') |
preProcessImages(wider_images_path) |
print('\n'*2) |
print('Converting original annotations to COCO format') |
orgCOCOAnnotFile = convWidertoCOCO(wider_annotfile, wider_images_path) |
print('\n'*2) |
print('Prescaling/Center-cropping original annotations in COCO format') |
transform = transforms.Compose([transforms.CenterCrop(512), transforms.ToTensor()]) |
dataset = CocoDetection(root=wider_images_path, annFile=orgCOCOAnnotFile, transform=transform,scaleImgforCrop= 512) |
targetFileName = 'instances_val.json' |
dataset.createResizedAnnotJson(targetFileName=targetFileName) |
os.remove(orgCOCOAnnotFile) |
if __name__ == '__main__': |
main() |