Spaces:
Configuration error

chat / database.py
yonkasoft's picture
Update database.py
10aa31e verified
raw
history blame
3.18 kB
import json
import os
import boto3
from pymongo import MongoClient
from dotenv import load_dotenv
load_dotenv()
# Çevresel değişkenlerden yapılandırma bilgilerini alıyoruz
DYNAMO_TABLE = os.getenv('DYNAMO_TABLE')
MONGO_URI = os.getenv('MONGO_URI')
INPUT_DB = os.getenv('INPUT_DB')
# AWS DynamoDB kaynağı
dynamodb = boto3.resource('dynamodb')
def lambda_handler(event, context):
"""AWS Lambda işleyici fonksiyonu."""
method = event['requestContext']['http']['method']
if method == 'POST':
return handle_post(event)
elif method == 'GET':
return handle_get(event)
else:
return {
'statusCode': 405,
'body': json.dumps('Method Not Allowed')
}
def handle_post(event):
"""POST isteklerini işleyin."""
table = dynamodb.Table(DYNAMO_TABLE)
try:
body = json.loads(event['body'])
title = body.get('title')
keywords = body.get('keywords')
text = body.get('text')
if not title or not keywords or not text:
return {
'statusCode': 400,
'body': json.dumps('Invalid input data')
}
# DynamoDB'ye veri ekleyin
table.put_item(
Item={
'title': title,
'keywords': keywords,
'text': text
}
)
# MongoDB'ye veri ekleyin
insert_data_into_input_db([{"title": title, "keywords": keywords, "text": text}])
return {
'statusCode': 200,
'body': json.dumps('Data inserted into DynamoDB and MongoDB!')
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps(f"Error: {str(e)}")
}
def handle_get(event):
"""GET isteklerini işleyin."""
table = dynamodb.Table(DYNAMO_TABLE)
try:
query_params = event.get('queryStringParameters', {})
title = query_params.get('title')
if not title:
return {
'statusCode': 400,
'body': json.dumps('Title parameter is required')
}
# DynamoDB'den veri alın
response = table.get_item(
Key={
'title': title
}
)
item = response.get('Item', {})
if not item:
return {
'statusCode': 404,
'body': json.dumps('Data not found')
}
return {
'statusCode': 200,
'body': json.dumps(item)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps(f"Error: {str(e)}")
}
def connect_to_mongodb():
"""MongoDB'ye bağlan."""
client = MongoClient(MONGO_URI)
return client
def insert_data_into_input_db(data):
"""Input database'e veri ekle."""
client = connect_to_mongodb()
db = client[INPUT_DB]
collection = db['input'] # Kullanıcıdan alınacak verilerin saklandığı koleksiyon
collection.insert_many(data)
client.close()