Here's the code I am using:
from playwright.sync_api import sync_playwright
# Open a headed Chromium window, load the page and wait for the first
# successful (HTTP 200) response whose URL contains "capture_res".
with sync_playwright() as p:
    browser = p.chromium.launch(headless=False)
    try:
        context = browser.new_context()
        # Make navigator.webdriver read as undefined inside every page of this context.
        context.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        page = context.new_page()
        # Print the status code of every response whose URL matches, including non-200 ones.
        page.on("response", lambda res: print(res.status) if "capture_res" in res.url else None)
        with page.expect_response(lambda res: "capture_res" in res.url and res.status == 200) as response_info:
            page.goto("page_url")
        # Consume the awaited response so the result of expect_response() is actually used.
        response = response_info.value
    finally:
        # Shut the browser down explicitly while the driver connection is still open.
        # NOTE(review): leaving the `with sync_playwright()` block with a live browser is
        # the presumed source of the "Connection closed" future error - confirm by re-running.
        browser.close()
Here, `capture_res` and `page_url` are just placeholders. The output I am getting is:
401
401
200
Future exception was never retrieved
future: <Future finished exception=Error('Connection closed')>
playwright._impl._api_types.Error: Connection closed
What exactly am I doing wrong?
This thread is trying to answer the question "What is causing the 'Connection closed' error in the provided code and how can it be fixed?"
Use asyncio's debug mode to find where the issue is coming from: https://docs.python.org/3.7/library/asyncio-dev.html#debug-mode
Right now, my code looks like this:
import asyncio
import aiohttp
from bs4 import BeautifulSoup as bs
from playwright.async_api import async_playwright
from playwright._impl._api_types import TimeoutError as TOErr
from asyncio.exceptions import InvalidStateError
class TokenFactory:
    """Lazily obtain and cache an access token by loading a page in a browser.

    The token is read from the ``x-access-token`` header of the request that
    produced the first HTTP 200 response whose URL contains ``"testpattern"``.
    """

    # A regeneration gives up once the failure counter exceeds this value.
    _MAX_RETRIES = 5

    def __init__(self, url):
        """Remember the page URL; no browser work happens until a token is requested."""
        self._url = url
        self._token = None  # cached token, None until the first successful fetch
        self._retry_count = 0  # consecutive failed fetch attempts

    def change_url(self, url):
        """Use ``url`` for subsequent token regenerations (the cached token is kept)."""
        self._url = url

    async def get_token(self):
        """Return the cached token, fetching it first if none is cached yet."""
        if self._token is None:
            await self.regenerate_token()
        return self._token

    async def regenerate_token(self):
        """Fetch a fresh token, retrying on known transient failures.

        Raises:
            Exception: when the failure counter exceeds ``_MAX_RETRIES``; the
                last underlying error is attached as ``__cause__``.
        """
        # Retry in a loop rather than by recursion: the original recursive call
        # was never awaited, so a failed attempt silently left the token unset.
        while True:
            try:
                self._token = await self._fetch_token()
            except (KeyError, TOErr, InvalidStateError) as exc:
                self._retry_count += 1
                if self._retry_count > self._MAX_RETRIES:
                    raise Exception("Too many retries to regenerate token. Exiting...") from exc
            else:
                self._retry_count = 0
                return

    async def _fetch_token(self):
        """Load the page once and return the token header value (or None if absent)."""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            try:
                context = await browser.new_context()
                # Make navigator.webdriver read as undefined inside every page of this context.
                await context.add_init_script(
                    "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
                page = await context.new_page()
                async with page.expect_response(
                        lambda res: "testpattern" in res.url and res.status == 200) as response_info:
                    await page.goto(self._url)
                response_value = await response_info.value
                return response_value.request.headers.get("x-access-token")
            finally:
                # Close the browser before the Playwright driver connection goes away.
                await browser.close()
async def test():
    """Fetch a token through ``TokenFactory`` and print it."""
    # The URL handed to page.goto() must be absolute, scheme included;
    # a bare "google.com" is not a navigable URL.
    factory = TokenFactory("https://google.com")
    token = await factory.get_token()
    print(token)


# debug=True enables asyncio debug mode for this run.
asyncio.run(test(), debug=True)
Rayrun is a community for QA engineers. I am constantly looking for new ways to add value to people learning Playwright and other browser automation frameworks. If you have feedback, email [email protected].