Skip to main content

snippets_py

Integration

New integration from credential
# Create the storage client from the credential identified by this slug.
credential_slug = "credential-slug"
storage_client = yepcode.integration.google_storage(credential_slug)
New integration from plain authentication data
from google.cloud.storage.client import Client
from google.oauth2.service_account import Credentials

# Placeholder service-account key data; substitute real values before use.
project_id = "yepcode"
service_account_info = {
    "type": "service_account",
    "project_id": project_id,
    "private_key_id": "XXXXX",
    "private_key": "-----BEGIN PRIVATE KEY-----\nx\n-----END PRIVATE KEY-----",
    "client_email": "yepcode@example.org",
    "client_id": "1234567890",
    "auth_uri": "https://example.org",
    "token_uri": "https://example.org",
    "auth_provider_x509_cert_url": "https://example.org",
    "client_x509_cert_url": "https://example.org",
}

# Build credentials from the in-memory dict, then create the client with them.
credentials = Credentials.from_service_account_info(service_account_info)
storage_client = Client(
    project=project_id,
    credentials=credentials,
)

Create a New Bucket

Create a new bucket
# Name of the bucket to create (placeholder value).
new_bucket_name = "bucket_name"
storage_client.create_bucket(new_bucket_name)

Create a Notification

Create a notification
# Create a notification for the given topic on the bucket.
bucket = storage_client.bucket("bucket_name")
notification = bucket.notification(topic_name="topic_name")
notification.create()
# Use bucket.name here: this snippet never defines a bucket_name variable.
print(f"Successfully created notification with ID {notification.notification_id} for bucket {bucket.name}")

Delete a Bucket

Delete a bucket
# Look the bucket up by name (placeholder value), then delete it.
target_bucket_name = "bucket_name"
bucket = storage_client.get_bucket(target_bucket_name)
bucket.delete()

Delete a Notification

Delete a notification
# Delete a notification from the bucket, addressed by its ID.
bucket = storage_client.bucket("bucket_name")
# Placeholder ID, like the other snippets' string placeholders; replace it
# with the ID of the notification to delete.
notification = bucket.notification(notification_id="notification_id")
notification.delete()

List Buckets

List buckets
# Print the name of every bucket returned by the client.
for bucket in storage_client.list_buckets():
    print(bucket.name)

List Notifications

List notifications
# Print the ID of every notification listed for the bucket.
bucket = storage_client.bucket("bucket_name")
for notification in bucket.list_notifications():
    print(notification.notification_id)

Upload a File

Upload a file
# Imported here so the snippet runs on its own; BytesIO was otherwise undefined.
import io

# Upload in-memory bytes to the blob "destination_file_name" in the bucket.
bucket = storage_client.bucket("bucket_name")
blob = bucket.blob("destination_file_name")
source_file = io.BytesIO(b"Some data")  # Use a BytesIO object as the "file" to upload

blob.upload_from_file(source_file)

Download a File

Download a file
import io

# Download the blob "file_name" into an in-memory buffer and print its bytes.
bucket = storage_client.bucket("bucket_name")
blob = bucket.blob("file_name")

download_buffer = io.BytesIO()
blob.download_to_file(download_buffer)

# Rewind before reading: the download left the position at the end.
download_buffer.seek(0)
payload = download_buffer.read()
print(payload)